You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/10/21 09:32:09 UTC

svn commit: r1765973 [4/7] - in /qpid/java/branches/transfer-queue: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/ bdbstore/src/test/java/org/apache/qpid/server/stor...

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,713 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.security.Principal;
+import java.security.cert.Certificate;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.util.SystemUtils;
+
+public abstract class NonBlockingConnection implements SchedulableConnection
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
+
+    final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
+    private final SocketChannel _socketChannel;
+    private final Deque<NetworkConnectionScheduler> _schedulerDeque = new ConcurrentLinkedDeque<>();
+    private final String _remoteAddressString;
+    private final AtomicBoolean _scheduled = new AtomicBoolean();
+    private final AtomicLong _maxWriteIdleMillis = new AtomicLong();
+    private final AtomicLong _maxReadIdleMillis = new AtomicLong();
+    private final List<SchedulingDelayNotificationListener>
+            _schedulingDelayNotificationListeners = new CopyOnWriteArrayList<>();
+    private final String _threadName;
+    private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
+    private NonBlockingConnectionDelegate _delegate;
+    private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+    private volatile SelectorThread.SelectionTask _selectionTask;
+    private volatile long _scheduledTime;
+    private final AtomicBoolean _hasShutdown = new AtomicBoolean();
+    private final AtomicBoolean _unexpectedByteBufferSizeReported = new AtomicBoolean();
+    private final ProtocolEngine _protocolEngine;
+    private volatile Iterator<Runnable> _pendingIterator;
+    private volatile boolean _fullyWritten = true;
+    private volatile boolean _partialRead = false;
+
+    protected NonBlockingConnection(final SocketChannel socketChannel, ProtocolEngine protocolEngine,
+                                    final NetworkConnectionScheduler scheduler,
+                                    String remoteAddressString)
+    {
+        _socketChannel = socketChannel;
+        _protocolEngine = protocolEngine;
+        _remoteAddressString = remoteAddressString;
+        _threadName = SelectorThread.IO_THREAD_NAME_PREFIX + _remoteAddressString;
+        pushScheduler(scheduler);
+        protocolEngine.setWorkListener(new Action<ProtocolEngine>()
+        {
+            @Override
+            public void performAction(final ProtocolEngine object)
+            {
+                NetworkConnectionScheduler scheduler = getScheduler();
+                if(scheduler != null)
+                {
+                    scheduler.schedule(NonBlockingConnection.this);
+                }
+            }
+        });
+
+
+    }
+
+    @Override
+    public final String getThreadName()
+    {
+        return _threadName;
+    }
+
+
+    protected final String getRemoteAddressString()
+    {
+        return _remoteAddressString;
+    }
+
+    protected final boolean isClosed()
+    {
+        return _closed.get();
+    }
+
+    protected final boolean setClosed()
+    {
+        return _closed.compareAndSet(false,true);
+    }
+
+    @Override
+    public final SocketChannel getSocketChannel()
+    {
+        return _socketChannel;
+    }
+
+
+    @Override
+    public final SocketAddress getRemoteAddress()
+    {
+        return getSocketChannel().socket().getRemoteSocketAddress();
+    }
+
+    @Override
+    public final SocketAddress getLocalAddress()
+    {
+        return getSocketChannel().socket().getLocalSocketAddress();
+    }
+
+
+    public final void pushScheduler(NetworkConnectionScheduler scheduler)
+    {
+        _schedulerDeque.addFirst(scheduler);
+    }
+
+    public final NetworkConnectionScheduler popScheduler()
+    {
+        return _schedulerDeque.removeFirst();
+    }
+
+    @Override
+    public final NetworkConnectionScheduler getScheduler()
+    {
+        return _schedulerDeque.peekFirst();
+    }
+
+    @Override
+    public final Principal getPeerPrincipal()
+    {
+        return _delegate.getPeerPrincipal();
+    }
+
+    @Override
+    public final Certificate getPeerCertificate()
+    {
+        return _delegate.getPeerCertificate();
+    }
+
+    @Override
+    public final String getTransportInfo()
+    {
+        return _delegate.getTransportInfo();
+    }
+
+
+    @Override
+    public final SelectorThread.SelectionTask getSelectionTask()
+    {
+        return _selectionTask;
+    }
+
+    @Override
+    public final void setSelectionTask(final SelectorThread.SelectionTask selectionTask)
+    {
+        _selectionTask = selectionTask;
+    }
+
+    @Override
+    public final boolean setScheduled()
+    {
+        final boolean scheduled = _scheduled.compareAndSet(false, true);
+        if (scheduled)
+        {
+            _scheduledTime = System.currentTimeMillis();
+        }
+        return scheduled;
+    }
+
+    @Override
+    public final void clearScheduled()
+    {
+        _scheduled.set(false);
+        _scheduledTime = 0;
+    }
+
+    @Override
+    public final long getScheduledTime()
+    {
+        return _scheduledTime;
+    }
+
+    @Override
+    public final void setMaxWriteIdleMillis(final long millis)
+    {
+        _maxWriteIdleMillis.set(millis);
+    }
+
+    @Override
+    public final void setMaxReadIdleMillis(final long millis)
+    {
+        _maxReadIdleMillis.set(millis);
+    }
+
+    @Override
+    public final long getMaxReadIdleMillis()
+    {
+        return _maxReadIdleMillis.get();
+    }
+
+    @Override
+    public final long getMaxWriteIdleMillis()
+    {
+        return _maxWriteIdleMillis.get();
+    }
+    @Override
+    public final boolean isStateChanged()
+    {
+        return _protocolEngine.hasWork();
+    }
+
+    @Override
+    public final boolean wantsRead()
+    {
+        return _fullyWritten;
+    }
+
+    @Override
+    public final boolean wantsWrite()
+    {
+        return !_fullyWritten;
+    }
+
+    @Override
+    public final boolean isPartialRead()
+    {
+        return _partialRead;
+    }
+
+    @Override
+    public boolean doWork()
+    {
+        _protocolEngine.clearWork();
+        try
+        {
+
+            if (!isClosed() && (!wantsConnect() || completeConnection()))
+            {
+                long currentTime = System.currentTimeMillis();
+                int tick = getTicker().getTimeToNextTick(currentTime);
+                if (tick <= 0)
+                {
+                    getTicker().tick(currentTime);
+                }
+
+                _protocolEngine.setIOThread(Thread.currentThread());
+                _protocolEngine.setMessageAssignmentSuspended(true, true);
+
+                boolean processPendingComplete = processPending();
+
+                if (processPendingComplete)
+                {
+                    _pendingIterator = null;
+                    _protocolEngine.setTransportBlockedForWriting(false);
+                    boolean dataRead = doRead();
+                    _fullyWritten = doWrite();
+                    _protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
+
+                    if (!_fullyWritten || dataRead || (_delegate.needsWork()
+                                                       && _delegate.getNetInputBuffer().position() != 0))
+                    {
+                        _protocolEngine.notifyWork();
+                    }
+
+                    if (_fullyWritten)
+                    {
+                        _protocolEngine.setMessageAssignmentSuspended(false, true);
+                    }
+                }
+                else
+                {
+                    _protocolEngine.notifyWork();
+                }
+            }
+        }
+        catch (IOException |
+                ConnectionScopedRuntimeException e)
+        {
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Exception performing I/O for connection '{}'",
+                             getRemoteAddressString(), e);
+            }
+            else
+            {
+                LOGGER.info("Exception performing I/O for connection '{}' : {}",
+                            getRemoteAddressString(), e.getMessage());
+            }
+
+            if(setClosed())
+            {
+                _protocolEngine.notifyWork();
+            }
+        }
+        finally
+        {
+            _protocolEngine.setIOThread(null);
+        }
+
+        final boolean closed = isClosed();
+        if (closed)
+        {
+            shutdown();
+        }
+
+        return closed;
+
+    }
+
+    private boolean completeConnection() throws IOException
+    {
+        boolean finishConnect = getSocketChannel().finishConnect();
+        return finishConnect  && !wantsConnect();
+    }
+
+
+    private boolean processPending() throws IOException
+    {
+        if(_pendingIterator == null)
+        {
+            _pendingIterator = _protocolEngine.processPendingIterator();
+        }
+
+        final int networkBufferSize = getNetworkBufferSize();
+
+        while(_pendingIterator.hasNext())
+        {
+            long size = getBufferedSize();
+            if(size >= networkBufferSize)
+            {
+                _fullyWritten = doWrite();
+                long bytesWritten = size - getBufferedSize();
+                if(bytesWritten < (networkBufferSize / 2))
+                {
+                    break;
+                }
+            }
+            else
+            {
+                final Runnable task = _pendingIterator.next();
+                task.run();
+            }
+        }
+
+        boolean complete = !_pendingIterator.hasNext();
+        if (getBufferedSize() >= networkBufferSize)
+        {
+            _fullyWritten = doWrite();
+            complete &= getBufferedSize() < networkBufferSize /2;
+        }
+        return complete;
+    }
+
+    protected abstract int getNetworkBufferSize();
+
+
+    @Override
+    public final void doPreWork()
+    {
+        if (!isClosed())
+        {
+            long currentTime = System.currentTimeMillis();
+            long schedulingDelay = currentTime - getScheduledTime();
+            if (!_schedulingDelayNotificationListeners.isEmpty())
+            {
+                for (SchedulingDelayNotificationListener listener : _schedulingDelayNotificationListeners)
+                {
+                    listener.notifySchedulingDelay(schedulingDelay);
+                }
+            }
+        }
+    }
+
+
+    /**
+     * doRead is not reentrant.
+     */
+    boolean doRead() throws IOException
+    {
+        _partialRead = false;
+        if(!isClosed() && _delegate.readyForRead())
+        {
+            int readData = readFromNetwork();
+
+            if (readData > 0)
+            {
+                return _delegate.processData();
+            }
+            else
+            {
+                return false;
+            }
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+    @Override
+    public long writeToTransport(Collection<QpidByteBuffer> buffers) throws IOException
+    {
+        long written  = QpidByteBuffer.write(getSocketChannel(), buffers);
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Written " + written + " bytes");
+        }
+        return written;
+    }
+
+    protected int readFromNetwork() throws IOException
+    {
+        QpidByteBuffer buffer = _delegate.getNetInputBuffer();
+
+        int read = buffer.read(getSocketChannel());
+        if (read == -1)
+        {
+            setClosed();
+        }
+
+        _partialRead = read != 0;
+
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Read " + read + " byte(s)");
+        }
+        return read;
+    }
+
+
+
+
+    @Override
+    public final void addSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
+    {
+        _schedulingDelayNotificationListeners.add(listener);
+    }
+
+    @Override
+    public final void removeSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
+    {
+        _schedulingDelayNotificationListeners.remove(listener);
+    }
+
+    @Override
+    public final ByteBufferSender getSender()
+    {
+        return this;
+    }
+
+    @Override
+    public final boolean isDirectBufferPreferred()
+    {
+        return true;
+    }
+
+    private final long getBufferedSize()
+    {
+        // Avoids iterator garbage if empty
+        if (_buffers.isEmpty())
+        {
+            return 0L;
+        }
+
+        long totalSize = 0L;
+        for(QpidByteBuffer buf : _buffers)
+        {
+            totalSize += buf.remaining();
+        }
+        return totalSize;
+    }
+
+    final boolean doWrite() throws IOException
+    {
+        boolean fullyWritten = _delegate.doWrite(_buffers);
+        while(!_buffers.isEmpty())
+        {
+            QpidByteBuffer buf = _buffers.peek();
+            if(buf.hasRemaining())
+            {
+                break;
+            }
+            _buffers.poll();
+            buf.dispose();
+        }
+        if (fullyWritten)
+        {
+            _usedOutboundMessageSpace.set(0);
+        }
+        return fullyWritten;
+
+    }
+
+    @Override
+    public final void send(final QpidByteBuffer msg)
+    {
+
+        if (isClosed())
+        {
+            LOGGER.warn("Send ignored as the connection is already closed");
+        }
+        else if (msg.remaining() > 0)
+        {
+            _buffers.add(msg.duplicate());
+        }
+        msg.position(msg.limit());
+    }
+
+
+    protected final void shutdown()
+    {
+        if (_hasShutdown.getAndSet(true))
+        {
+            return;
+        }
+
+        try
+        {
+            shutdownInput();
+            shutdownFinalWrite();
+
+            _protocolEngine.closed();
+
+            shutdownOutput();
+        }
+        finally
+        {
+            try
+            {
+                try
+                {
+                    NetworkConnectionScheduler scheduler = getScheduler();
+                    if (scheduler != null)
+                    {
+                        scheduler.removeConnection(this);
+                    }
+                }
+                finally
+                {
+                    getSocketChannel().close();
+                }
+            }
+            catch (IOException e)
+            {
+                LOGGER.info("Exception closing socket '{}': {}", getRemoteAddressString(), e.getMessage());
+            }
+
+            if (SystemUtils.isWindows())
+            {
+                _delegate.shutdownInput();
+                _delegate.shutdownOutput();
+            }
+        }
+    }
+
+    private void shutdownFinalWrite()
+    {
+        try
+        {
+            while(!doWrite())
+            {
+            }
+        }
+        catch (IOException e)
+        {
+            LOGGER.info("Exception performing final write/close for '{}': {}", getRemoteAddressString(), e.getMessage());
+        }
+    }
+
+    private void shutdownOutput()
+    {
+        if(!SystemUtils.isWindows())
+        {
+            try
+            {
+                getSocketChannel().shutdownOutput();
+            }
+            catch (IOException e)
+            {
+                LOGGER.info("Exception closing socket '{}': {}", getRemoteAddressString(), e.getMessage());
+            }
+            finally
+            {
+                _delegate.shutdownOutput();
+            }
+        }
+    }
+
+    private void shutdownInput()
+    {
+
+        if(!SystemUtils.isWindows())
+        {
+            try
+            {
+                getSocketChannel().shutdownInput();
+            }
+            catch (IOException e)
+            {
+                LOGGER.info("Exception shutting down input for '{}': {}", getRemoteAddressString(), e.getMessage());
+            }
+            finally
+            {
+                _delegate.shutdownInput();
+            }
+        }
+    }
+
+
+    @Override
+    public final void start()
+    {
+    }
+
+    @Override
+    public final void flush()
+    {
+    }
+
+
+    @Override
+    public final void close()
+    {
+        LOGGER.debug("Closing " + getRemoteAddressString());
+        if(setClosed())
+        {
+            _protocolEngine.notifyWork();
+            getSelectionTask().wakeup();
+        }
+    }
+
+
+    @Override
+    public void processAmqpData(QpidByteBuffer applicationData)
+    {
+        _protocolEngine.received(applicationData);
+    }
+
+
+    @Override
+    public final void reserveOutboundMessageSpace(long size)
+    {
+        if (_usedOutboundMessageSpace.addAndGet(size) > getOutboundMessageBufferLimit())
+        {
+            _protocolEngine.setMessageAssignmentSuspended(true, false);
+        }
+    }
+
+
+    @Override
+    public final Ticker getTicker()
+    {
+        return _protocolEngine.getAggregateTicker();
+    }
+
+
+    protected abstract long getOutboundMessageBufferLimit();
+
+
+    public final void reportUnexpectedByteBufferSizeUsage()
+    {
+        if (_unexpectedByteBufferSizeReported.compareAndSet(false,true))
+        {
+            LOGGER.info("At least one frame unexpectedly does not fit into default byte buffer size ({}B) on a connection {}.",
+                        getNetworkBufferSize(), this.toString());
+        }
+    }
+
+    protected NonBlockingConnectionDelegate getDelegate()
+    {
+        return _delegate;
+    }
+
+    protected void setDelegate(NonBlockingConnectionDelegate delegate)
+    {
+        _delegate = delegate;
+    }
+
+    public final ProtocolEngine getProtocolEngine()
+    {
+        return _protocolEngine;
+    }
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java Fri Oct 21 09:32:07 2016
@@ -24,7 +24,6 @@ import java.security.Principal;
 import java.security.cert.Certificate;
 import java.util.Collection;
 
