You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/10/21 09:32:09 UTC
svn commit: r1765973 [4/7] - in /qpid/java/branches/transfer-queue:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/
bdbstore/src/test/java/org/apache/qpid/server/stor...
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,713 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.security.Principal;
+import java.security.cert.Certificate;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.util.SystemUtils;
+
+public abstract class NonBlockingConnection implements SchedulableConnection
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
+
+ // Outbound message space reserved so far; incremented by reserveOutboundMessageSpace() and
+ // reset to zero by doWrite() once the delegate reports everything was written.
+ final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
+ private final SocketChannel _socketChannel;
+ // Stack of schedulers; the head is the one returned by getScheduler().
+ private final Deque<NetworkConnectionScheduler> _schedulerDeque = new ConcurrentLinkedDeque<>();
+ private final String _remoteAddressString;
+ // True between a successful setScheduled() and the next clearScheduled().
+ private final AtomicBoolean _scheduled = new AtomicBoolean();
+ private final AtomicLong _maxWriteIdleMillis = new AtomicLong();
+ private final AtomicLong _maxReadIdleMillis = new AtomicLong();
+ private final List<SchedulingDelayNotificationListener>
+ _schedulingDelayNotificationListeners = new CopyOnWriteArrayList<>();
+ // SelectorThread.IO_THREAD_NAME_PREFIX followed by the remote address string.
+ private final String _threadName;
+ // Outgoing buffers queued by send() and drained by doWrite().
+ private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
+ // Not assigned by the constructor; installed later through setDelegate().
+ private NonBlockingConnectionDelegate _delegate;
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+ private volatile SelectorThread.SelectionTask _selectionTask;
+ // Wall-clock millis of the last successful setScheduled(); 0 after clearScheduled().
+ private volatile long _scheduledTime;
+ // Guards shutdown() so that its body runs at most once.
+ private final AtomicBoolean _hasShutdown = new AtomicBoolean();
+ private final AtomicBoolean _unexpectedByteBufferSizeReported = new AtomicBoolean();
+ private final ProtocolEngine _protocolEngine;
+ // Iterator over the engine's pending tasks, kept across doWork() calls until it is exhausted.
+ private volatile Iterator<Runnable> _pendingIterator;
+ // Result of the most recent doWrite(); drives wantsRead()/wantsWrite().
+ private volatile boolean _fullyWritten = true;
+ // Set by readFromNetwork() when the last read returned a non-zero value; cleared at the start of doRead().
+ private volatile boolean _partialRead = false;
+
+ /**
+  * Creates a connection around an existing socket channel and registers a work listener on the
+  * protocol engine that re-schedules this connection whenever the engine reports work.
+  *
+  * @param socketChannel the channel used for all network I/O
+  * @param protocolEngine the engine that consumes received data and produces pending work
+  * @param scheduler the initial scheduler, pushed onto the scheduler stack
+  * @param remoteAddressString textual remote address, used in log messages and in the thread name
+  */
+ protected NonBlockingConnection(final SocketChannel socketChannel, ProtocolEngine protocolEngine,
+                                 final NetworkConnectionScheduler scheduler,
+                                 String remoteAddressString)
+ {
+     _socketChannel = socketChannel;
+     _protocolEngine = protocolEngine;
+     _remoteAddressString = remoteAddressString;
+     _threadName = SelectorThread.IO_THREAD_NAME_PREFIX + remoteAddressString;
+     pushScheduler(scheduler);
+
+     final Action<ProtocolEngine> workListener = new Action<ProtocolEngine>()
+     {
+         @Override
+         public void performAction(final ProtocolEngine engine)
+         {
+             // Look the scheduler up on every notification: the head of the stack may have
+             // changed (or the stack may be empty) since construction.
+             final NetworkConnectionScheduler current = getScheduler();
+             if (current != null)
+             {
+                 current.schedule(NonBlockingConnection.this);
+             }
+         }
+     };
+     protocolEngine.setWorkListener(workListener);
+ }
+
+ /**
+ * Returns the thread name computed at construction time:
+ * {@code SelectorThread.IO_THREAD_NAME_PREFIX} followed by the remote address string.
+ */
+ @Override
+ public final String getThreadName()
+ {
+ return _threadName;
+ }
+
+
+ /**
+ * Returns the remote address string supplied to the constructor; used in log messages.
+ */
+ protected final String getRemoteAddressString()
+ {
+ return _remoteAddressString;
+ }
+
+ /**
+ * Returns {@code true} once the closed flag has been set (see {@link #setClosed()}).
+ */
+ protected final boolean isClosed()
+ {
+ return _closed.get();
+ }
+
+ /**
+  * Sets the closed flag.
+  *
+  * @return {@code true} if this call changed the flag from open to closed, {@code false} if
+  *         the connection had already been marked closed
+  */
+ protected final boolean setClosed()
+ {
+     final boolean wasClosed = _closed.getAndSet(true);
+     return !wasClosed;
+ }
+
+ /**
+ * Returns the socket channel supplied to the constructor.
+ */
+ @Override
+ public final SocketChannel getSocketChannel()
+ {
+ return _socketChannel;
+ }
+
+
+ /**
+  * Returns the remote socket address as reported by the channel's underlying socket.
+  */
+ @Override
+ public final SocketAddress getRemoteAddress()
+ {
+     return _socketChannel.socket().getRemoteSocketAddress();
+ }
+
+ /**
+  * Returns the local socket address as reported by the channel's underlying socket.
+  */
+ @Override
+ public final SocketAddress getLocalAddress()
+ {
+     return _socketChannel.socket().getLocalSocketAddress();
+ }
+
+
+ /**
+  * Makes {@code scheduler} the current scheduler by placing it at the head of the scheduler stack.
+  *
+  * @param scheduler the scheduler to become current
+  */
+ public final void pushScheduler(NetworkConnectionScheduler scheduler)
+ {
+     _schedulerDeque.push(scheduler);
+ }
+
+ /**
+  * Removes and returns the current scheduler (the head of the scheduler stack).
+  *
+  * @return the scheduler that was current
+  * @throws java.util.NoSuchElementException if the stack is empty
+  */
+ public final NetworkConnectionScheduler popScheduler()
+ {
+     return _schedulerDeque.pop();
+ }
+
+ /**
+  * Returns the current scheduler without removing it.
+  *
+  * @return the head of the scheduler stack, or {@code null} if the stack is empty
+  */
+ @Override
+ public final NetworkConnectionScheduler getScheduler()
+ {
+     return _schedulerDeque.peek();
+ }
+
+ /**
+ * Returns the peer principal as reported by the current delegate.
+ */
+ @Override
+ public final Principal getPeerPrincipal()
+ {
+ return _delegate.getPeerPrincipal();
+ }
+
+ /**
+ * Returns the peer certificate as reported by the current delegate.
+ */
+ @Override
+ public final Certificate getPeerCertificate()
+ {
+ return _delegate.getPeerCertificate();
+ }
+
+ /**
+ * Returns the transport information string as reported by the current delegate.
+ */
+ @Override
+ public final String getTransportInfo()
+ {
+ return _delegate.getTransportInfo();
+ }
+
+
+ /**
+ * Returns the selection task last stored by {@link #setSelectionTask}; {@code null} if none
+ * has been stored yet.
+ */
+ @Override
+ public final SelectorThread.SelectionTask getSelectionTask()
+ {
+ return _selectionTask;
+ }
+
+ /**
+ * Stores the selection task for this connection; {@link #close()} wakes this task up.
+ */
+ @Override
+ public final void setSelectionTask(final SelectorThread.SelectionTask selectionTask)
+ {
+ _selectionTask = selectionTask;
+ }
+
+ /**
+  * Attempts to mark this connection as scheduled and, on success, records the current
+  * wall-clock time as the scheduling time.
+  *
+  * @return {@code true} if this call performed the transition to "scheduled"; {@code false}
+  *         if the connection was already scheduled (the recorded time is left untouched)
+  */
+ @Override
+ public final boolean setScheduled()
+ {
+     if (!_scheduled.compareAndSet(false, true))
+     {
+         return false;
+     }
+     _scheduledTime = System.currentTimeMillis();
+     return true;
+ }
+
+ /**
+  * Clears the "scheduled" flag and resets the recorded scheduling time to zero.
+  *
+  * The timestamp is zeroed before the flag is released. {@link #setScheduled()} can only
+  * succeed once the flag is false, so with this ordering a concurrent successful
+  * {@code setScheduled()} always writes its timestamp after the reset. Releasing the flag
+  * first would let that fresh timestamp be overwritten with zero.
+  */
+ @Override
+ public final void clearScheduled()
+ {
+     _scheduledTime = 0;
+     _scheduled.set(false);
+ }
+
+ /**
+ * Returns the time (from {@code System.currentTimeMillis()}) recorded by the last successful
+ * {@link #setScheduled()}, or 0 after {@link #clearScheduled()}.
+ */
+ @Override
+ public final long getScheduledTime()
+ {
+ return _scheduledTime;
+ }
+
+ /**
+ * Stores the maximum write-idle period, in milliseconds.
+ */
+ @Override
+ public final void setMaxWriteIdleMillis(final long millis)
+ {
+ _maxWriteIdleMillis.set(millis);
+ }
+
+ /**
+ * Stores the maximum read-idle period, in milliseconds.
+ */
+ @Override
+ public final void setMaxReadIdleMillis(final long millis)
+ {
+ _maxReadIdleMillis.set(millis);
+ }
+
+ /**
+ * Returns the value last stored by {@link #setMaxReadIdleMillis}; 0 if never set.
+ */
+ @Override
+ public final long getMaxReadIdleMillis()
+ {
+ return _maxReadIdleMillis.get();
+ }
+
+ /**
+ * Returns the value last stored by {@link #setMaxWriteIdleMillis}; 0 if never set.
+ */
+ @Override
+ public final long getMaxWriteIdleMillis()
+ {
+ return _maxWriteIdleMillis.get();
+ }
+ /**
+ * Returns whether the protocol engine currently reports outstanding work.
+ */
+ @Override
+ public final boolean isStateChanged()
+ {
+ return _protocolEngine.hasWork();
+ }
+
+ /**
+ * Returns {@code true} when the most recent write attempt was recorded as fully written
+ * (also the initial state); the exact complement of {@link #wantsWrite()}.
+ */
+ @Override
+ public final boolean wantsRead()
+ {
+ return _fullyWritten;
+ }
+
+ /**
+ * Returns {@code true} when the most recent write attempt left data unwritten; the exact
+ * complement of {@link #wantsRead()}.
+ */
+ @Override
+ public final boolean wantsWrite()
+ {
+ return !_fullyWritten;
+ }
+
+ /**
+ * Returns the partial-read flag: cleared at the start of doRead() and set by
+ * readFromNetwork() when the channel read returned a non-zero value.
+ */
+ @Override
+ public final boolean isPartialRead()
+ {
+ return _partialRead;
+ }
+
+ /**
+ * Performs one pass of work for this connection: clears the engine's work flag, completes an
+ * outstanding connect if required, ticks the ticker when it is due, runs the engine's pending
+ * tasks and then reads from and writes to the network.
+ *
+ * An IOException or ConnectionScopedRuntimeException is logged and marks the connection closed.
+ * If the connection is closed at the end of the pass, shutdown() is invoked.
+ *
+ * @return {@code true} if the connection is closed when the pass finishes
+ */
+ @Override
+ public boolean doWork()
+ {
+ _protocolEngine.clearWork();
+ try
+ {
+
+ // Only proceed when open and either no connect is outstanding or it has just completed.
+ if (!isClosed() && (!wantsConnect() || completeConnection()))
+ {
+ long currentTime = System.currentTimeMillis();
+ int tick = getTicker().getTimeToNextTick(currentTime);
+ if (tick <= 0)
+ {
+ getTicker().tick(currentTime);
+ }
+
+ _protocolEngine.setIOThread(Thread.currentThread());
+ _protocolEngine.setMessageAssignmentSuspended(true, true);
+
+ boolean processPendingComplete = processPending();
+
+ if (processPendingComplete)
+ {
+ // Pending work is done: drop the iterator so the next pass requests a fresh one.
+ _pendingIterator = null;
+ _protocolEngine.setTransportBlockedForWriting(false);
+ boolean dataRead = doRead();
+ _fullyWritten = doWrite();
+ _protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
+
+ // Ask for another pass if output remains, data was processed, or the delegate
+ // needs work and its network input buffer position is non-zero.
+ if (!_fullyWritten || dataRead || (_delegate.needsWork()
+ && _delegate.getNetInputBuffer().position() != 0))
+ {
+ _protocolEngine.notifyWork();
+ }
+
+ if (_fullyWritten)
+ {
+ _protocolEngine.setMessageAssignmentSuspended(false, true);
+ }
+ }
+ else
+ {
+ // Pending tasks were not completed in this pass; request another one.
+ _protocolEngine.notifyWork();
+ }
+ }
+ }
+ catch (IOException |
+ ConnectionScopedRuntimeException e)
+ {
+ // Full stack trace only at debug level; otherwise just the message at info level.
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Exception performing I/O for connection '{}'",
+ getRemoteAddressString(), e);
+ }
+ else
+ {
+ LOGGER.info("Exception performing I/O for connection '{}' : {}",
+ getRemoteAddressString(), e.getMessage());
+ }
+
+ if(setClosed())
+ {
+ _protocolEngine.notifyWork();
+ }
+ }
+ finally
+ {
+ _protocolEngine.setIOThread(null);
+ }
+
+ final boolean closed = isClosed();
+ if (closed)
+ {
+ shutdown();
+ }
+
+ return closed;
+
+ }
+
+ /**
+  * Attempts to finish an outstanding connect on the socket channel.
+  *
+  * @return {@code true} only if the channel reports the connect as finished and
+  *         {@code wantsConnect()} is no longer true
+  * @throws IOException if finishing the connect fails
+  */
+ private boolean completeConnection() throws IOException
+ {
+     if (!getSocketChannel().finishConnect())
+     {
+         return false;
+     }
+     return !wantsConnect();
+ }
+
+
+ /**
+ * Runs the protocol engine's pending tasks, writing to the network whenever the amount of
+ * buffered output reaches the network buffer size.
+ *
+ * The iterator is obtained from the engine only when none is held, so an interrupted pass
+ * resumes where it stopped.
+ *
+ * @return {@code true} if every pending task has been run and, where a final write was
+ * needed, the buffered output dropped below half the network buffer size
+ * @throws IOException if writing to the network fails
+ */
+ private boolean processPending() throws IOException
+ {
+ if(_pendingIterator == null)
+ {
+ _pendingIterator = _protocolEngine.processPendingIterator();
+ }
+
+ final int networkBufferSize = getNetworkBufferSize();
+
+ while(_pendingIterator.hasNext())
+ {
+ long size = getBufferedSize();
+ if(size >= networkBufferSize)
+ {
+ // Output has backed up: write before running further tasks.
+ _fullyWritten = doWrite();
+ long bytesWritten = size - getBufferedSize();
+ // Stop if the write drained less than half a network buffer.
+ if(bytesWritten < (networkBufferSize / 2))
+ {
+ break;
+ }
+ }
+ else
+ {
+ final Runnable task = _pendingIterator.next();
+ task.run();
+ }
+ }
+
+ boolean complete = !_pendingIterator.hasNext();
+ if (getBufferedSize() >= networkBufferSize)
+ {
+ _fullyWritten = doWrite();
+ complete &= getBufferedSize() < networkBufferSize /2;
+ }
+ return complete;
+ }
+
+ /**
+ * Returns the network buffer size; processPending() uses it as the threshold of buffered
+ * output at which a write is attempted.
+ */
+ protected abstract int getNetworkBufferSize();
+
+
+ /**
+  * Reports the scheduling delay (current time minus the recorded scheduling time) to every
+  * registered {@code SchedulingDelayNotificationListener}. Does nothing if the connection is
+  * closed or no listener is registered.
+  */
+ @Override
+ public final void doPreWork()
+ {
+     if (isClosed())
+     {
+         return;
+     }
+
+     final long delayMillis = System.currentTimeMillis() - getScheduledTime();
+     if (_schedulingDelayNotificationListeners.isEmpty())
+     {
+         // Nothing to notify; also avoids creating an iterator.
+         return;
+     }
+
+     for (SchedulingDelayNotificationListener delayListener : _schedulingDelayNotificationListeners)
+     {
+         delayListener.notifySchedulingDelay(delayMillis);
+     }
+ }
+
+
+ /**
+ * doRead is not reentrant.
+ */
+ boolean doRead() throws IOException
+ {
+ _partialRead = false;
+ if(!isClosed() && _delegate.readyForRead())
+ {
+ int readData = readFromNetwork();
+
+ if (readData > 0)
+ {
+ return _delegate.processData();
+ }
+ else
+ {
+ return false;
+ }
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ /**
+  * Writes the given buffers to the socket channel.
+  *
+  * @param buffers the buffers to write
+  * @return the number of bytes reported written by {@code QpidByteBuffer.write}
+  * @throws IOException if the channel write fails
+  */
+ @Override
+ public long writeToTransport(Collection<QpidByteBuffer> buffers) throws IOException
+ {
+     final long byteCount = QpidByteBuffer.write(getSocketChannel(), buffers);
+     if (LOGGER.isDebugEnabled())
+     {
+         LOGGER.debug("Written " + byteCount + " bytes");
+     }
+     return byteCount;
+ }
+
+ protected int readFromNetwork() throws IOException
+ {
+ QpidByteBuffer buffer = _delegate.getNetInputBuffer();
+
+ int read = buffer.read(getSocketChannel());
+ if (read == -1)
+ {
+ setClosed();
+ }
+
+ _partialRead = read != 0;
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Read " + read + " byte(s)");
+ }
+ return read;
+ }
+
+
+
+
+ /**
+ * Registers a listener that doPreWork() notifies with the scheduling delay.
+ */
+ @Override
+ public final void addSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
+ {
+ _schedulingDelayNotificationListeners.add(listener);
+ }
+
+ /**
+ * Unregisters a listener previously added with addSchedulingDelayNotificationListeners.
+ */
+ @Override
+ public final void removeSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
+ {
+ _schedulingDelayNotificationListeners.remove(listener);
+ }
+
+ /**
+ * Returns this connection, which acts as its own {@code ByteBufferSender}.
+ */
+ @Override
+ public final ByteBufferSender getSender()
+ {
+ return this;
+ }
+
+ /**
+ * Always returns {@code true}.
+ */
+ @Override
+ public final boolean isDirectBufferPreferred()
+ {
+ return true;
+ }
+
+ /**
+  * Returns the total number of bytes remaining across all queued outgoing buffers.
+  */
+ private final long getBufferedSize()
+ {
+     long pendingBytes = 0L;
+     // Check for emptiness first so that no iterator is allocated for an empty queue.
+     if (!_buffers.isEmpty())
+     {
+         for (QpidByteBuffer queued : _buffers)
+         {
+             pendingBytes += queued.remaining();
+         }
+     }
+     return pendingBytes;
+ }
+
+ /**
+  * Asks the delegate to write the queued buffers, then removes and disposes every buffer at
+  * the head of the queue that has nothing remaining. When the delegate reports a complete
+  * write, the reserved outbound message space is reset to zero.
+  *
+  * @return the delegate's result: {@code true} if everything was written
+  * @throws IOException if the delegate's write fails
+  */
+ final boolean doWrite() throws IOException
+ {
+     final boolean everythingWritten = _delegate.doWrite(_buffers);
+
+     // Stop at the first buffer that still holds unwritten data.
+     QpidByteBuffer head;
+     while ((head = _buffers.peek()) != null && !head.hasRemaining())
+     {
+         _buffers.poll();
+         head.dispose();
+     }
+
+     if (everythingWritten)
+     {
+         _usedOutboundMessageSpace.set(0);
+     }
+     return everythingWritten;
+ }
+
+ /**
+  * Queues a duplicate of {@code msg} for writing, unless the connection is closed (a warning
+  * is logged instead) or the buffer has nothing remaining. In every case the position of
+  * {@code msg} is advanced to its limit.
+  *
+  * @param msg the buffer whose remaining content should be sent
+  */
+ @Override
+ public final void send(final QpidByteBuffer msg)
+ {
+     if (!isClosed())
+     {
+         if (msg.remaining() > 0)
+         {
+             _buffers.add(msg.duplicate());
+         }
+     }
+     else
+     {
+         LOGGER.warn("Send ignored as the connection is already closed");
+     }
+     msg.position(msg.limit());
+ }
+
+
+ /**
+ * Tears the connection down; the body runs at most once, later calls return immediately.
+ *
+ * Order: shut down input, flush remaining output, tell the protocol engine the connection is
+ * closed, shut down output. Afterwards, even if one of those steps throws, the connection is
+ * removed from its current scheduler (if any) and the socket channel is closed.
+ */
+ protected final void shutdown()
+ {
+ if (_hasShutdown.getAndSet(true))
+ {
+ return;
+ }
+
+ try
+ {
+ shutdownInput();
+ shutdownFinalWrite();
+
+ _protocolEngine.closed();
+
+ shutdownOutput();
+ }
+ finally
+ {
+ try
+ {
+ try
+ {
+ NetworkConnectionScheduler scheduler = getScheduler();
+ if (scheduler != null)
+ {
+ scheduler.removeConnection(this);
+ }
+ }
+ finally
+ {
+ // Close the channel even if removal from the scheduler throws.
+ getSocketChannel().close();
+ }
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception closing socket '{}': {}", getRemoteAddressString(), e.getMessage());
+ }
+
+ // shutdownInput()/shutdownOutput() do nothing on Windows, so the delegate is shut
+ // down here instead, after the channel has been closed.
+ if (SystemUtils.isWindows())
+ {
+ _delegate.shutdownInput();
+ _delegate.shutdownOutput();
+ }
+ }
+ }
+
+ /**
+ * Repeatedly calls doWrite() until it reports that everything has been written. An
+ * IOException ends the attempt and is logged at info level.
+ *
+ * NOTE(review): the loop retries immediately with no pause or retry limit, so it appears to
+ * spin for as long as doWrite() keeps returning false without throwing - confirm against the
+ * delegate's doWrite() contract.
+ */
+ private void shutdownFinalWrite()
+ {
+ try
+ {
+ while(!doWrite())
+ {
+ }
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception performing final write/close for '{}': {}", getRemoteAddressString(), e.getMessage());
+ }
+ }
+
+ /**
+  * On non-Windows systems, shuts down the output side of the socket channel and then the
+  * delegate's output. The delegate is shut down even if the channel call fails; the failure
+  * is logged at info level. Does nothing on Windows.
+  */
+ private void shutdownOutput()
+ {
+     if (SystemUtils.isWindows())
+     {
+         return;
+     }
+
+     try
+     {
+         getSocketChannel().shutdownOutput();
+     }
+     catch (IOException ioe)
+     {
+         LOGGER.info("Exception closing socket '{}': {}", getRemoteAddressString(), ioe.getMessage());
+     }
+     finally
+     {
+         _delegate.shutdownOutput();
+     }
+ }
+
+ /**
+  * On non-Windows systems, shuts down the input side of the socket channel and then the
+  * delegate's input. The delegate is shut down even if the channel call fails; the failure
+  * is logged at info level. Does nothing on Windows.
+  */
+ private void shutdownInput()
+ {
+     if (SystemUtils.isWindows())
+     {
+         return;
+     }
+
+     try
+     {
+         getSocketChannel().shutdownInput();
+     }
+     catch (IOException ioe)
+     {
+         LOGGER.info("Exception shutting down input for '{}': {}", getRemoteAddressString(), ioe.getMessage());
+     }
+     finally
+     {
+         _delegate.shutdownInput();
+     }
+ }
+
+
+ /**
+ * Intentionally empty: this implementation performs no work on start.
+ */
+ @Override
+ public final void start()
+ {
+ }
+
+ /**
+ * Intentionally empty: this implementation performs no work on flush.
+ */
+ @Override
+ public final void flush()
+ {
+ }
+
+
+ /**
+  * Requests that the connection be closed. The first call marks the connection closed,
+  * notifies the protocol engine that there is work to do and wakes up the selection task;
+  * subsequent calls only log.
+  *
+  * The connection may be closed before a selection task has been assigned, in which case
+  * there is nothing to wake up and the wake-up is skipped.
+  */
+ @Override
+ public final void close()
+ {
+     LOGGER.debug("Closing {}", getRemoteAddressString());
+     if (setClosed())
+     {
+         _protocolEngine.notifyWork();
+
+         // Read the volatile field once so the null check and the call use the same task.
+         final SelectorThread.SelectionTask selectionTask = getSelectionTask();
+         if (selectionTask != null)
+         {
+             selectionTask.wakeup();
+         }
+     }
+ }
+
+
+ /**
+ * Passes the given application data to the protocol engine.
+ */
+ @Override
+ public void processAmqpData(QpidByteBuffer applicationData)
+ {
+ _protocolEngine.received(applicationData);
+ }
+
+
+ /**
+  * Adds {@code size} to the reserved outbound message space and suspends message assignment
+  * on the protocol engine once the total exceeds the outbound message buffer limit.
+  *
+  * @param size the amount of space to reserve
+  */
+ @Override
+ public final void reserveOutboundMessageSpace(long size)
+ {
+     final long reservedTotal = _usedOutboundMessageSpace.addAndGet(size);
+     if (reservedTotal > getOutboundMessageBufferLimit())
+     {
+         _protocolEngine.setMessageAssignmentSuspended(true, false);
+     }
+ }
+
+
+ /**
+ * Returns the protocol engine's aggregate ticker.
+ */
+ @Override
+ public final Ticker getTicker()
+ {
+ return _protocolEngine.getAggregateTicker();
+ }
+
+
+ /**
+ * Returns the limit on reserved outbound message space; reserveOutboundMessageSpace()
+ * suspends message assignment once the reserved total exceeds it.
+ */
+ protected abstract long getOutboundMessageBufferLimit();
+
+
+ /**
+  * Logs, at most once per connection, that a frame did not fit into the default byte buffer
+  * size. Calls after the first are ignored.
+  */
+ public final void reportUnexpectedByteBufferSizeUsage()
+ {
+     if (!_unexpectedByteBufferSizeReported.compareAndSet(false, true))
+     {
+         return;
+     }
+     LOGGER.info("At least one frame unexpectedly does not fit into default byte buffer size ({}B) on a connection {}.",
+                 getNetworkBufferSize(), toString());
+ }
+
+ /**
+ * Returns the delegate last installed with {@link #setDelegate}; {@code null} if none has
+ * been installed.
+ */
+ protected NonBlockingConnectionDelegate getDelegate()
+ {
+ return _delegate;
+ }
+
+ /**
+ * Installs the delegate to which reads, writes, shutdown and peer/transport queries are
+ * forwarded.
+ */
+ protected void setDelegate(NonBlockingConnectionDelegate delegate)
+ {
+ _delegate = delegate;
+ }
+
+ /**
+ * Returns the protocol engine supplied to the constructor.
+ */
+ public final ProtocolEngine getProtocolEngine()
+ {
+ return _protocolEngine;
+ }
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java Fri Oct 21 09:32:07 2016
@@ -24,7 +24,6 @@ import java.security.Principal;
import java.security.cert.Certificate;
import java.util.Collection;
-import org.apache.qpid.server.model.port.AmqpPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,14 +33,14 @@ public class NonBlockingConnectionPlainD
{
private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnectionPlainDelegate.class);
- private final NonBlockingConnection _parent;
+ private final SchedulableConnection _parent;
private final int _networkBufferSize;
private volatile QpidByteBuffer _netInputBuffer;
- public NonBlockingConnectionPlainDelegate(NonBlockingConnection parent, AmqpPort<?> port)
+ public NonBlockingConnectionPlainDelegate(SchedulableConnection parent, int networkBufferSize)
{
_parent = parent;
- _networkBufferSize = port.getNetworkBufferSize();
+ _networkBufferSize = networkBufferSize;
_netInputBuffer = QpidByteBuffer.allocateDirect(_networkBufferSize);
}
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java Fri Oct 21 09:32:07 2016
@@ -19,19 +19,6 @@
package org.apache.qpid.server.transport;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLException;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSession;
-
import java.io.IOException;
import java.security.Principal;
import java.security.cert.Certificate;
@@ -41,12 +28,24 @@ import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
+
public class NonBlockingConnectionTLSDelegate implements NonBlockingConnectionDelegate
{
private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnectionTLSDelegate.class);
private final SSLEngine _sslEngine;
- private final NonBlockingConnection _parent;
+ private final SchedulableConnection _parent;
private final int _networkBufferSize;
private SSLEngineResult _status;
private final List<QpidByteBuffer> _encryptedOutput = new ArrayList<>();
@@ -58,11 +57,11 @@ public class NonBlockingConnectionTLSDel
private QpidByteBuffer _applicationBuffer;
- public NonBlockingConnectionTLSDelegate(NonBlockingConnection parent, AmqpPort port)
+ public NonBlockingConnectionTLSDelegate(SchedulableConnection parent, int networkBufferSize, SSLEngine sslEngine)
{
_parent = parent;
- _sslEngine = createSSLEngine(port);
- _networkBufferSize = port.getNetworkBufferSize();
+ _sslEngine = sslEngine;
+ _networkBufferSize = networkBufferSize;
final int tlsPacketBufferSize = _sslEngine.getSession().getPacketBufferSize();
if (tlsPacketBufferSize > _networkBufferSize)
@@ -313,28 +312,6 @@ public class NonBlockingConnectionTLSDel
}
}
- private SSLEngine createSSLEngine(AmqpPort<?> port)
- {
- SSLEngine sslEngine = port.getSSLContext().createSSLEngine();
- sslEngine.setUseClientMode(false);
- SSLUtil.updateEnabledTlsProtocols(sslEngine, port.getTlsProtocolWhiteList(), port.getTlsProtocolBlackList());
- SSLUtil.updateEnabledCipherSuites(sslEngine, port.getTlsCipherSuiteWhiteList(), port.getTlsCipherSuiteBlackList());
- if(port.getTlsCipherSuiteWhiteList() != null && !port.getTlsCipherSuiteWhiteList().isEmpty())
- {
- SSLUtil.useCipherOrderIfPossible(sslEngine);
- }
-
- if(port.getNeedClientAuth())
- {
- sslEngine.setNeedClientAuth(true);
- }
- else if(port.getWantClientAuth())
- {
- sslEngine.setWantClientAuth(true);
- }
- return sslEngine;
- }
-
@Override
public QpidByteBuffer getNetInputBuffer()
{
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java Fri Oct 21 09:32:07 2016
@@ -19,22 +19,22 @@
package org.apache.qpid.server.transport;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.transport.network.TransportEncryption;
-
import java.io.IOException;
import java.security.Principal;
import java.security.cert.Certificate;
import java.util.Collection;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.transport.network.TransportEncryption;
+
public class NonBlockingConnectionUndecidedDelegate implements NonBlockingConnectionDelegate
{
private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
- public final NonBlockingConnection _parent;
+ public final NonBlockingInboundConnection _parent;
private QpidByteBuffer _netInputBuffer;
- public NonBlockingConnectionUndecidedDelegate(NonBlockingConnection parent)
+ public NonBlockingConnectionUndecidedDelegate(NonBlockingInboundConnection parent)
{
_parent = parent;
_netInputBuffer = QpidByteBuffer.allocateDirect(NUMBER_OF_BYTES_FOR_TLS_CHECK);
Copied: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingInboundConnection.java (from r1761440, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingInboundConnection.java?p2=qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingInboundConnection.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java&r1=1761440&r2=1765973&rev=1765973&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingInboundConnection.java Fri Oct 21 09:32:07 2016
@@ -20,94 +20,44 @@
*/
package org.apache.qpid.server.transport;
-import java.io.IOException;
-import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
-import java.security.Principal;
-import java.security.cert.Certificate;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.List;
import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+
+import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.TransportEncryption;
-import org.apache.qpid.util.SystemUtils;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
-public class NonBlockingConnection implements ServerNetworkConnection, ByteBufferSender
+public class NonBlockingInboundConnection extends NonBlockingConnection
{
- private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingInboundConnection.class);
- private final SocketChannel _socketChannel;
- private NonBlockingConnectionDelegate _delegate;
- private final Deque<NetworkConnectionScheduler> _schedulerDeque = new ConcurrentLinkedDeque<>();
- private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
-
- private final String _remoteSocketAddress;
- private final AtomicBoolean _closed = new AtomicBoolean(false);
- private final ProtocolEngine _protocolEngine;
private final Runnable _onTransportEncryptionAction;
- private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
private final long _outboundMessageBufferLimit;
- private volatile boolean _fullyWritten = true;
-
- private boolean _partialRead = false;
private final AmqpPort _port;
- private final AtomicBoolean _scheduled = new AtomicBoolean();
- private volatile long _scheduledTime;
- private volatile boolean _unexpectedByteBufferSizeReported;
- private final String _threadName;
- private volatile SelectorThread.SelectionTask _selectionTask;
- private Iterator<Runnable> _pendingIterator;
- private final AtomicLong _maxWriteIdleMillis = new AtomicLong();
- private final AtomicLong _maxReadIdleMillis = new AtomicLong();
- private final List<SchedulingDelayNotificationListener> _schedulingDelayNotificationListeners = new CopyOnWriteArrayList<>();
- private final AtomicBoolean _hasShutdown = new AtomicBoolean();
-
- public NonBlockingConnection(SocketChannel socketChannel,
- ProtocolEngine protocolEngine,
- final Set<TransportEncryption> encryptionSet,
- final Runnable onTransportEncryptionAction,
- final NetworkConnectionScheduler scheduler,
- final AmqpPort port)
- {
- _socketChannel = socketChannel;
- pushScheduler(scheduler);
- _protocolEngine = protocolEngine;
+ public NonBlockingInboundConnection(SocketChannel socketChannel,
+ ProtocolEngine protocolEngine,
+ final Set<TransportEncryption> encryptionSet,
+ final Runnable onTransportEncryptionAction,
+ final NetworkConnectionScheduler scheduler,
+ final AmqpPort port)
+ {
+ super(socketChannel, protocolEngine, scheduler, socketChannel.socket().getRemoteSocketAddress().toString());
_onTransportEncryptionAction = onTransportEncryptionAction;
- _remoteSocketAddress = _socketChannel.socket().getRemoteSocketAddress().toString();
_port = port;
- _threadName = SelectorThread.IO_THREAD_NAME_PREFIX + _remoteSocketAddress.toString();
_outboundMessageBufferLimit = (long) _port.getContextValue(Long.class,
AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
- protocolEngine.setWorkListener(new Action<ProtocolEngine>()
- {
- @Override
- public void performAction(final ProtocolEngine object)
- {
- getScheduler().schedule(NonBlockingConnection.this);
- }
- });
if(encryptionSet.size() == 1)
{
@@ -115,518 +65,71 @@ public class NonBlockingConnection imple
}
else
{
- _delegate = new NonBlockingConnectionUndecidedDelegate(this);
- }
-
- }
-
- String getThreadName()
- {
- return _threadName;
- }
-
- public boolean isPartialRead()
- {
- return _partialRead;
- }
-
- Ticker getTicker()
- {
- return _protocolEngine.getAggregateTicker();
- }
-
- SocketChannel getSocketChannel()
- {
- return _socketChannel;
- }
-
- @Override
- public void start()
- {
- }
-
- @Override
- public ByteBufferSender getSender()
- {
- return this;
- }
-
- @Override
- public void close()
- {
- LOGGER.debug("Closing " + _remoteSocketAddress);
- if(_closed.compareAndSet(false,true))
- {
- _protocolEngine.notifyWork();
- _selectionTask.wakeup();
- }
- }
-
- @Override
- public SocketAddress getRemoteAddress()
- {
- return _socketChannel.socket().getRemoteSocketAddress();
- }
-
- @Override
- public SocketAddress getLocalAddress()
- {
- return _socketChannel.socket().getLocalSocketAddress();
- }
-
- @Override
- public void setMaxWriteIdleMillis(final long millis)
- {
- _maxWriteIdleMillis.set(millis);
- }
-
- @Override
- public void setMaxReadIdleMillis(final long millis)
- {
- _maxReadIdleMillis.set(millis);
- }
-
- @Override
- public Principal getPeerPrincipal()
- {
- return _delegate.getPeerPrincipal();
- }
-
- @Override
- public Certificate getPeerCertificate()
- {
- return _delegate.getPeerCertificate();
- }
-
- @Override
- public long getMaxReadIdleMillis()
- {
- return _maxReadIdleMillis.get();
- }
-
- @Override
- public long getMaxWriteIdleMillis()
- {
- return _maxWriteIdleMillis.get();
- }
-
- @Override
- public void reserveOutboundMessageSpace(long size)
- {
- if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
- {
- _protocolEngine.setMessageAssignmentSuspended(true, false);
- }
- }
-
- @Override
- public String getTransportInfo()
- {
- return _delegate.getTransportInfo();
- }
-
- boolean wantsRead()
- {
- return _fullyWritten;
- }
-
- boolean wantsWrite()
- {
- return !_fullyWritten;
- }
-
- public boolean isStateChanged()
- {
- return _protocolEngine.hasWork();
- }
-
- public void doPreWork()
- {
- if (!_closed.get())
- {
- long currentTime = System.currentTimeMillis();
- long schedulingDelay = currentTime - getScheduledTime();
- if (!_schedulingDelayNotificationListeners.isEmpty())
- {
- for (SchedulingDelayNotificationListener listener : _schedulingDelayNotificationListeners)
- {
- listener.notifySchedulingDelay(schedulingDelay);
- }
- }
- }
- }
-
- public boolean doWork()
- {
- _protocolEngine.clearWork();
- if (!_closed.get())
- {
- try
- {
- long currentTime = System.currentTimeMillis();
- int tick = getTicker().getTimeToNextTick(currentTime);
- if (tick <= 0)
- {
- getTicker().tick(currentTime);
- }
-
- _protocolEngine.setIOThread(Thread.currentThread());
- _protocolEngine.setMessageAssignmentSuspended(true, true);
-
- boolean processPendingComplete = processPending();
-
- if(processPendingComplete)
- {
- _pendingIterator = null;
- _protocolEngine.setTransportBlockedForWriting(false);
- boolean dataRead = doRead();
- _protocolEngine.setTransportBlockedForWriting(!doWrite());
-
- if (!_fullyWritten || dataRead || (_delegate.needsWork() && _delegate.getNetInputBuffer().position() != 0))
- {
- _protocolEngine.notifyWork();
- }
-
- if (_fullyWritten)
- {
- _protocolEngine.setMessageAssignmentSuspended(false, true);
- }
- }
- else
- {
- _protocolEngine.notifyWork();
- }
-
- }
- catch (IOException |
- ConnectionScopedRuntimeException e)
- {
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Exception performing I/O for connection '{}'",
- _remoteSocketAddress, e);
- }
- else
- {
- LOGGER.info("Exception performing I/O for connection '{}' : {}",
- _remoteSocketAddress, e.getMessage());
- }
-
- if(_closed.compareAndSet(false,true))
- {
- _protocolEngine.notifyWork();
- }
- }
- finally
- {
- _protocolEngine.setIOThread(null);
- }
- }
-
- final boolean closed = _closed.get();
- if (closed)
- {
- shutdown();
- }
-
- return closed;
-
- }
-
- @Override
- public void addSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
- {
- _schedulingDelayNotificationListeners.add(listener);
- }
-
- @Override
- public void removeSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
- {
- _schedulingDelayNotificationListeners.remove(listener);
- }
-
- private boolean processPending() throws IOException
- {
- if(_pendingIterator == null)
- {
- _pendingIterator = _protocolEngine.processPendingIterator();
- }
-
- final int networkBufferSize = _port.getNetworkBufferSize();
-
- while(_pendingIterator.hasNext())
- {
- long size = getBufferedSize();
- if(size >= networkBufferSize)
- {
- doWrite();
- long bytesWritten = size - getBufferedSize();
- if(bytesWritten < (networkBufferSize / 2))
- {
- break;
- }
- }
- else
- {
- final Runnable task = _pendingIterator.next();
- task.run();
- }
- }
-
- boolean complete = !_pendingIterator.hasNext();
- if (getBufferedSize() >= networkBufferSize)
- {
- doWrite();
- complete &= getBufferedSize() < networkBufferSize /2;
- }
- return complete;
- }
-
- private long getBufferedSize()
- {
- long totalSize = 0l;
- for(QpidByteBuffer buf : _buffers)
- {
- totalSize += buf.remaining();
- }
- return totalSize;
- }
-
- private void shutdown()
- {
- if (_hasShutdown.getAndSet(true))
- {
- return;
- }
-
- try
- {
- shutdownInput();
- shutdownFinalWrite();
- _protocolEngine.closed();
- shutdownOutput();
- }
- finally
- {
- try
- {
- try
- {
- NetworkConnectionScheduler scheduler = getScheduler();
- if (scheduler != null)
- {
- scheduler.removeConnection(this);
- }
- }
- finally
- {
- _socketChannel.close();
- }
- }
- catch (IOException e)
- {
- LOGGER.info("Exception closing socket '{}': {}", _remoteSocketAddress, e.getMessage());
- }
-
- if (SystemUtils.isWindows())
- {
- _delegate.shutdownInput();
- _delegate.shutdownOutput();
- }
- }
- }
-
- private void shutdownFinalWrite()
- {
- try
- {
- while(!doWrite())
- {
- }
- }
- catch (IOException e)
- {
- LOGGER.info("Exception performing final write/close for '{}': {}", _remoteSocketAddress, e.getMessage());
- }
- }
-
- private void shutdownOutput()
- {
- if(!SystemUtils.isWindows())
- {
- try
- {
- _socketChannel.shutdownOutput();
- }
- catch (IOException e)
- {
- LOGGER.info("Exception closing socket '{}': {}", _remoteSocketAddress, e.getMessage());
- }
- finally
- {
- _delegate.shutdownOutput();
- }
- }
- }
-
- private void shutdownInput()
- {
-
- if(!SystemUtils.isWindows())
- {
- try
- {
- _socketChannel.shutdownInput();
- }
- catch (IOException e)
- {
- LOGGER.info("Exception shutting down input for '{}': {}", _remoteSocketAddress, e.getMessage());
- }
- finally
- {
- _delegate.shutdownInput();
- }
- }
- }
-
- /**
- * doRead is not reentrant.
- */
- boolean doRead() throws IOException
- {
- _partialRead = false;
- if(!_closed.get() && _delegate.readyForRead())
- {
- int readData = readFromNetwork();
-
- if (readData > 0)
- {
- return _delegate.processData();
- }
- else
- {
- return false;
- }
- }
- else
- {
- return false;
+ setDelegate(new NonBlockingConnectionUndecidedDelegate(this));
}
- }
- long writeToTransport(Collection<QpidByteBuffer> buffers) throws IOException
- {
- long written = QpidByteBuffer.write(_socketChannel, buffers);
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Written " + written + " bytes");
- }
- return written;
}
- private boolean doWrite() throws IOException
+ static SSLEngine createSSLEngine(AmqpPort<?> port)
{
- _fullyWritten = _delegate.doWrite(_buffers);
- while(!_buffers.isEmpty())
+ SSLEngine sslEngine = port.getSSLContext().createSSLEngine();
+ sslEngine.setUseClientMode(false);
+ SSLUtil.updateEnabledTlsProtocols(sslEngine, port.getTlsProtocolWhiteList(), port.getTlsProtocolBlackList());
+ SSLUtil.updateEnabledCipherSuites(sslEngine, port.getTlsCipherSuiteWhiteList(), port.getTlsCipherSuiteBlackList());
+ if(port.getTlsCipherSuiteWhiteList() != null && !port.getTlsCipherSuiteWhiteList().isEmpty())
{
- QpidByteBuffer buf = _buffers.peek();
- if(buf.hasRemaining())
- {
- break;
- }
- _buffers.poll();
- buf.dispose();
+ SSLUtil.useCipherOrderIfPossible(sslEngine);
}
- if (_fullyWritten)
- {
- _usedOutboundMessageSpace.set(0);
- }
- return _fullyWritten;
-
- }
-
- protected int readFromNetwork() throws IOException
- {
- QpidByteBuffer buffer = _delegate.getNetInputBuffer();
- int read = buffer.read(_socketChannel);
- if (read == -1)
+ if(port.getNeedClientAuth())
{
- _closed.set(true);
+ sslEngine.setNeedClientAuth(true);
}
-
- _partialRead = read != 0;
-
- if (LOGGER.isDebugEnabled())
+ else if(port.getWantClientAuth())
{
- LOGGER.debug("Read " + read + " byte(s)");
+ sslEngine.setWantClientAuth(true);
}
- return read;
+ return sslEngine;
}
@Override
- public boolean isDirectBufferPreferred()
+ protected long getOutboundMessageBufferLimit()
{
- return true;
+ return _outboundMessageBufferLimit;
}
@Override
- public void send(final QpidByteBuffer msg)
+ protected int getNetworkBufferSize()
{
-
- if (_closed.get())
- {
- LOGGER.warn("Send ignored as the connection is already closed");
- }
- else if (msg.remaining() > 0)
- {
- _buffers.add(msg.duplicate());
- }
- msg.position(msg.limit());
+ return _port.getNetworkBufferSize();
}
@Override
- public void flush()
- {
- }
-
- public final void pushScheduler(NetworkConnectionScheduler scheduler)
- {
- _schedulerDeque.addFirst(scheduler);
- }
-
- public final NetworkConnectionScheduler popScheduler()
+ public boolean wantsConnect()
{
- return _schedulerDeque.removeFirst();
+ return false;
}
- public final NetworkConnectionScheduler getScheduler()
- {
- return _schedulerDeque.peekFirst();
- }
@Override
public String toString()
{
- return "[NonBlockingConnection " + _remoteSocketAddress + "]";
- }
-
- public void processAmqpData(QpidByteBuffer applicationData)
- {
- _protocolEngine.received(applicationData);
+ return "[InboundConnection " + getRemoteAddressString() + "]";
}
public void setTransportEncryption(TransportEncryption transportEncryption)
{
- NonBlockingConnectionDelegate oldDelegate = _delegate;
+ NonBlockingConnectionDelegate oldDelegate = getDelegate();
switch (transportEncryption)
{
case TLS:
_onTransportEncryptionAction.run();
- _delegate = new NonBlockingConnectionTLSDelegate(this, _port);
+ setDelegate(new NonBlockingConnectionTLSDelegate(this,
+ _port.getNetworkBufferSize(),
+ createSSLEngine(_port)));
break;
case NONE:
- _delegate = new NonBlockingConnectionPlainDelegate(this, _port);
+ setDelegate(new NonBlockingConnectionPlainDelegate(this, _port.getNetworkBufferSize()));
break;
default:
throw new IllegalArgumentException("unknown TransportEncryption " + transportEncryption);
@@ -635,51 +138,11 @@ public class NonBlockingConnection imple
{
QpidByteBuffer src = oldDelegate.getNetInputBuffer().duplicate();
src.flip();
- _delegate.getNetInputBuffer().put(src);
+ getDelegate().getNetInputBuffer().put(src);
src.dispose();
}
LOGGER.debug("Identified transport encryption as " + transportEncryption);
}
- public boolean setScheduled()
- {
- final boolean scheduled = _scheduled.compareAndSet(false, true);
- if (scheduled)
- {
- _scheduledTime = System.currentTimeMillis();
- }
- return scheduled;
- }
-
- public void clearScheduled()
- {
- _scheduled.set(false);
- _scheduledTime = 0;
- }
-
- @Override
- public long getScheduledTime()
- {
- return _scheduledTime;
- }
- void reportUnexpectedByteBufferSizeUsage()
- {
- if (!_unexpectedByteBufferSizeReported)
- {
- LOGGER.info("At least one frame unexpectedly does not fit into default byte buffer size ({}B) on a connection {}.",
- _port.getNetworkBufferSize(), this.toString());
- _unexpectedByteBufferSizeReported = true;
- }
- }
-
- public SelectorThread.SelectionTask getSelectionTask()
- {
- return _selectionTask;
- }
-
- public void setSelectionTask(final SelectorThread.SelectionTask selectionTask)
- {
- _selectionTask = selectionTask;
- }
}
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Fri Oct 21 09:32:07 2016
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.transport;
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
@@ -30,16 +32,14 @@ import java.nio.channels.SocketChannel;
import java.util.EnumSet;
import java.util.Set;
-import org.apache.qpid.server.model.port.AmqpPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.TransportEncryption;
-import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
-
public class NonBlockingNetworkTransport
{
@@ -156,10 +156,10 @@ public class NonBlockingNetworkTransport
NonBlockingConnection connection =
- new NonBlockingConnection(socketChannel,
- engine,
- _encryptionSet,
- new Runnable()
+ new NonBlockingInboundConnection(socketChannel,
+ engine,
+ _encryptionSet,
+ new Runnable()
{
@Override
@@ -168,8 +168,8 @@ public class NonBlockingNetworkTransport
engine.encryptedTransport();
}
},
- _scheduler,
- _port);
+ _scheduler,
+ _port);
engine.setNetworkConnection(connection);
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingOutboundConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingOutboundConnection.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingOutboundConnection.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingOutboundConnection.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,198 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.nio.channels.SocketChannel;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.federation.OutboundProtocolEngine;
+import org.apache.qpid.server.model.Container;
+import org.apache.qpid.server.model.KeyStore;
+import org.apache.qpid.server.model.RemoteHostAddress;
+import org.apache.qpid.server.model.TrustStore;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.plugin.OutboundProtocolEngineCreator;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
+import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+
+/**
+ * Non-blocking connection that this broker opens towards a remote host.
+ * <p>
+ * The protocol engine is picked from the available {@link OutboundProtocolEngineCreator}
+ * services by the protocol of the {@link RemoteHostAddress}; the transport delegate (plain or
+ * TLS) is picked from the transport of that address.
+ */
+public class NonBlockingOutboundConnection extends NonBlockingConnection
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingOutboundConnection.class);
+
+    private final AbstractVirtualHost<?> _virtualHost;
+    private final RemoteHostAddress<?> _address;
+    private final int _networkBufferSize;
+    private final long _outboundMessageBufferLimit;
+    /** Latched to {@code true} the first time the socket channel is observed to be connected. */
+    private volatile boolean _connected;
+
+    /**
+     * Creates the connection and installs the transport delegate matching the address' transport.
+     *
+     * @param socketChannel              channel handed to the superclass for this connection
+     * @param address                    remote host address; supplies protocol, transport and TLS settings
+     * @param networkConnectionScheduler scheduler handed to the superclass
+     * @param virtualHost                virtual host from which the buffer sizes are read
+     * @param onConnectionLoss           registered on the protocol engine as its on-closed task
+     * @throws IllegalArgumentException if the protocol or the transport of the address is not supported
+     */
+    public NonBlockingOutboundConnection(SocketChannel socketChannel,
+                                         final RemoteHostAddress<?> address,
+                                         final NetworkConnectionScheduler networkConnectionScheduler,
+                                         final AbstractVirtualHost<?> virtualHost,
+                                         final Action<Boolean> onConnectionLoss)
+    {
+        super(socketChannel, createProtocolEngine(address, virtualHost), networkConnectionScheduler, address.getAddress()+":"+address.getPort());
+        OutboundProtocolEngine protocolEngine = (OutboundProtocolEngine) getProtocolEngine();
+        protocolEngine.setConnection(this);
+        protocolEngine.setOnClosedTask(onConnectionLoss);
+        _virtualHost = virtualHost;
+        _address = address;
+        _networkBufferSize = virtualHost.getAncestor(Container.class).getNetworkBufferSize();
+        _outboundMessageBufferLimit = (long) virtualHost.getContextValue(Long.class,
+                                                                         AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
+
+        final NonBlockingConnectionDelegate delegate;
+        switch(address.getTransport())
+        {
+            case TCP:
+                delegate = new NonBlockingConnectionPlainDelegate(this, getNetworkBufferSize());
+                break;
+            case SSL:
+                delegate = new NonBlockingConnectionTLSDelegate(this, getNetworkBufferSize(), createSSLEngine(address));
+                break;
+            default:
+                throw new IllegalArgumentException("Transport '"+address.getTransport()+"' is not supported");
+        }
+        setDelegate(delegate);
+    }
+
+    /**
+     * Looks up the protocol engine creator whose version equals the protocol of the address and
+     * lets it create the engine.
+     *
+     * @param address     remote host address whose protocol selects the engine creator
+     * @param virtualHost virtual host passed to the engine creator
+     * @return the newly created protocol engine
+     * @throws IllegalArgumentException if no engine creator matches the protocol of the address
+     */
+    private static ProtocolEngine createProtocolEngine(final RemoteHostAddress<?> address, final AbstractVirtualHost<?> virtualHost)
+    {
+        for(OutboundProtocolEngineCreator engineCreator : (new QpidServiceLoader()).instancesOf(OutboundProtocolEngineCreator.class))
+        {
+            if(engineCreator.getVersion().equals(address.getProtocol()))
+            {
+                return engineCreator.newProtocolEngine(address, virtualHost);
+            }
+        }
+
+        // Returning null here would only surface as a NullPointerException while the connection
+        // is being constructed; fail with a descriptive error instead.
+        throw new IllegalArgumentException("Protocol '" + address.getProtocol() + "' is not supported");
+    }
+
+    /**
+     * @return the outbound message buffer limit read from the virtual host context at construction
+     */
+    @Override
+    protected long getOutboundMessageBufferLimit()
+    {
+        return _outboundMessageBufferLimit;
+    }
+
+    /**
+     * @return the network buffer size read from the virtual host's container at construction
+     */
+    @Override
+    protected int getNetworkBufferSize()
+    {
+        return _networkBufferSize;
+    }
+
+    /**
+     * Reports whether the socket channel has not yet been seen connected.
+     * <p>
+     * Once the channel has been observed as connected the result is latched in
+     * {@code _connected} and the channel is not queried again.
+     *
+     * @return {@code true} while the channel has not been observed as connected
+     */
+    @Override
+    public boolean wantsConnect()
+    {
+        boolean connected = _connected;
+        if(!connected)
+        {
+            connected = getSocketChannel().isConnected();
+            _connected = connected;
+        }
+        return !connected;
+    }
+
+    /**
+     * Creates a client-mode {@link SSLEngine} configured with the TLS protocol and cipher suite
+     * white/black lists of the address.
+     *
+     * @param address remote host address supplying key store, trust stores and TLS settings
+     * @return the configured engine
+     * @throws IllegalArgumentException if the SSL context cannot be created
+     */
+    static SSLEngine createSSLEngine(RemoteHostAddress<?> address)
+    {
+        // NOTE(review): the engine is created without peer host/port, so no peer information is
+        // handed to it - consider SSLContext.createSSLEngine(host, port); confirm the types of
+        // address.getAddress()/getPort() first.
+        SSLEngine sslEngine = createSslContext(address).createSSLEngine();
+        sslEngine.setUseClientMode(true);
+        SSLUtil.updateEnabledTlsProtocols(sslEngine, address.getTlsProtocolWhiteList(), address.getTlsProtocolBlackList());
+        SSLUtil.updateEnabledCipherSuites(sslEngine, address.getTlsCipherSuiteWhiteList(), address.getTlsCipherSuiteBlackList());
+        if(address.getTlsCipherSuiteWhiteList() != null && !address.getTlsCipherSuiteWhiteList().isEmpty())
+        {
+            SSLUtil.useCipherOrderIfPossible(sslEngine);
+        }
+
+        return sslEngine;
+    }
+
+    /**
+     * Builds an {@link SSLContext} from the key store and trust stores of the address.
+     * <p>
+     * With no trust store, {@code null} trust managers are passed to the context; with exactly
+     * one, its trust managers are used as they are; with several, all X.509 trust managers are
+     * combined into one {@link QpidMultipleTrustManager} and any other trust managers are kept
+     * alongside it.
+     *
+     * @param address remote host address supplying the key store and trust stores
+     * @return the initialised SSL context
+     * @throws IllegalArgumentException if the context cannot be created from the stores
+     */
+    private static SSLContext createSslContext(RemoteHostAddress<?> address)
+    {
+        KeyStore keyStore = address.getKeyStore();
+        Collection<TrustStore> trustStores = address.getTrustStores();
+
+        try
+        {
+            SSLContext sslContext = SSLUtil.tryGetSSLContext();
+
+            // Treat a missing key store like missing trust stores: SSLContext.init accepts null
+            // key managers (it then falls back to the provider defaults), so do not fail with a
+            // NullPointerException when the address has no key store configured.
+            KeyManager[] keyManagers = keyStore == null ? null : keyStore.getKeyManagers();
+
+            TrustManager[] trustManagers;
+            if(trustStores == null || trustStores.isEmpty())
+            {
+                trustManagers = null;
+            }
+            else if(trustStores.size() == 1)
+            {
+                trustManagers = trustStores.iterator().next().getTrustManagers();
+            }
+            else
+            {
+                Collection<TrustManager> trustManagerList = new ArrayList<>();
+                final QpidMultipleTrustManager mulTrustManager = new QpidMultipleTrustManager();
+
+                for(TrustStore ts : trustStores)
+                {
+                    TrustManager[] managers = ts.getTrustManagers();
+                    if(managers != null)
+                    {
+                        for(TrustManager manager : managers)
+                        {
+                            if(manager instanceof X509TrustManager)
+                            {
+                                mulTrustManager.addTrustManager((X509TrustManager)manager);
+                            }
+                            else
+                            {
+                                trustManagerList.add(manager);
+                            }
+                        }
+                    }
+                }
+                if(!mulTrustManager.isEmpty())
+                {
+                    trustManagerList.add(mulTrustManager);
+                }
+                trustManagers = trustManagerList.toArray(new TrustManager[trustManagerList.size()]);
+            }
+            sslContext.init(keyManagers, trustManagers, null);
+
+            return sslContext;
+
+        }
+        catch (GeneralSecurityException e)
+        {
+            throw new IllegalArgumentException("Unable to create SSLContext for key or trust store", e);
+        }
+    }
+
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingOutboundConnection.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulableConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulableConnection.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulableConnection.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulableConnection.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.Collection;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.network.Ticker;
+
+/**
+ * Contract between a network connection and the {@link SelectorThread} /
+ * {@link NetworkConnectionScheduler} machinery that selects on it and schedules its work.
+ */
+public interface SchedulableConnection extends ServerNetworkConnection, ByteBufferSender
+{
+    // ---- identity and underlying resources ----
+
+    /** @return the name to use for the thread performing I/O for this connection */
+    String getThreadName();
+
+    /** @return the socket channel that is registered with the selector for this connection */
+    SocketChannel getSocketChannel();
+
+    /** @return the ticker consulted for the time until this connection next needs attention */
+    Ticker getTicker();
+
+    /** @return the scheduler currently responsible for this connection */
+    NetworkConnectionScheduler getScheduler();
+
+    // ---- selection interest ----
+
+    /** @return {@code true} if the selector should watch this connection for readability */
+    boolean wantsRead();
+
+    /** @return {@code true} if the selector should watch this connection for writability */
+    boolean wantsWrite();
+
+    /** @return {@code true} if the selector should watch this connection for connect completion */
+    boolean wantsConnect();
+
+    SelectorThread.SelectionTask getSelectionTask();
+
+    void setSelectionTask(SelectorThread.SelectionTask selectionTask);
+
+    // ---- scheduling state ----
+
+    /** @return {@code true} if the caller should go on to schedule this connection for processing */
+    boolean setScheduled();
+
+    void clearScheduled();
+
+    boolean isStateChanged();
+
+    boolean isPartialRead();
+
+    // ---- work ----
+
+    void doPreWork();
+
+    boolean doWork();
+
+    // ---- data transfer ----
+
+    void processAmqpData(QpidByteBuffer applicationData);
+
+    /**
+     * @param bufferArray buffers to write to the transport
+     * @return the number of bytes written
+     * @throws IOException if the write fails
+     */
+    long writeToTransport(Collection<QpidByteBuffer> bufferArray) throws IOException;
+
+    void reportUnexpectedByteBufferSizeUsage();
+}
Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulableConnection.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Fri Oct 21 09:32:07 2016
@@ -69,10 +69,10 @@ class SelectorThread extends Thread
* Queue of connections that are not currently scheduled and not registered with the selector.
* These need to go back into the Selector.
*/
- private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
+ private final Queue<SchedulableConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
/** Set of connections that are currently being selected upon */
- private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
+ private final Set<SchedulableConnection> _unscheduledConnections = new HashSet<>();
@@ -102,17 +102,17 @@ class SelectorThread extends Thread
return _selector;
}
- public Queue<NonBlockingConnection> getUnregisteredConnections()
+ public Queue<SchedulableConnection> getUnregisteredConnections()
{
return _unregisteredConnections;
}
- public Set<NonBlockingConnection> getUnscheduledConnections()
+ public Set<SchedulableConnection> getUnscheduledConnections()
{
return _unscheduledConnections;
}
- private List<NonBlockingConnection> processUnscheduledConnections()
+ private List<SchedulableConnection> processUnscheduledConnections()
{
_nextTimeout = Integer.MAX_VALUE;
if (getUnscheduledConnections().isEmpty())
@@ -120,13 +120,13 @@ class SelectorThread extends Thread
return Collections.emptyList();
}
- List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
+ List<SchedulableConnection> toBeScheduled = new ArrayList<>();
long currentTime = System.currentTimeMillis();
- Iterator<NonBlockingConnection> iterator = getUnscheduledConnections().iterator();
+ Iterator<SchedulableConnection> iterator = getUnscheduledConnections().iterator();
while (iterator.hasNext())
{
- NonBlockingConnection connection = iterator.next();
+ SchedulableConnection connection = iterator.next();
int period = connection.getTicker().getTimeToNextTick(currentTime);
@@ -153,7 +153,7 @@ class SelectorThread extends Thread
return toBeScheduled;
}
- private List<NonBlockingConnection> processSelectionKeys()
+ private List<SchedulableConnection> processSelectionKeys()
{
Set<SelectionKey> selectionKeys = _selector.selectedKeys();
if (selectionKeys.isEmpty())
@@ -161,7 +161,7 @@ class SelectorThread extends Thread
return Collections.emptyList();
}
- List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
+ List<SchedulableConnection> toBeScheduled = new ArrayList<>();
for (SelectionKey key : selectionKeys)
{
if(key.isAcceptable())
@@ -212,7 +212,7 @@ class SelectorThread extends Thread
}
else
{
- NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
+ SchedulableConnection connection = (SchedulableConnection) key.attachment();
if(connection != null)
{
try
@@ -235,15 +235,15 @@ class SelectorThread extends Thread
return toBeScheduled;
}
- private List<NonBlockingConnection> reregisterUnregisteredConnections()
+ private List<SchedulableConnection> reregisterUnregisteredConnections()
{
if (getUnregisteredConnections().isEmpty())
{
return Collections.emptyList();
}
- List<NonBlockingConnection> unregisterableConnections = new ArrayList<>();
+ List<SchedulableConnection> unregisterableConnections = new ArrayList<>();
- NonBlockingConnection unregisteredConnection;
+ SchedulableConnection unregisteredConnection;
while ((unregisteredConnection = getUnregisteredConnections().poll()) != null)
{
getUnscheduledConnections().add(unregisteredConnection);
@@ -302,21 +302,21 @@ class SelectorThread extends Thread
{
_inSelect.set(false);
}
- for (NonBlockingConnection connection : processSelectionKeys())
+ for (SchedulableConnection connection : processSelectionKeys())
{
if (connection.setScheduled())
{
connections.add(new ConnectionProcessor(_scheduler, connection));
}
}
- for (NonBlockingConnection connection : reregisterUnregisteredConnections())
+ for (SchedulableConnection connection : reregisterUnregisteredConnections())
{
if (connection.setScheduled())
{
connections.add(new ConnectionProcessor(_scheduler, connection));
}
}
- for (NonBlockingConnection connection : processUnscheduledConnections())
+ for (SchedulableConnection connection : processUnscheduledConnections())
{
if (connection.setScheduled())
{
@@ -474,10 +474,10 @@ class SelectorThread extends Thread
{
private final NetworkConnectionScheduler _scheduler;
- private final NonBlockingConnection _connection;
+ private final SchedulableConnection _connection;
private AtomicBoolean _running = new AtomicBoolean();
- public ConnectionProcessor(final NetworkConnectionScheduler scheduler, final NonBlockingConnection connection)
+ public ConnectionProcessor(final NetworkConnectionScheduler scheduler, final SchedulableConnection connection)
{
_scheduler = scheduler;
_connection = connection;
@@ -506,7 +506,7 @@ class SelectorThread extends Thread
}
}
- private void unregisterConnection(final NonBlockingConnection connection) throws ClosedChannelException
+ private void unregisterConnection(final SchedulableConnection connection) throws ClosedChannelException
{
SelectionKey register = connection.getSocketChannel().register(connection.getSelectionTask().getSelector(), 0);
register.cancel();
@@ -521,14 +521,15 @@ class SelectorThread extends Thread
}
}
- private boolean selectionInterestRequiresUpdate(NonBlockingConnection connection)
+ private boolean selectionInterestRequiresUpdate(SchedulableConnection connection)
{
SelectionTask selectionTask = connection.getSelectionTask();
if(selectionTask != null)
{
final SelectionKey selectionKey = connection.getSocketChannel().keyFor(selectionTask.getSelector());
int expectedOps = (connection.wantsRead() ? SelectionKey.OP_READ : 0)
- | (connection.wantsWrite() ? SelectionKey.OP_WRITE : 0);
+ | (connection.wantsWrite() ? SelectionKey.OP_WRITE : 0)
+ | (connection.wantsConnect() ? SelectionKey.OP_CONNECT : 0);
try
{
@@ -545,7 +546,7 @@ class SelectorThread extends Thread
}
}
- public void addConnection(final NonBlockingConnection connection)
+ public void addConnection(final SchedulableConnection connection)
{
if(selectionInterestRequiresUpdate(connection))
{
@@ -557,7 +558,7 @@ class SelectorThread extends Thread
}
- public void returnConnectionToSelector(final NonBlockingConnection connection)
+ public void returnConnectionToSelector(final SchedulableConnection connection)
{
if(selectionInterestRequiresUpdate(connection))
{
@@ -583,7 +584,7 @@ class SelectorThread extends Thread
return _selectionTasks[index];
}
- void removeConnection(NonBlockingConnection connection)
+ void removeConnection(SchedulableConnection connection)
{
try
{
@@ -631,7 +632,7 @@ class SelectorThread extends Thread
}
- public void addToWork(final NonBlockingConnection connection)
+ public void addToWork(final SchedulableConnection connection)
{
if (_closed.get())
{
Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Fri Oct 21 09:32:07 2016
@@ -29,12 +29,14 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.ConsumerOption;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
@@ -48,12 +50,12 @@ import org.apache.qpid.server.txn.Server
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
-public abstract class AbstractSystemMessageSource implements MessageSource
+public abstract class AbstractSystemMessageSource implements MessageSource<AbstractSystemMessageSource.SystemMessageSourceConsumer>
{
- protected final UUID _id;
- protected final String _name;
- protected final NamedAddressSpace _addressSpace;
- private List<Consumer> _consumers = new CopyOnWriteArrayList<>();
+ private final UUID _id;
+ private final String _name;
+ private final NamedAddressSpace _addressSpace;
+ private List<SystemMessageSourceConsumer> _consumers = new CopyOnWriteArrayList<>();
public AbstractSystemMessageSource(String name, final NamedAddressSpace addressSpace)
{
@@ -82,22 +84,22 @@ public abstract class AbstractSystemMess
}
@Override
- public Consumer addConsumer(final ConsumerTarget target,
- final FilterManager filters,
- final Class<? extends ServerMessage> messageClass,
- final String consumerName,
- final EnumSet<ConsumerImpl.Option> options, final Integer priority)
+ public SystemMessageSourceConsumer addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ final EnumSet<ConsumerOption> options, final Integer priority)
throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
ConsumerAccessRefused
{
- final Consumer consumer = new Consumer(consumerName, target);
+ final SystemMessageSourceConsumer consumer = new SystemMessageSourceConsumer(consumerName, target);
target.consumerAdded(consumer);
_consumers.add(consumer);
return consumer;
}
@Override
- public Collection<Consumer> getConsumers()
+ public Collection<SystemMessageSourceConsumer> getConsumers()
{
return new ArrayList<>(_consumers);
}
@@ -108,19 +110,23 @@ public abstract class AbstractSystemMess
return true;
}
- protected class Consumer implements ConsumerImpl
+ public NamedAddressSpace getAddressSpace()
+ {
+ return _addressSpace;
+ }
+
+ protected class SystemMessageSourceConsumer implements MessageInstanceConsumer
{
- private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement();
private final List<PropertiesMessageInstance> _queue =
Collections.synchronizedList(new ArrayList<PropertiesMessageInstance>());
private final ConsumerTarget _target;
private final String _name;
private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener =
- new Consumer.TargetChangeListener();
-
+ new SystemMessageSourceConsumer.TargetChangeListener();
+ private final Object _identifier = new Object();
- public Consumer(final String consumerName, ConsumerTarget target)
+ SystemMessageSourceConsumer(final String consumerName, ConsumerTarget target)
{
_name = consumerName;
_target = target;
@@ -133,61 +139,22 @@ public abstract class AbstractSystemMess
}
- @Override
- public ConsumerTarget getTarget()
- {
- return _target;
- }
-
- @Override
- public long getBytesOut()
- {
- return 0;
- }
-
- @Override
- public long getMessagesOut()
- {
- return 0;
- }
-
- @Override
- public long getUnacknowledgedBytes()
+ public Object getIdentifier()
{
- return 0;
+ return _identifier;
}
- @Override
- public long getUnacknowledgedMessages()
+ public ConsumerTarget getTarget()
{
- return 0;
+ return _target;
}
- @Override
public AMQSessionModel getSessionModel()
{
return _target.getSessionModel();
}
@Override
- public MessageSource getMessageSource()
- {
- return AbstractSystemMessageSource.this;
- }
-
- @Override
- public long getConsumerNumber()
- {
- return _id;
- }
-
- @Override
- public boolean isSuspended()
- {
- return false;
- }
-
- @Override
public boolean isClosed()
{
return false;
@@ -200,42 +167,12 @@ public abstract class AbstractSystemMess
}
@Override
- public boolean seesRequeues()
- {
- return false;
- }
-
- @Override
public void close()
{
_consumers.remove(this);
}
@Override
- public boolean trySendLock()
- {
- return _target.trySendLock();
- }
-
- @Override
- public void getSendLock()
- {
- _target.getSendLock();
- }
-
- @Override
- public void releaseSendLock()
- {
- _target.releaseSendLock();
- }
-
- @Override
- public boolean isActive()
- {
- return false;
- }
-
- @Override
public String getName()
{
return _name;
@@ -322,16 +259,15 @@ public abstract class AbstractSystemMess
}
- class PropertiesMessageInstance implements MessageInstance
+ private class PropertiesMessageInstance implements MessageInstance
{
- private final Consumer _consumer;
+ private final SystemMessageSourceConsumer _consumer;
private int _deliveryCount;
private boolean _isRedelivered;
- private boolean _isDelivered;
private boolean _isDeleted;
private InternalMessage _message;
- PropertiesMessageInstance(final Consumer consumer, final InternalMessage message)
+ PropertiesMessageInstance(final SystemMessageSourceConsumer consumer, final InternalMessage message)
{
_consumer = consumer;
_message = message;
@@ -340,7 +276,7 @@ public abstract class AbstractSystemMess
@Override
public int getDeliveryCount()
{
- return 0;
+ return _deliveryCount;
}
@Override
@@ -375,7 +311,7 @@ public abstract class AbstractSystemMess
}
@Override
- public ConsumerImpl getAcquiringConsumer()
+ public MessageInstanceConsumer getAcquiringConsumer()
{
return _consumer;
}
@@ -387,13 +323,13 @@ public abstract class AbstractSystemMess
}
@Override
- public boolean isAcquiredBy(final ConsumerImpl consumer)
+ public boolean isAcquiredBy(final MessageInstanceConsumer consumer)
{
return consumer == _consumer && !isDeleted();
}
@Override
- public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+ public boolean removeAcquisitionFromConsumer(final MessageInstanceConsumer consumer)
{
return consumer == _consumer;
}
@@ -411,7 +347,7 @@ public abstract class AbstractSystemMess
}
@Override
- public Consumer getDeliveredConsumer()
+ public SystemMessageSourceConsumer getDeliveredConsumer()
{
return isDeleted() ? null : _consumer;
}
@@ -423,7 +359,7 @@ public abstract class AbstractSystemMess
}
@Override
- public boolean isRejectedBy(final ConsumerImpl consumer)
+ public boolean isRejectedBy(final MessageInstanceConsumer consumer)
{
return false;
}
@@ -431,7 +367,7 @@ public abstract class AbstractSystemMess
@Override
public boolean getDeliveredToConsumer()
{
- return _isDelivered;
+ return _deliveryCount > 0;
}
@Override
@@ -441,13 +377,13 @@ public abstract class AbstractSystemMess
}
@Override
- public boolean acquire(final ConsumerImpl sub)
+ public boolean acquire(final MessageInstanceConsumer sub)
{
return false;
}
@Override
- public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+ public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
{
return false;
}
@@ -465,7 +401,7 @@ public abstract class AbstractSystemMess
}
@Override
- public int routeToAlternate(final Action<? super MessageInstance> action,
+ public int routeToAlternate(final Action<? super BaseMessageInstance> action,
final ServerTransaction txn)
{
return 0;
@@ -503,7 +439,7 @@ public abstract class AbstractSystemMess
}
@Override
- public void release(ConsumerImpl consumer)
+ public void release(MessageInstanceConsumer consumer)
{
release();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org