-import org.apache.qpid.server.model.port.AmqpPort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,14 +33,14 @@ public class NonBlockingConnectionPlainD
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnectionPlainDelegate.class);
 
-    private final NonBlockingConnection _parent;
+    private final SchedulableConnection _parent;
     private final int _networkBufferSize;
     private volatile QpidByteBuffer _netInputBuffer;
 
-    public NonBlockingConnectionPlainDelegate(NonBlockingConnection parent, AmqpPort<?> port)
+    public NonBlockingConnectionPlainDelegate(SchedulableConnection parent, int networkBufferSize)
     {
         _parent = parent;
-        _networkBufferSize = port.getNetworkBufferSize();
+        _networkBufferSize = networkBufferSize;
         _netInputBuffer = QpidByteBuffer.allocateDirect(_networkBufferSize);
     }
 

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java Fri Oct 21 09:32:07 2016
@@ -19,19 +19,6 @@
 
 package org.apache.qpid.server.transport;
 
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLException;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSession;
-
 import java.io.IOException;
 import java.security.Principal;
 import java.security.cert.Certificate;
@@ -41,12 +28,24 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
+
 public class NonBlockingConnectionTLSDelegate implements NonBlockingConnectionDelegate
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnectionTLSDelegate.class);
 
     private final SSLEngine _sslEngine;
-    private final NonBlockingConnection _parent;
+    private final SchedulableConnection _parent;
     private final int _networkBufferSize;
     private SSLEngineResult _status;
     private final List<QpidByteBuffer> _encryptedOutput = new ArrayList<>();
@@ -58,11 +57,11 @@ public class NonBlockingConnectionTLSDel
     private QpidByteBuffer _applicationBuffer;
 
 
-    public NonBlockingConnectionTLSDelegate(NonBlockingConnection parent, AmqpPort port)
+    public NonBlockingConnectionTLSDelegate(SchedulableConnection parent, int networkBufferSize, SSLEngine sslEngine)
     {
         _parent = parent;
-        _sslEngine = createSSLEngine(port);
-        _networkBufferSize = port.getNetworkBufferSize();
+        _sslEngine = sslEngine;
+        _networkBufferSize = networkBufferSize;
 
         final int tlsPacketBufferSize = _sslEngine.getSession().getPacketBufferSize();
         if (tlsPacketBufferSize > _networkBufferSize)
@@ -313,28 +312,6 @@ public class NonBlockingConnectionTLSDel
         }
     }
 
-    private SSLEngine createSSLEngine(AmqpPort<?> port)
-    {
-        SSLEngine sslEngine = port.getSSLContext().createSSLEngine();
-        sslEngine.setUseClientMode(false);
-        SSLUtil.updateEnabledTlsProtocols(sslEngine, port.getTlsProtocolWhiteList(), port.getTlsProtocolBlackList());
-        SSLUtil.updateEnabledCipherSuites(sslEngine, port.getTlsCipherSuiteWhiteList(), port.getTlsCipherSuiteBlackList());
-        if(port.getTlsCipherSuiteWhiteList() != null && !port.getTlsCipherSuiteWhiteList().isEmpty())
-        {
-            SSLUtil.useCipherOrderIfPossible(sslEngine);
-        }
-
-        if(port.getNeedClientAuth())
-        {
-            sslEngine.setNeedClientAuth(true);
-        }
-        else if(port.getWantClientAuth())
-        {
-            sslEngine.setWantClientAuth(true);
-        }
-        return sslEngine;
-    }
-
     @Override
     public QpidByteBuffer getNetInputBuffer()
     {

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java Fri Oct 21 09:32:07 2016
@@ -19,22 +19,22 @@
 
 package org.apache.qpid.server.transport;
 
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.transport.network.TransportEncryption;
-
 import java.io.IOException;
 import java.security.Principal;
 import java.security.cert.Certificate;
 import java.util.Collection;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.transport.network.TransportEncryption;
+
 public class NonBlockingConnectionUndecidedDelegate implements NonBlockingConnectionDelegate
 {
     private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
-    public final NonBlockingConnection _parent;
+    public final NonBlockingInboundConnection _parent;
 
     private QpidByteBuffer _netInputBuffer;
 
-    public NonBlockingConnectionUndecidedDelegate(NonBlockingConnection parent)
+    public NonBlockingConnectionUndecidedDelegate(NonBlockingInboundConnection parent)
     {
         _parent = parent;
         _netInputBuffer = QpidByteBuffer.allocateDirect(NUMBER_OF_BYTES_FOR_TLS_CHECK);

Copied: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingInboundConnection.java (from r1761440, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingInboundConnection.java?p2=qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingInboundConnection.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java&r1=1761440&r2=1765973&rev=1765973&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingInboundConnection.java Fri Oct 21 09:32:07 2016
@@ -20,94 +20,44 @@
  */
 package org.apache.qpid.server.transport;
 
-import java.io.IOException;
-import java.net.SocketAddress;
 import java.nio.channels.SocketChannel;
-import java.security.Principal;
-import java.security.cert.Certificate;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+
+import javax.net.ssl.SSLEngine;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.network.Ticker;
 import org.apache.qpid.transport.network.TransportEncryption;
-import org.apache.qpid.util.SystemUtils;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
-public class NonBlockingConnection implements ServerNetworkConnection, ByteBufferSender
+public class NonBlockingInboundConnection extends NonBlockingConnection
 {
-    private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingInboundConnection.class);
 
-    private final SocketChannel _socketChannel;
-    private NonBlockingConnectionDelegate _delegate;
-    private final Deque<NetworkConnectionScheduler> _schedulerDeque = new ConcurrentLinkedDeque<>();
-    private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
-
-    private final String _remoteSocketAddress;
-    private final AtomicBoolean _closed = new AtomicBoolean(false);
-    private final ProtocolEngine _protocolEngine;
     private final Runnable _onTransportEncryptionAction;
-    private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
     private final long _outboundMessageBufferLimit;
 
-    private volatile boolean _fullyWritten = true;
-
-    private boolean _partialRead = false;
 
     private final AmqpPort _port;
-    private final AtomicBoolean _scheduled = new AtomicBoolean();
-    private volatile long _scheduledTime;
-    private volatile boolean _unexpectedByteBufferSizeReported;
-    private final String _threadName;
-    private volatile SelectorThread.SelectionTask _selectionTask;
-    private Iterator<Runnable> _pendingIterator;
-    private final AtomicLong _maxWriteIdleMillis = new AtomicLong();
-    private final AtomicLong _maxReadIdleMillis = new AtomicLong();
-    private final List<SchedulingDelayNotificationListener> _schedulingDelayNotificationListeners = new CopyOnWriteArrayList<>();
-    private final AtomicBoolean _hasShutdown = new AtomicBoolean();
-
-    public NonBlockingConnection(SocketChannel socketChannel,
-                                 ProtocolEngine protocolEngine,
-                                 final Set<TransportEncryption> encryptionSet,
-                                 final Runnable onTransportEncryptionAction,
-                                 final NetworkConnectionScheduler scheduler,
-                                 final AmqpPort port)
-    {
-        _socketChannel = socketChannel;
-        pushScheduler(scheduler);
 
-        _protocolEngine = protocolEngine;
+    public NonBlockingInboundConnection(SocketChannel socketChannel,
+                                        ProtocolEngine protocolEngine,
+                                        final Set<TransportEncryption> encryptionSet,
+                                        final Runnable onTransportEncryptionAction,
+                                        final NetworkConnectionScheduler scheduler,
+                                        final AmqpPort port)
+    {
+        super(socketChannel, protocolEngine, scheduler, socketChannel.socket().getRemoteSocketAddress().toString());
         _onTransportEncryptionAction = onTransportEncryptionAction;
 
-        _remoteSocketAddress = _socketChannel.socket().getRemoteSocketAddress().toString();
         _port = port;
-        _threadName = SelectorThread.IO_THREAD_NAME_PREFIX + _remoteSocketAddress.toString();
 
         _outboundMessageBufferLimit = (long) _port.getContextValue(Long.class,
                                                                    AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
 
-        protocolEngine.setWorkListener(new Action<ProtocolEngine>()
-        {
-            @Override
-            public void performAction(final ProtocolEngine object)
-            {
-                getScheduler().schedule(NonBlockingConnection.this);
-            }
-        });
 
         if(encryptionSet.size() == 1)
         {
@@ -115,518 +65,71 @@ public class NonBlockingConnection imple
         }
         else
         {
-            _delegate = new NonBlockingConnectionUndecidedDelegate(this);
-        }
-
-    }
-
-    String getThreadName()
-    {
-        return _threadName;
-    }
-
-    public boolean isPartialRead()
-    {
-        return _partialRead;
-    }
-
-    Ticker getTicker()
-    {
-        return _protocolEngine.getAggregateTicker();
-    }
-
-    SocketChannel getSocketChannel()
-    {
-        return _socketChannel;
-    }
-
-    @Override
-    public void start()
-    {
-    }
-
-    @Override
-    public ByteBufferSender getSender()
-    {
-        return this;
-    }
-
-    @Override
-    public void close()
-    {
-        LOGGER.debug("Closing " + _remoteSocketAddress);
-        if(_closed.compareAndSet(false,true))
-        {
-            _protocolEngine.notifyWork();
-            _selectionTask.wakeup();
-        }
-    }
-
-    @Override
-    public SocketAddress getRemoteAddress()
-    {
-        return _socketChannel.socket().getRemoteSocketAddress();
-    }
-
-    @Override
-    public SocketAddress getLocalAddress()
-    {
-        return _socketChannel.socket().getLocalSocketAddress();
-    }
-
-    @Override
-    public void setMaxWriteIdleMillis(final long millis)
-    {
-        _maxWriteIdleMillis.set(millis);
-    }
-
-    @Override
-    public void setMaxReadIdleMillis(final long millis)
-    {
-        _maxReadIdleMillis.set(millis);
-    }
-
-    @Override
-    public Principal getPeerPrincipal()
-    {
-        return _delegate.getPeerPrincipal();
-    }
-
-    @Override
-    public Certificate getPeerCertificate()
-    {
-        return _delegate.getPeerCertificate();
-    }
-
-    @Override
-    public long getMaxReadIdleMillis()
-    {
-        return _maxReadIdleMillis.get();
-    }
-
-    @Override
-    public long getMaxWriteIdleMillis()
-    {
-        return _maxWriteIdleMillis.get();
-    }
-
-    @Override
-    public void reserveOutboundMessageSpace(long size)
-    {
-        if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
-        {
-            _protocolEngine.setMessageAssignmentSuspended(true, false);
-        }
-    }
-
-    @Override
-    public String getTransportInfo()
-    {
-        return _delegate.getTransportInfo();
-    }
-
-    boolean wantsRead()
-    {
-        return _fullyWritten;
-    }
-
-    boolean wantsWrite()
-    {
-        return !_fullyWritten;
-    }
-
-    public boolean isStateChanged()
-    {
-        return _protocolEngine.hasWork();
-    }
-
-    public void doPreWork()
-    {
-        if (!_closed.get())
-        {
-            long currentTime = System.currentTimeMillis();
-            long schedulingDelay = currentTime - getScheduledTime();
-            if (!_schedulingDelayNotificationListeners.isEmpty())
-            {
-                for (SchedulingDelayNotificationListener listener : _schedulingDelayNotificationListeners)
-                {
-                    listener.notifySchedulingDelay(schedulingDelay);
-                }
-            }
-        }
-    }
-
-    public boolean doWork()
-    {
-        _protocolEngine.clearWork();
-        if (!_closed.get())
-        {
-            try
-            {
-                long currentTime = System.currentTimeMillis();
-                int tick = getTicker().getTimeToNextTick(currentTime);
-                if (tick <= 0)
-                {
-                    getTicker().tick(currentTime);
-                }
-
-                _protocolEngine.setIOThread(Thread.currentThread());
-                _protocolEngine.setMessageAssignmentSuspended(true, true);
-
-                boolean processPendingComplete = processPending();
-
-                if(processPendingComplete)
-                {
-                    _pendingIterator = null;
-                    _protocolEngine.setTransportBlockedForWriting(false);
-                    boolean dataRead = doRead();
-                    _protocolEngine.setTransportBlockedForWriting(!doWrite());
-
-                    if (!_fullyWritten || dataRead || (_delegate.needsWork() && _delegate.getNetInputBuffer().position() != 0))
-                    {
-                        _protocolEngine.notifyWork();
-                    }
-
-                    if (_fullyWritten)
-                    {
-                        _protocolEngine.setMessageAssignmentSuspended(false, true);
-                    }
-                }
-                else
-                {
-                    _protocolEngine.notifyWork();
-                }
-
-            }
-            catch (IOException |
-                    ConnectionScopedRuntimeException e)
-            {
-                if (LOGGER.isDebugEnabled())
-                {
-                    LOGGER.debug("Exception performing I/O for connection '{}'",
-                                 _remoteSocketAddress, e);
-                }
-                else
-                {
-                    LOGGER.info("Exception performing I/O for connection '{}' : {}",
-                                _remoteSocketAddress, e.getMessage());
-                }
-
-                if(_closed.compareAndSet(false,true))
-                {
-                    _protocolEngine.notifyWork();
-                }
-            }
-            finally
-            {
-                _protocolEngine.setIOThread(null);
-            }
-        }
-
-        final boolean closed = _closed.get();
-        if (closed)
-        {
-            shutdown();
-        }
-
-        return closed;
-
-    }
-
-    @Override
-    public void addSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
-    {
-        _schedulingDelayNotificationListeners.add(listener);
-    }
-
-    @Override
-    public void removeSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
-    {
-        _schedulingDelayNotificationListeners.remove(listener);
-    }
-
-    private boolean processPending() throws IOException
-    {
-        if(_pendingIterator == null)
-        {
-            _pendingIterator = _protocolEngine.processPendingIterator();
-        }
-
-        final int networkBufferSize = _port.getNetworkBufferSize();
-
-        while(_pendingIterator.hasNext())
-        {
-            long size = getBufferedSize();
-            if(size >= networkBufferSize)
-            {
-                doWrite();
-                long bytesWritten = size - getBufferedSize();
-                if(bytesWritten < (networkBufferSize / 2))
-                {
-                    break;
-                }
-            }
-            else
-            {
-                final Runnable task = _pendingIterator.next();
-                task.run();
-            }
-        }
-
-        boolean complete = !_pendingIterator.hasNext();
-        if (getBufferedSize() >= networkBufferSize)
-        {
-            doWrite();
-            complete &= getBufferedSize() < networkBufferSize /2;
-        }
-        return complete;
-    }
-
-    private long getBufferedSize()
-    {
-        long totalSize = 0l;
-        for(QpidByteBuffer buf : _buffers)
-        {
-            totalSize += buf.remaining();
-        }
-        return totalSize;
-    }
-
-    private void shutdown()
-    {
-        if (_hasShutdown.getAndSet(true))
-        {
-            return;
-        }
-
-        try
-        {
-            shutdownInput();
-            shutdownFinalWrite();
-            _protocolEngine.closed();
-            shutdownOutput();
-        }
-        finally
-        {
-            try
-            {
-                try
-                {
-                    NetworkConnectionScheduler scheduler = getScheduler();
-                    if (scheduler != null)
-                    {
-                        scheduler.removeConnection(this);
-                    }
-                }
-                finally
-                {
-                    _socketChannel.close();
-                }
-            }
-            catch (IOException e)
-            {
-                LOGGER.info("Exception closing socket '{}': {}", _remoteSocketAddress, e.getMessage());
-            }
-
-            if (SystemUtils.isWindows())
-            {
-                _delegate.shutdownInput();
-                _delegate.shutdownOutput();
-            }
-        }
-    }
-
-    private void shutdownFinalWrite()
-    {
-        try
-        {
-            while(!doWrite())
-            {
-            }
-        }
-        catch (IOException e)
-        {
-            LOGGER.info("Exception performing final write/close for '{}': {}", _remoteSocketAddress, e.getMessage());
-        }
-    }
-
-    private void shutdownOutput()
-    {
-        if(!SystemUtils.isWindows())
-        {
-            try
-            {
-                _socketChannel.shutdownOutput();
-            }
-            catch (IOException e)
-            {
-                LOGGER.info("Exception closing socket '{}': {}", _remoteSocketAddress, e.getMessage());
-            }
-            finally
-            {
-                _delegate.shutdownOutput();
-            }
-        }
-    }
-
-    private void shutdownInput()
-    {
-
-        if(!SystemUtils.isWindows())
-        {
-            try
-            {
-                _socketChannel.shutdownInput();
-            }
-            catch (IOException e)
-            {
-                LOGGER.info("Exception shutting down input for '{}': {}", _remoteSocketAddress, e.getMessage());
-            }
-            finally
-            {
-                _delegate.shutdownInput();
-            }
-        }
-    }
-
-    /**
-     * doRead is not reentrant.
-     */
-    boolean doRead() throws IOException
-    {
-        _partialRead = false;
-        if(!_closed.get() && _delegate.readyForRead())
-        {
-            int readData = readFromNetwork();
-
-            if (readData > 0)
-            {
-                return _delegate.processData();
-            }
-            else
-            {
-                return false;
-            }
-        }
-        else
-        {
-            return false;
+            setDelegate(new NonBlockingConnectionUndecidedDelegate(this));
         }
-    }
 
-    long writeToTransport(Collection<QpidByteBuffer> buffers) throws IOException
-    {
-        long written  = QpidByteBuffer.write(_socketChannel, buffers);
-        if (LOGGER.isDebugEnabled())
-        {
-            LOGGER.debug("Written " + written + " bytes");
-        }
-        return written;
     }
 
-    private boolean doWrite() throws IOException
+    static SSLEngine createSSLEngine(AmqpPort<?> port)
     {
-        _fullyWritten = _delegate.doWrite(_buffers);
-        while(!_buffers.isEmpty())
+        SSLEngine sslEngine = port.getSSLContext().createSSLEngine();
+        sslEngine.setUseClientMode(false);
+        SSLUtil.updateEnabledTlsProtocols(sslEngine, port.getTlsProtocolWhiteList(), port.getTlsProtocolBlackList());
+        SSLUtil.updateEnabledCipherSuites(sslEngine, port.getTlsCipherSuiteWhiteList(), port.getTlsCipherSuiteBlackList());
+        if(port.getTlsCipherSuiteWhiteList() != null && !port.getTlsCipherSuiteWhiteList().isEmpty())
         {
-            QpidByteBuffer buf = _buffers.peek();
-            if(buf.hasRemaining())
-            {
-                break;
-            }
-            _buffers.poll();
-            buf.dispose();
+            SSLUtil.useCipherOrderIfPossible(sslEngine);
         }
-        if (_fullyWritten)
-        {
-            _usedOutboundMessageSpace.set(0);
-        }
-        return _fullyWritten;
-
-    }
-
-    protected int readFromNetwork() throws IOException
-    {
-        QpidByteBuffer buffer = _delegate.getNetInputBuffer();
 
-        int read = buffer.read(_socketChannel);
-        if (read == -1)
+        if(port.getNeedClientAuth())
         {
-            _closed.set(true);
+            sslEngine.setNeedClientAuth(true);
         }
-
-        _partialRead = read != 0;
-
-        if (LOGGER.isDebugEnabled())
+        else if(port.getWantClientAuth())
         {
-            LOGGER.debug("Read " + read + " byte(s)");
+            sslEngine.setWantClientAuth(true);
         }
-        return read;
+        return sslEngine;
     }
 
     @Override
-    public boolean isDirectBufferPreferred()
+    protected long getOutboundMessageBufferLimit()
     {
-        return true;
+        return _outboundMessageBufferLimit;
     }
 
     @Override
-    public void send(final QpidByteBuffer msg)
+    protected int getNetworkBufferSize()
     {
-
-        if (_closed.get())
-        {
-            LOGGER.warn("Send ignored as the connection is already closed");
-        }
-        else if (msg.remaining() > 0)
-        {
-            _buffers.add(msg.duplicate());
-        }
-        msg.position(msg.limit());
+        return _port.getNetworkBufferSize();
     }
 
     @Override
-    public void flush()
-    {
-    }
-
-    public final void pushScheduler(NetworkConnectionScheduler scheduler)
-    {
-        _schedulerDeque.addFirst(scheduler);
-    }
-
-    public final NetworkConnectionScheduler popScheduler()
+    public boolean wantsConnect()
     {
-        return _schedulerDeque.removeFirst();
+        return false;
     }
 
-    public final NetworkConnectionScheduler getScheduler()
-    {
-        return _schedulerDeque.peekFirst();
-    }
 
     @Override
     public String toString()
     {
-        return "[NonBlockingConnection " + _remoteSocketAddress + "]";
-    }
-
-    public void processAmqpData(QpidByteBuffer applicationData)
-    {
-        _protocolEngine.received(applicationData);
+        return "[InboundConnection " + getRemoteAddressString() + "]";
     }
 
     public void setTransportEncryption(TransportEncryption transportEncryption)
     {
-        NonBlockingConnectionDelegate oldDelegate = _delegate;
+        NonBlockingConnectionDelegate oldDelegate = getDelegate();
         switch (transportEncryption)
         {
             case TLS:
                 _onTransportEncryptionAction.run();
-                _delegate = new NonBlockingConnectionTLSDelegate(this, _port);
+                setDelegate(new NonBlockingConnectionTLSDelegate(this,
+                                                                 _port.getNetworkBufferSize(),
+                                                                 createSSLEngine(_port)));
                 break;
             case NONE:
-                _delegate = new NonBlockingConnectionPlainDelegate(this, _port);
+                setDelegate(new NonBlockingConnectionPlainDelegate(this, _port.getNetworkBufferSize()));
                 break;
             default:
                 throw new IllegalArgumentException("unknown TransportEncryption " + transportEncryption);
@@ -635,51 +138,11 @@ public class NonBlockingConnection imple
         {
             QpidByteBuffer src = oldDelegate.getNetInputBuffer().duplicate();
             src.flip();
-            _delegate.getNetInputBuffer().put(src);
+            getDelegate().getNetInputBuffer().put(src);
             src.dispose();
         }
         LOGGER.debug("Identified transport encryption as " + transportEncryption);
     }
 
-    public boolean setScheduled()
-    {
-        final boolean scheduled = _scheduled.compareAndSet(false, true);
-        if (scheduled)
-        {
-            _scheduledTime = System.currentTimeMillis();
-        }
-        return scheduled;
-    }
-
-    public void clearScheduled()
-    {
-        _scheduled.set(false);
-        _scheduledTime = 0;
-    }
-
-    @Override
-    public long getScheduledTime()
-    {
-        return _scheduledTime;
-    }
 
-    void reportUnexpectedByteBufferSizeUsage()
-    {
-        if (!_unexpectedByteBufferSizeReported)
-        {
-            LOGGER.info("At least one frame unexpectedly does not fit into default byte buffer size ({}B) on a connection {}.",
-                    _port.getNetworkBufferSize(), this.toString());
-            _unexpectedByteBufferSizeReported = true;
-        }
-    }
-
-    public SelectorThread.SelectionTask getSelectionTask()
-    {
-        return _selectionTask;
-    }
-
-    public void setSelectionTask(final SelectorThread.SelectionTask selectionTask)
-    {
-        _selectionTask = selectionTask;
-    }
 }

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Fri Oct 21 09:32:07 2016
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.transport;
 
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
 import java.io.IOException;
 import java.net.BindException;
 import java.net.InetSocketAddress;
@@ -30,16 +32,14 @@ import java.nio.channels.SocketChannel;
 import java.util.EnumSet;
 import java.util.Set;
 
-import org.apache.qpid.server.model.port.AmqpPort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.TransportEncryption;
 
-import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
-
 public class NonBlockingNetworkTransport
 {
 
@@ -156,10 +156,10 @@ public class NonBlockingNetworkTransport
 
 
                     NonBlockingConnection connection =
-                            new NonBlockingConnection(socketChannel,
-                                                      engine,
-                                                      _encryptionSet,
-                                                      new Runnable()
+                            new NonBlockingInboundConnection(socketChannel,
+                                                             engine,
+                                                             _encryptionSet,
+                                                             new Runnable()
                                                       {
 
                                                           @Override
@@ -168,8 +168,8 @@ public class NonBlockingNetworkTransport
                                                               engine.encryptedTransport();
                                                           }
                                                       },
-                                                      _scheduler,
-                                                      _port);
+                                                             _scheduler,
+                                                             _port);
 
                     engine.setNetworkConnection(connection);
 

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingOutboundConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingOutboundConnection.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingOutboundConnection.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingOutboundConnection.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,198 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.nio.channels.SocketChannel;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.federation.OutboundProtocolEngine;
+import org.apache.qpid.server.model.Container;
+import org.apache.qpid.server.model.KeyStore;
+import org.apache.qpid.server.model.RemoteHostAddress;
+import org.apache.qpid.server.model.TrustStore;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.plugin.OutboundProtocolEngineCreator;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
+import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+
+public class NonBlockingOutboundConnection extends NonBlockingConnection
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingOutboundConnection.class);
+
+    private final AbstractVirtualHost<?> _virtualHost;
+    private final RemoteHostAddress<?> _address;
+    private final int _networkBufferSize;
+    private final long _outboundMessageBufferLimit;
+    private volatile boolean _connected;
+
+    public NonBlockingOutboundConnection(SocketChannel socketChannel,
+                                         final RemoteHostAddress<?> address,
+                                         final NetworkConnectionScheduler networkConnectionScheduler,
+                                         final AbstractVirtualHost<?> virtualHost,
+                                         final Action<Boolean> onConnectionLoss)
+    {
+        super(socketChannel, createProtocolEngine(address, virtualHost), networkConnectionScheduler, address.getAddress()+":"+address.getPort());
+        OutboundProtocolEngine protocolEngine = (OutboundProtocolEngine) getProtocolEngine();
+        protocolEngine.setConnection(this);
+        protocolEngine.setOnClosedTask(onConnectionLoss);
+        _virtualHost = virtualHost;
+        _address = address;
+        _networkBufferSize = virtualHost.getAncestor(Container.class).getNetworkBufferSize();
+        _outboundMessageBufferLimit = (long) virtualHost.getContextValue(Long.class,
+                                                                   AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
+
+        final NonBlockingConnectionDelegate delegate;
+        switch(address.getTransport())
+        {
+            case TCP:
+                delegate = new NonBlockingConnectionPlainDelegate(this, getNetworkBufferSize());
+                break;
+            case SSL:
+                delegate = new NonBlockingConnectionTLSDelegate(this, getNetworkBufferSize(), createSSLEngine(address));
+                break;
+            default:
+                throw new IllegalArgumentException("Transport '"+address.getTransport()+"' is not supported");
+        }
+        setDelegate(delegate);
+    }
+
+    private static ProtocolEngine createProtocolEngine(final RemoteHostAddress<?> address, final AbstractVirtualHost<?> virtualHost)
+    {
+        for(OutboundProtocolEngineCreator engineCreator : (new QpidServiceLoader()).instancesOf(OutboundProtocolEngineCreator.class))
+        {
+            if(engineCreator.getVersion().equals(address.getProtocol()))
+            {
+                return engineCreator.newProtocolEngine(address, virtualHost);
+            }
+        }
+
+        return null;
+    }
+
+
+    @Override
+    protected long getOutboundMessageBufferLimit()
+    {
+        return _outboundMessageBufferLimit;
+    }
+
+    @Override
+    protected int getNetworkBufferSize()
+    {
+        return _networkBufferSize;
+    }
+
+
+    @Override
+    public boolean wantsConnect()
+    {
+        return !_connected && !(_connected = getSocketChannel().isConnected());
+    }
+
+    static SSLEngine createSSLEngine(RemoteHostAddress<?> address)
+    {
+        SSLEngine sslEngine = createSslContext(address).createSSLEngine();
+        sslEngine.setUseClientMode(true);
+        SSLUtil.updateEnabledTlsProtocols(sslEngine, address.getTlsProtocolWhiteList(), address.getTlsProtocolBlackList());
+        SSLUtil.updateEnabledCipherSuites(sslEngine, address.getTlsCipherSuiteWhiteList(), address.getTlsCipherSuiteBlackList());
+        if(address.getTlsCipherSuiteWhiteList() != null && !address.getTlsCipherSuiteWhiteList().isEmpty())
+        {
+            SSLUtil.useCipherOrderIfPossible(sslEngine);
+        }
+
+        return sslEngine;
+    }
+
+    private static SSLContext createSslContext(RemoteHostAddress<?> address)
+    {
+        KeyStore keyStore = address.getKeyStore();
+        Collection<TrustStore> trustStores = address.getTrustStores();
+
+        try
+        {
+            SSLContext sslContext = SSLUtil.tryGetSSLContext();
+
+            KeyManager[] keyManagers = keyStore.getKeyManagers();
+
+            TrustManager[] trustManagers;
+            if(trustStores == null || trustStores.isEmpty())
+            {
+                trustManagers = null;
+            }
+            else if(trustStores.size() == 1)
+            {
+                trustManagers = trustStores.iterator().next().getTrustManagers();
+            }
+            else
+            {
+                Collection<TrustManager> trustManagerList = new ArrayList<>();
+                final QpidMultipleTrustManager mulTrustManager = new QpidMultipleTrustManager();
+
+                for(TrustStore ts : trustStores)
+                {
+                    TrustManager[] managers = ts.getTrustManagers();
+                    if(managers != null)
+                    {
+                        for(TrustManager manager : managers)
+                        {
+                            if(manager instanceof X509TrustManager)
+                            {
+                                mulTrustManager.addTrustManager((X509TrustManager)manager);
+                            }
+                            else
+                            {
+                                trustManagerList.add(manager);
+                            }
+                        }
+                    }
+                }
+                if(!mulTrustManager.isEmpty())
+                {
+                    trustManagerList.add(mulTrustManager);
+                }
+                trustManagers = trustManagerList.toArray(new TrustManager[trustManagerList.size()]);
+            }
+            sslContext.init(keyManagers, trustManagers, null);
+
+            return sslContext;
+
+        }
+        catch (GeneralSecurityException e)
+        {
+            throw new IllegalArgumentException("Unable to create SSLContext for key or trust store", e);
+        }
+    }
+
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingOutboundConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulableConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulableConnection.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulableConnection.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulableConnection.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.Collection;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.network.Ticker;
+
+public interface SchedulableConnection extends ServerNetworkConnection, ByteBufferSender
+{
+    String getThreadName();
+
+    boolean isPartialRead();
+
+    Ticker getTicker();
+
+    SocketChannel getSocketChannel();
+
+    boolean wantsRead();
+
+    boolean wantsWrite();
+
+    boolean wantsConnect();
+
+    boolean isStateChanged();
+
+    void doPreWork();
+
+    boolean doWork();
+
+    NetworkConnectionScheduler getScheduler();
+
+    boolean setScheduled();
+
+    void clearScheduled();
+
+    SelectorThread.SelectionTask getSelectionTask();
+
+    void setSelectionTask(SelectorThread.SelectionTask selectionTask);
+
+    void processAmqpData(QpidByteBuffer applicationData);
+
+    long writeToTransport(Collection<QpidByteBuffer> bufferArray) throws IOException;
+
+    void reportUnexpectedByteBufferSizeUsage();
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulableConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Fri Oct 21 09:32:07 2016
@@ -69,10 +69,10 @@ class SelectorThread extends Thread
          * Queue of connections that are not currently scheduled and not registered with the selector.
          * These need to go back into the Selector.
          */
-        private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
+        private final Queue<SchedulableConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
 
         /** Set of connections that are currently being selected upon */
-        private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
+        private final Set<SchedulableConnection> _unscheduledConnections = new HashSet<>();
 
 
 
@@ -102,17 +102,17 @@ class SelectorThread extends Thread
             return _selector;
         }
 
-        public Queue<NonBlockingConnection> getUnregisteredConnections()
+        public Queue<SchedulableConnection> getUnregisteredConnections()
         {
             return _unregisteredConnections;
         }
 
-        public Set<NonBlockingConnection> getUnscheduledConnections()
+        public Set<SchedulableConnection> getUnscheduledConnections()
         {
             return _unscheduledConnections;
         }
 
-        private List<NonBlockingConnection> processUnscheduledConnections()
+        private List<SchedulableConnection> processUnscheduledConnections()
         {
             _nextTimeout = Integer.MAX_VALUE;
             if (getUnscheduledConnections().isEmpty())
@@ -120,13 +120,13 @@ class SelectorThread extends Thread
                 return Collections.emptyList();
             }
 
-            List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
+            List<SchedulableConnection> toBeScheduled = new ArrayList<>();
 
             long currentTime = System.currentTimeMillis();
-            Iterator<NonBlockingConnection> iterator = getUnscheduledConnections().iterator();
+            Iterator<SchedulableConnection> iterator = getUnscheduledConnections().iterator();
             while (iterator.hasNext())
             {
-                NonBlockingConnection connection = iterator.next();
+                SchedulableConnection connection = iterator.next();
 
                 int period = connection.getTicker().getTimeToNextTick(currentTime);
 
@@ -153,7 +153,7 @@ class SelectorThread extends Thread
             return toBeScheduled;
         }
 
-        private List<NonBlockingConnection> processSelectionKeys()
+        private List<SchedulableConnection> processSelectionKeys()
         {
             Set<SelectionKey> selectionKeys = _selector.selectedKeys();
             if (selectionKeys.isEmpty())
@@ -161,7 +161,7 @@ class SelectorThread extends Thread
                 return Collections.emptyList();
             }
 
-            List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
+            List<SchedulableConnection> toBeScheduled = new ArrayList<>();
             for (SelectionKey key : selectionKeys)
             {
                 if(key.isAcceptable())
@@ -212,7 +212,7 @@ class SelectorThread extends Thread
                 }
                 else
                 {
-                    NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
+                    SchedulableConnection connection = (SchedulableConnection) key.attachment();
                     if(connection != null)
                     {
                         try
@@ -235,15 +235,15 @@ class SelectorThread extends Thread
             return toBeScheduled;
         }
 
-        private List<NonBlockingConnection> reregisterUnregisteredConnections()
+        private List<SchedulableConnection> reregisterUnregisteredConnections()
         {
             if (getUnregisteredConnections().isEmpty())
             {
                 return Collections.emptyList();
             }
-            List<NonBlockingConnection> unregisterableConnections = new ArrayList<>();
+            List<SchedulableConnection> unregisterableConnections = new ArrayList<>();
 
-            NonBlockingConnection unregisteredConnection;
+            SchedulableConnection unregisteredConnection;
             while ((unregisteredConnection = getUnregisteredConnections().poll()) != null)
             {
                 getUnscheduledConnections().add(unregisteredConnection);
@@ -302,21 +302,21 @@ class SelectorThread extends Thread
                                 {
                                     _inSelect.set(false);
                                 }
-                                for (NonBlockingConnection connection : processSelectionKeys())
+                                for (SchedulableConnection connection : processSelectionKeys())
                                 {
                                     if (connection.setScheduled())
                                     {
                                         connections.add(new ConnectionProcessor(_scheduler, connection));
                                     }
                                 }
-                                for (NonBlockingConnection connection : reregisterUnregisteredConnections())
+                                for (SchedulableConnection connection : reregisterUnregisteredConnections())
                                 {
                                     if (connection.setScheduled())
                                     {
                                         connections.add(new ConnectionProcessor(_scheduler, connection));
                                     }
                                 }
-                                for (NonBlockingConnection connection : processUnscheduledConnections())
+                                for (SchedulableConnection connection : processUnscheduledConnections())
                                 {
                                     if (connection.setScheduled())
                                     {
@@ -474,10 +474,10 @@ class SelectorThread extends Thread
     {
 
         private final NetworkConnectionScheduler _scheduler;
-        private final NonBlockingConnection _connection;
+        private final SchedulableConnection _connection;
         private AtomicBoolean _running = new AtomicBoolean();
 
-        public ConnectionProcessor(final NetworkConnectionScheduler scheduler, final NonBlockingConnection connection)
+        public ConnectionProcessor(final NetworkConnectionScheduler scheduler, final SchedulableConnection connection)
         {
             _scheduler = scheduler;
             _connection = connection;
@@ -506,7 +506,7 @@ class SelectorThread extends Thread
         }
     }
 
-    private void unregisterConnection(final NonBlockingConnection connection) throws ClosedChannelException
+    private void unregisterConnection(final SchedulableConnection connection) throws ClosedChannelException
     {
         SelectionKey register = connection.getSocketChannel().register(connection.getSelectionTask().getSelector(), 0);
         register.cancel();
@@ -521,14 +521,15 @@ class SelectorThread extends Thread
         }
     }
 
-    private boolean selectionInterestRequiresUpdate(NonBlockingConnection connection)
+    private boolean selectionInterestRequiresUpdate(SchedulableConnection connection)
     {
         SelectionTask selectionTask = connection.getSelectionTask();
         if(selectionTask != null)
         {
             final SelectionKey selectionKey = connection.getSocketChannel().keyFor(selectionTask.getSelector());
             int expectedOps = (connection.wantsRead() ? SelectionKey.OP_READ : 0)
-                              | (connection.wantsWrite() ? SelectionKey.OP_WRITE : 0);
+                              | (connection.wantsWrite() ? SelectionKey.OP_WRITE : 0)
+                              | (connection.wantsConnect() ? SelectionKey.OP_CONNECT : 0);
 
             try
             {
@@ -545,7 +546,7 @@ class SelectorThread extends Thread
         }
     }
 
-    public void addConnection(final NonBlockingConnection connection)
+    public void addConnection(final SchedulableConnection connection)
     {
         if(selectionInterestRequiresUpdate(connection))
         {
@@ -557,7 +558,7 @@ class SelectorThread extends Thread
 
     }
 
-    public void returnConnectionToSelector(final NonBlockingConnection connection)
+    public void returnConnectionToSelector(final SchedulableConnection connection)
     {
         if(selectionInterestRequiresUpdate(connection))
         {
@@ -583,7 +584,7 @@ class SelectorThread extends Thread
         return _selectionTasks[index];
     }
 
-    void removeConnection(NonBlockingConnection connection)
+    void removeConnection(SchedulableConnection connection)
     {
         try
         {
@@ -631,7 +632,7 @@ class SelectorThread extends Thread
 
     }
 
-     public void addToWork(final NonBlockingConnection connection)
+     public void addToWork(final SchedulableConnection connection)
      {
          if (_closed.get())
          {

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Fri Oct 21 09:32:07 2016
@@ -29,12 +29,14 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.ConsumerOption;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
@@ -48,12 +50,12 @@ import org.apache.qpid.server.txn.Server
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
 
-public abstract class AbstractSystemMessageSource implements MessageSource
+public abstract class AbstractSystemMessageSource implements MessageSource<AbstractSystemMessageSource.SystemMessageSourceConsumer>
 {
-    protected final UUID _id;
-    protected final String _name;
-    protected final NamedAddressSpace _addressSpace;
-    private List<Consumer> _consumers = new CopyOnWriteArrayList<>();
+    private final UUID _id;
+    private final String _name;
+    private final NamedAddressSpace _addressSpace;
+    private List<SystemMessageSourceConsumer> _consumers = new CopyOnWriteArrayList<>();
 
     public AbstractSystemMessageSource(String name, final NamedAddressSpace addressSpace)
     {
@@ -82,22 +84,22 @@ public abstract class AbstractSystemMess
     }
 
     @Override
-    public Consumer addConsumer(final ConsumerTarget target,
-                                final FilterManager filters,
-                                final Class<? extends ServerMessage> messageClass,
-                                final String consumerName,
-                                final EnumSet<ConsumerImpl.Option> options, final Integer priority)
+    public SystemMessageSourceConsumer addConsumer(final ConsumerTarget target,
+                                                   final FilterManager filters,
+                                                   final Class<? extends ServerMessage> messageClass,
+                                                   final String consumerName,
+                                                   final EnumSet<ConsumerOption> options, final Integer priority)
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
                    ConsumerAccessRefused
     {
-        final Consumer consumer = new Consumer(consumerName, target);
+        final SystemMessageSourceConsumer consumer = new SystemMessageSourceConsumer(consumerName, target);
         target.consumerAdded(consumer);
         _consumers.add(consumer);
         return consumer;
     }
 
     @Override
-    public Collection<Consumer> getConsumers()
+    public Collection<SystemMessageSourceConsumer> getConsumers()
     {
         return new ArrayList<>(_consumers);
     }
@@ -108,19 +110,23 @@ public abstract class AbstractSystemMess
         return true;
     }
 
-    protected class Consumer implements ConsumerImpl
+    public NamedAddressSpace getAddressSpace()
+    {
+        return _addressSpace;
+    }
+
+    protected class SystemMessageSourceConsumer implements MessageInstanceConsumer
     {
 
-        private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement();
         private final List<PropertiesMessageInstance> _queue =
                 Collections.synchronizedList(new ArrayList<PropertiesMessageInstance>());
         private final ConsumerTarget _target;
         private final String _name;
         private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener =
-                new Consumer.TargetChangeListener();
-
+                new SystemMessageSourceConsumer.TargetChangeListener();
+        private final Object _identifier = new Object();
 
-        public Consumer(final String consumerName, ConsumerTarget target)
+        SystemMessageSourceConsumer(final String consumerName, ConsumerTarget target)
         {
             _name = consumerName;
             _target = target;
@@ -133,61 +139,22 @@ public abstract class AbstractSystemMess
 
         }
 
-        @Override
-        public ConsumerTarget getTarget()
-        {
-            return _target;
-        }
-
-        @Override
-        public long getBytesOut()
-        {
-            return 0;
-        }
-
-        @Override
-        public long getMessagesOut()
-        {
-            return 0;
-        }
-
-        @Override
-        public long getUnacknowledgedBytes()
+        public Object getIdentifier()
         {
-            return 0;
+            return _identifier;
         }
 
-        @Override
-        public long getUnacknowledgedMessages()
+        public ConsumerTarget getTarget()
         {
-            return 0;
+            return _target;
         }
 
-        @Override
         public AMQSessionModel getSessionModel()
         {
             return _target.getSessionModel();
         }
 
         @Override
-        public MessageSource getMessageSource()
-        {
-            return AbstractSystemMessageSource.this;
-        }
-
-        @Override
-        public long getConsumerNumber()
-        {
-            return _id;
-        }
-
-        @Override
-        public boolean isSuspended()
-        {
-            return false;
-        }
-
-        @Override
         public boolean isClosed()
         {
             return false;
@@ -200,42 +167,12 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public boolean seesRequeues()
-        {
-            return false;
-        }
-
-        @Override
         public void close()
         {
             _consumers.remove(this);
         }
 
         @Override
-        public boolean trySendLock()
-        {
-            return _target.trySendLock();
-        }
-
-        @Override
-        public void getSendLock()
-        {
-            _target.getSendLock();
-        }
-
-        @Override
-        public void releaseSendLock()
-        {
-            _target.releaseSendLock();
-        }
-
-        @Override
-        public boolean isActive()
-        {
-            return false;
-        }
-
-        @Override
         public String getName()
         {
             return _name;
@@ -322,16 +259,15 @@ public abstract class AbstractSystemMess
 
     }
 
-    class PropertiesMessageInstance implements MessageInstance
+    private class PropertiesMessageInstance implements MessageInstance
     {
-        private final Consumer _consumer;
+        private final SystemMessageSourceConsumer _consumer;
         private int _deliveryCount;
         private boolean _isRedelivered;
-        private boolean _isDelivered;
         private boolean _isDeleted;
         private InternalMessage _message;
 
-        PropertiesMessageInstance(final Consumer consumer, final InternalMessage message)
+        PropertiesMessageInstance(final SystemMessageSourceConsumer consumer, final InternalMessage message)
         {
             _consumer = consumer;
             _message = message;
@@ -340,7 +276,7 @@ public abstract class AbstractSystemMess
         @Override
         public int getDeliveryCount()
         {
-            return 0;
+            return _deliveryCount;
         }
 
         @Override
@@ -375,7 +311,7 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public ConsumerImpl getAcquiringConsumer()
+        public MessageInstanceConsumer getAcquiringConsumer()
         {
             return _consumer;
         }
@@ -387,13 +323,13 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public boolean isAcquiredBy(final ConsumerImpl consumer)
+        public boolean isAcquiredBy(final MessageInstanceConsumer consumer)
         {
             return consumer == _consumer && !isDeleted();
         }
 
         @Override
-        public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+        public boolean removeAcquisitionFromConsumer(final MessageInstanceConsumer consumer)
         {
             return consumer == _consumer;
         }
@@ -411,7 +347,7 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public Consumer getDeliveredConsumer()
+        public SystemMessageSourceConsumer getDeliveredConsumer()
         {
             return isDeleted() ? null : _consumer;
         }
@@ -423,7 +359,7 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public boolean isRejectedBy(final ConsumerImpl consumer)
+        public boolean isRejectedBy(final MessageInstanceConsumer consumer)
         {
             return false;
         }
@@ -431,7 +367,7 @@ public abstract class AbstractSystemMess
         @Override
         public boolean getDeliveredToConsumer()
         {
-            return _isDelivered;
+            return _deliveryCount > 0;
         }
 
         @Override
@@ -441,13 +377,13 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public boolean acquire(final ConsumerImpl sub)
+        public boolean acquire(final MessageInstanceConsumer sub)
         {
             return false;
         }
 
         @Override
-        public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+        public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
         {
             return false;
         }
@@ -465,7 +401,7 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public int routeToAlternate(final Action<? super MessageInstance> action,
+        public int routeToAlternate(final Action<? super BaseMessageInstance> action,
                                     final ServerTransaction txn)
         {
             return 0;
@@ -503,7 +439,7 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public void release(ConsumerImpl consumer)
+        public void release(MessageInstanceConsumer consumer)
         {
             release();
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org