You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/03/16 17:52:33 UTC
svn commit: r1667068 - in /qpid/trunk/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/protocol/
broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/
broker-core/src/main/java/org/apache/qpid/server/transport/ broker-core/...
Author: kwall
Date: Mon Mar 16 16:52:32 2015
New Revision: 1667068
URL: http://svn.apache.org/r1667068
Log:
QPID-6429, QPID-6262: [Java Broker] Improve error handling in new NIO code; Remove MINA terminlogy (session etc) in 0-8 stack
* Also added uncaught exception handler in test framework (QBTC) to guard log the case where a thread exits abnormally.
Added:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Mon Mar 16 16:52:32 2015
@@ -69,8 +69,6 @@ public interface AMQConnectionModel<T ex
*/
public LogSubject getLogSubject();
- public boolean isSessionNameUnique(byte[] name);
-
String getRemoteAddressString();
SocketAddress getRemoteAddress();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java Mon Mar 16 16:52:32 2015
@@ -92,7 +92,6 @@ public class KerberosAuthenticationManag
}
catch (SaslException e)
{
- e.printStackTrace(System.err);
return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
}
}
Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1667068&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Mon Mar 16 16:52:32 2015
@@ -0,0 +1,114 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class NetworkConnectionScheduler
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(NetworkConnectionScheduler.class);
+
+ private final SelectorThread _selectorThread;
+ private final ScheduledThreadPoolExecutor _executor;
+ private final AtomicInteger _running = new AtomicInteger();
+ private final int _poolSize;
+
+ NetworkConnectionScheduler(final SelectorThread selectorThread)
+ {
+ _selectorThread = selectorThread;
+ _poolSize = Runtime.getRuntime().availableProcessors();
+ _executor = new ScheduledThreadPoolExecutor(_poolSize);
+ _executor.prestartAllCoreThreads();
+ }
+
+ public void schedule(final NonBlockingConnection connection)
+ {
+ _executor.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ String currentName = Thread.currentThread().getName();
+ try
+ {
+ Thread.currentThread().setName(
+ SelectorThread.IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString());
+ processConnection(connection);
+ }
+ finally
+ {
+ Thread.currentThread().setName(currentName);
+ }
+ }
+ });
+ }
+
+ private void processConnection(final NonBlockingConnection connection)
+ {
+ try
+ {
+ _running.incrementAndGet();
+ boolean rerun;
+ do
+ {
+ rerun = false;
+ boolean closed = connection.doWork();
+
+ if (!closed)
+ {
+
+ if (connection.isStateChanged())
+ {
+ if (_running.get() == _poolSize)
+ {
+ schedule(connection);
+ }
+ else
+ {
+ rerun = true;
+ }
+ }
+ else
+ {
+ _selectorThread.addConnection(connection);
+ }
+ }
+
+ } while (rerun);
+ }
+ finally
+ {
+ _running.decrementAndGet();
+ }
+ }
+
+ public void close()
+ {
+ _executor.shutdown();
+ }
+
+
+
+}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Mon Mar 16 16:52:32 2015
@@ -44,8 +44,8 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.TransportEncryption;
@@ -55,18 +55,12 @@ import org.apache.qpid.util.SystemUtils;
public class NonBlockingConnection implements NetworkConnection, ByteBufferSender
{
private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
+ private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
+
private final SocketChannel _socketChannel;
- private final long _timeout;
private final Ticker _ticker;
+ private final Object _peerPrincipalLock = new Object();
private final SelectorThread _selector;
- private int _maxReadIdle;
- private int _maxWriteIdle;
- private Principal _principal;
- private boolean _principalChecked;
- private final Object _lock = new Object();
-
- public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
-
private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
@@ -74,9 +68,14 @@ public class NonBlockingConnection imple
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final ServerProtocolEngine _protocolEngine;
private final int _receiveBufSize;
- private final Set<TransportEncryption> _encryptionSet;
- private final SSLContext _sslContext;
private final Runnable _onTransportEncryptionAction;
+
+
+ private int _maxReadIdle;
+ private int _maxWriteIdle;
+ private Principal _principal;
+ private boolean _principalChecked;
+
private ByteBuffer _netInputBuffer;
private SSLEngine _sslEngine;
@@ -90,9 +89,7 @@ public class NonBlockingConnection imple
public NonBlockingConnection(SocketChannel socketChannel,
ServerProtocolEngine delegate,
- int sendBufferSize,
int receiveBufferSize,
- long timeout,
Ticker ticker,
final Set<TransportEncryption> encryptionSet,
final SSLContext sslContext,
@@ -104,14 +101,11 @@ public class NonBlockingConnection imple
final SelectorThread selectorThread)
{
_socketChannel = socketChannel;
- _timeout = timeout;
_ticker = ticker;
_selector = selectorThread;
_protocolEngine = delegate;
_receiveBufSize = receiveBufferSize;
- _encryptionSet = encryptionSet;
- _sslContext = sslContext;
_onTransportEncryptionAction = onTransportEncryptionAction;
delegate.setWorkListener(new Action<ServerProtocolEngine>()
@@ -125,7 +119,7 @@ public class NonBlockingConnection imple
if(encryptionSet.size() == 1)
{
- _transportEncryption = _encryptionSet.iterator().next();
+ _transportEncryption = encryptionSet.iterator().next();
if (_transportEncryption == TransportEncryption.TLS)
{
onTransportEncryptionAction.run();
@@ -134,7 +128,7 @@ public class NonBlockingConnection imple
if(encryptionSet.contains(TransportEncryption.TLS))
{
- _sslEngine = _sslContext.createSSLEngine();
+ _sslEngine = sslContext.createSSLEngine();
_sslEngine.setUseClientMode(false);
SSLUtil.removeSSLv3Support(_sslEngine);
SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites);
@@ -150,26 +144,16 @@ public class NonBlockingConnection imple
_netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2));
}
- try
- {
- _remoteSocketAddress = _socketChannel.getRemoteAddress().toString();
- _socketChannel.configureBlocking(false);
- }
- catch (IOException e)
- {
- throw new SenderException("Unable to prepare the channel for non-blocking IO", e);
- }
-
-
+ _remoteSocketAddress = _socketChannel.socket().getRemoteSocketAddress().toString();
}
- public Ticker getTicker()
+ Ticker getTicker()
{
return _ticker;
}
- public SocketChannel getSocketChannel()
+ SocketChannel getSocketChannel()
{
return _socketChannel;
}
@@ -189,7 +173,7 @@ public class NonBlockingConnection imple
if(_closed.compareAndSet(false,true))
{
_protocolEngine.notifyWork();
- getSelector().wakeup();
+ _selector.wakeup();
}
}
@@ -216,7 +200,7 @@ public class NonBlockingConnection imple
@Override
public Principal getPeerPrincipal()
{
- synchronized (_lock)
+ synchronized (_peerPrincipalLock)
{
if(!_principalChecked)
{
@@ -301,7 +285,7 @@ public class NonBlockingConnection imple
// tell all consumer targets that it is okay to accept more
_protocolEngine.setMessageAssignmentSuspended(false);
}
- catch (IOException e)
+ catch (IOException | ConnectionScopedRuntimeException e)
{
LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e);
LOGGER.debug("Closing " + _remoteSocketAddress);
@@ -359,22 +343,7 @@ public class NonBlockingConnection imple
}
- public SelectorThread getSelector()
- {
- return _selector;
- }
-
- public boolean looksLikeSSLv2ClientHello(final byte[] headerBytes)
- {
- return headerBytes[0] == -128 &&
- headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[4] == 0 || // SSL 3.0
- headerBytes[4] == 1 || // TLS 1.0
- headerBytes[4] == 2 || // TLS 1.1
- headerBytes[4] == 3);
- }
-
- public boolean doRead() throws IOException
+ private boolean doRead() throws IOException
{
boolean readData = false;
if(_transportEncryption == TransportEncryption.NONE)
@@ -496,7 +465,7 @@ public class NonBlockingConnection imple
return readData;
}
- public boolean doWrite() throws IOException
+ private boolean doWrite() throws IOException
{
ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
@@ -589,18 +558,7 @@ public class NonBlockingConnection imple
}
}
- public boolean looksLikeSSLv3ClientHello(final byte[] headerBytes)
- {
- return headerBytes[0] == 22 && // SSL Handshake
- (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[2] == 0 || // SSL 3.0
- headerBytes[2] == 1 || // TLS 1.0
- headerBytes[2] == 2 || // TLS 1.1
- headerBytes[2] == 3)) && // TLS1.2
- (headerBytes[5] == 1); // client_hello
- }
-
- public boolean runSSLEngineTasks(final SSLEngineResult status)
+ private boolean runSSLEngineTasks(final SSLEngineResult status)
{
if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
{
@@ -614,15 +572,11 @@ public class NonBlockingConnection imple
return false;
}
- public boolean looksLikeSSL(final byte[] headerBytes)
- {
- return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
- }
-
@Override
public void send(final ByteBuffer msg)
{
- assert Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX) : "Send called by unexpected thread " + Thread.currentThread().getName();
+ assert _selector.isIOThread() : "Send called by unexpected thread " + Thread.currentThread().getName();
+
if (_closed.get())
{
@@ -631,7 +585,7 @@ public class NonBlockingConnection imple
else
{
_buffers.add(msg);
- _protocolEngine.notifyWork();
+ _protocolEngine.notifyWork(); // TODO now redundant
}
}
@@ -639,4 +593,36 @@ public class NonBlockingConnection imple
public void flush()
{
}
+
+ @Override
+ public String toString()
+ {
+ return "[NonBlockingConnection " + _remoteSocketAddress + "]";
+ }
+
+ private boolean looksLikeSSL(final byte[] headerBytes)
+ {
+ return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
+ }
+
+ private boolean looksLikeSSLv3ClientHello(final byte[] headerBytes)
+ {
+ return headerBytes[0] == 22 && // SSL Handshake
+ (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
+ (headerBytes[2] == 0 || // SSL 3.0
+ headerBytes[2] == 1 || // TLS 1.0
+ headerBytes[2] == 2 || // TLS 1.1
+ headerBytes[2] == 3)) && // TLS1.2
+ (headerBytes[5] == 1); // client_hello
+ }
+
+ private boolean looksLikeSSLv2ClientHello(final byte[] headerBytes)
+ {
+ return headerBytes[0] == -128 &&
+ headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
+ (headerBytes[4] == 0 || // SSL 3.0
+ headerBytes[4] == 1 || // TLS 1.0
+ headerBytes[4] == 2 || // TLS 1.1
+ headerBytes[4] == 3);
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Mon Mar 16 16:52:32 2015
@@ -25,14 +25,17 @@ import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.EnumSet;
import java.util.Set;
import javax.net.ssl.SSLContext;
+import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.TransportException;
@@ -43,51 +46,24 @@ import org.apache.qpid.transport.network
public class NonBlockingNetworkTransport
{
- private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
- private SelectorThread _selector;
-
-
- private Set<TransportEncryption> _encryptionSet;
- private volatile boolean _closed = false;
- private NetworkTransportConfiguration _config;
- private ProtocolEngineFactory _factory;
- private SSLContext _sslContext;
- private ServerSocketChannel _serverSocket;
- private int _timeout;
-
- public void close()
- {
- if(_selector != null)
- {
- try
- {
- if (_serverSocket != null)
- {
- _selector.cancelAcceptingSocket(_serverSocket);
- _serverSocket.close();
- }
- }
- catch (IOException e)
- {
- // TODO
- e.printStackTrace();
- }
- finally
- {
+ private final Set<TransportEncryption> _encryptionSet;
+ private final NetworkTransportConfiguration _config;
+ private final ProtocolEngineFactory _factory;
+ private final SSLContext _sslContext;
+ private final ServerSocketChannel _serverSocket;
+ private final int _timeout;
- _selector.close();
- }
- }
- }
+ private SelectorThread _selector;
- public void accept(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory,
- SSLContext sslContext,
- final Set<TransportEncryption> encryptionSet)
+ public NonBlockingNetworkTransport(final NetworkTransportConfiguration config,
+ final MultiVersionProtocolEngineFactory factory,
+ final SSLContext sslContext,
+ final EnumSet<TransportEncryption> encryptionSet)
{
try
{
@@ -106,80 +82,138 @@ public class NonBlockingNetworkTransport
_serverSocket.configureBlocking(false);
_encryptionSet = encryptionSet;
- _selector = new SelectorThread(config.getAddress().toString(), this);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Failed to start AMQP on port : " + config, e);
+ }
+
+ }
+
+ public void start()
+ {
+ try
+ {
+ _selector = new SelectorThread(this);
_selector.start();
_selector.addAcceptingSocket(_serverSocket);
}
catch (IOException e)
{
- throw new TransportException("Failed to start AMQP on port : " + config, e);
+ throw new TransportException("Failed to start", e);
}
+ }
+ public void close()
+ {
+ if(_selector != null)
+ {
+ _selector.cancelAcceptingSocket(_serverSocket);
+ try
+ {
+ _serverSocket.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.warn("Error closing the server socket for : " + _config.getAddress().toString(), e);
+ }
+ finally
+ {
+ _selector.close();
+ _selector = null;
+ }
+ }
}
public int getAcceptingPort()
{
- return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort();
+ return _serverSocket.socket().getLocalPort();
}
- public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException
+ public NetworkTransportConfiguration getConfig()
{
- final ServerProtocolEngine engine =
- (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
- .getRemoteSocketAddress());
+ return _config;
+ }
- if(engine != null)
+ void acceptSocketChannel(final ServerSocketChannel serverSocketChannel)
+ {
+ SocketChannel socketChannel = null;
+ boolean success = false;
+ try
{
- socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
- socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
-
- final Integer sendBufferSize = _config.getSendBufferSize();
- final Integer receiveBufferSize = _config.getReceiveBufferSize();
+ socketChannel = serverSocketChannel.accept();
- socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
- socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+ final ServerProtocolEngine engine =
+ (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
+ .getRemoteSocketAddress());
+ if(engine != null)
+ {
+ socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
+ socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
- final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+ final int sendBufferSize = _config.getSendBufferSize();
+ final int receiveBufferSize = _config.getReceiveBufferSize();
- NonBlockingConnection connection =
- new NonBlockingConnection(socketChannel,
- engine,
- sendBufferSize,
- receiveBufferSize,
- _timeout,
- ticker,
- _encryptionSet,
- _sslContext,
- _config.wantClientAuth(),
- _config.needClientAuth(),
- _config.getEnabledCipherSuites(),
- _config.getDisabledCipherSuites(),
- new Runnable()
- {
+ socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+ socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
- @Override
- public void run()
+ socketChannel.configureBlocking(false);
+
+ final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, _timeout);
+
+ NonBlockingConnection connection =
+ new NonBlockingConnection(socketChannel,
+ engine,
+ receiveBufferSize,
+ ticker,
+ _encryptionSet,
+ _sslContext,
+ _config.wantClientAuth(),
+ _config.needClientAuth(),
+ _config.getEnabledCipherSuites(),
+ _config.getDisabledCipherSuites(),
+ new Runnable()
{
- engine.encryptedTransport();
- }
- },
- _selector);
- engine.setNetworkConnection(connection, connection.getSender());
- connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
+ @Override
+ public void run()
+ {
+ engine.encryptedTransport();
+ }
+ },
+ _selector);
+
+ engine.setNetworkConnection(connection, connection.getSender());
+ connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
- ticker.setConnection(connection);
+ ticker.setConnection(connection);
- connection.start();
+ connection.start();
- _selector.addConnection(connection);
+ _selector.addConnection(connection);
+ success = true;
+ }
+ }
+ catch (IOException e)
+ {
+ LOGGER.error("Failed to process incoming socket", e);
}
- else
+ finally
{
- socketChannel.close();
+ if (!success && socketChannel != null)
+ {
+ try
+ {
+ socketChannel.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.debug("Failed to close socket " + socketChannel, e);
+ }
+ }
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Mon Mar 16 16:52:32 2015
@@ -32,43 +32,40 @@ import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.thread.LoggingUncaughtExceptionHandler;
-
public class SelectorThread extends Thread
{
- private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class);
- public static final String IO_THREAD_NAME_PREFIX = "NCS-";
+ static final String IO_THREAD_NAME_PREFIX = "IO-";
private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>();
+
+ /**
+ * Queue of connections that are not currently scheduled and not registered with the selector.
+ * These need to go back into the Selector.
+ */
private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
+
+ /** Set of connections that are currently being selected upon */
private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
+
private final Selector _selector;
private final AtomicBoolean _closed = new AtomicBoolean();
- private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler();
+ private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(this);
private final NonBlockingNetworkTransport _transport;
+ private long _nextTimeout;
- SelectorThread(final String name, final NonBlockingNetworkTransport nonBlockingNetworkTransport)
+ SelectorThread(final NonBlockingNetworkTransport nonBlockingNetworkTransport) throws IOException
{
- super("SelectorThread-"+name);
+ super("SelectorThread-" + nonBlockingNetworkTransport.getConfig().getAddress().toString());
+
_transport = nonBlockingNetworkTransport;
- try
- {
- _selector = Selector.open();
- }
- catch (IOException e)
- {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
+ _selector = Selector.open();
}
public void addAcceptingSocket(final ServerSocketChannel socketChannel)
@@ -83,10 +80,10 @@ public class SelectorThread extends Thre
{
socketChannel.register(_selector, SelectionKey.OP_ACCEPT);
}
- catch (ClosedChannelException e)
+ catch (IllegalStateException | ClosedChannelException e)
{
- // TODO
- e.printStackTrace();
+ // TODO Communicate condition back to model object to make it go into the ERROR state
+ LOGGER.error("Failed to register selector on accepting port", e);
}
}
});
@@ -114,91 +111,38 @@ public class SelectorThread extends Thre
public void run()
{
- long nextTimeout = 0;
+ _nextTimeout = 0;
try
{
while (!_closed.get())
{
- _selector.select(nextTimeout);
-
- while(_tasks.peek() != null)
+ try
{
- Runnable task = _tasks.poll();
- task.run();
+ _selector.select(_nextTimeout);
}
-
- List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
-
-
- Set<SelectionKey> selectionKeys = _selector.selectedKeys();
- for (SelectionKey key : selectionKeys)
+ catch (IOException e)
{
- if(key.isAcceptable())
- {
- // todo - should we schedule this rather than running in this thread?
- SocketChannel acceptedChannel = ((ServerSocketChannel)key.channel()).accept();
- _transport.acceptSocketChannel(acceptedChannel);
- }
- else
- {
- NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
-
- key.channel().register(_selector, 0);
-
- toBeScheduled.add(connection);
- _unscheduledConnections.remove(connection);
- }
-
+ // TODO Inform the model object
+ LOGGER.error("Failed to select for " + _transport.getConfig().getAddress().toString(),e );
+ break;
}
- selectionKeys.clear();
-
- while (_unregisteredConnections.peek() != null)
- {
- NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
- _unscheduledConnections.add(unregisteredConnection);
+ runTasks();
- final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
- | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
- unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
+ List<NonBlockingConnection> toBeScheduled = processSelectionKeys();
- }
-
- long currentTime = System.currentTimeMillis();
- Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
- nextTimeout = Integer.MAX_VALUE;
- while (iterator.hasNext())
- {
- NonBlockingConnection connection = iterator.next();
-
- int period = connection.getTicker().getTimeToNextTick(currentTime);
+ toBeScheduled.addAll(reregisterUnregisteredConnections());
- if (period <= 0 || connection.isStateChanged())
- {
- toBeScheduled.add(connection);
- connection.getSocketChannel().register(_selector, 0).cancel();
- iterator.remove();
- }
- else
- {
- nextTimeout = Math.min(period, nextTimeout);
- }
- }
+ toBeScheduled.addAll(processUnscheduledConnections());
for (NonBlockingConnection connection : toBeScheduled)
{
_scheduler.schedule(connection);
}
-
}
}
- catch (IOException e)
- {
- //TODO
- e.printStackTrace();
- }
finally
{
try
@@ -207,114 +151,144 @@ public class SelectorThread extends Thre
}
catch (IOException e)
{
- e.printStackTrace();
+ LOGGER.debug("Failed to close selector", e);
}
}
-
-
-
}
- public void addConnection(final NonBlockingConnection connection)
+ private List<NonBlockingConnection> processUnscheduledConnections()
{
- _unregisteredConnections.add(connection);
- _selector.wakeup();
+ List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
- }
+ long currentTime = System.currentTimeMillis();
+ Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
+ _nextTimeout = Integer.MAX_VALUE;
+ while (iterator.hasNext())
+ {
+ NonBlockingConnection connection = iterator.next();
- public void wakeup()
- {
- _selector.wakeup();
- }
+ int period = connection.getTicker().getTimeToNextTick(currentTime);
- public void close()
- {
- _closed.set(true);
- _selector.wakeup();
- _scheduler.close();
+ if (period <= 0 || connection.isStateChanged())
+ {
+ toBeScheduled.add(connection);
+ try
+ {
+ LOGGER.debug("KWDEBUG# Setting interest to zero (PUC) " + connection);
+
+ SelectionKey register = connection.getSocketChannel().register(_selector, 0);
+ register.cancel();
+ }
+ catch (ClosedChannelException e)
+ {
+ LOGGER.debug("Failed to register with selector for connection " + connection +
+ ". Connection is probably being closed by peer.", e);
+ }
+ iterator.remove();
+ }
+ else
+ {
+ _nextTimeout = Math.min(period, _nextTimeout);
+ }
+ }
+
+ return toBeScheduled;
}
- private class NetworkConnectionScheduler
+ private List<NonBlockingConnection> reregisterUnregisteredConnections()
{
- private final ScheduledThreadPoolExecutor _executor;
- private final AtomicInteger _running = new AtomicInteger();
- private final int _poolSize;
+ List<NonBlockingConnection> unregisterableConnections = new ArrayList<>();
- private NetworkConnectionScheduler()
+ while (_unregisteredConnections.peek() != null)
{
- _poolSize = Runtime.getRuntime().availableProcessors();
- _executor = new ScheduledThreadPoolExecutor(_poolSize);
- _executor.prestartAllCoreThreads();
- }
+ NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
+ _unscheduledConnections.add(unregisteredConnection);
- public void processConnection(final NonBlockingConnection connection)
- {
+
+ final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
+ | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
try
{
- _running.incrementAndGet();
- boolean rerun;
- do
- {
- rerun = false;
- boolean closed = connection.doWork();
+ LOGGER.debug("KWDEBUG# Registering " + unregisteredConnection);
+ unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
+ }
+ catch (ClosedChannelException e)
+ {
+ unregisterableConnections.add(unregisteredConnection);
+ }
+ }
- if (!closed)
- {
+ return unregisterableConnections;
+ }
- if (connection.isStateChanged())
- {
- if (_running.get() == _poolSize)
- {
- schedule(connection);
- }
- else
- {
- rerun = true;
- }
- }
- else
- {
- SelectorThread.this.addConnection(connection);
- }
- }
+ private List<NonBlockingConnection> processSelectionKeys()
+ {
+ List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
- } while (rerun);
+ Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+ for (SelectionKey key : selectionKeys)
+ {
+ if(key.isAcceptable())
+ {
+ // todo - should we schedule this rather than running in this thread?
+ _transport.acceptSocketChannel((ServerSocketChannel)key.channel());
}
- finally
+ else
{
- _running.decrementAndGet();
+ NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
+
+ try
+ {
+ LOGGER.debug("KWDEBUG# Setting interest to zero (PSK)" + connection);
+
+ key.channel().register(_selector, 0);
+ }
+ catch (ClosedChannelException e)
+ {
+ // Ignore - we will schedule the connection anyway
+ }
+
+ toBeScheduled.add(connection);
+ _unscheduledConnections.remove(connection);
}
- }
- public void schedule(final NonBlockingConnection connection)
- {
- _executor.submit(new Runnable()
- {
- @Override
- public void run()
- {
- String currentName = Thread.currentThread().getName();
- try
- {
- Thread.currentThread().setName(
- IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString());
- processConnection(connection);
- }
- finally
- {
- Thread.currentThread().setName(currentName);
- }
- }
- });
}
+ selectionKeys.clear();
+
+ return toBeScheduled;
+ }
- public void close()
+ private void runTasks()
+ {
+ while(_tasks.peek() != null)
{
- _executor.shutdown();
+ Runnable task = _tasks.poll();
+ task.run();
}
+ }
+
+ public void addConnection(final NonBlockingConnection connection)
+ {
+ _unregisteredConnections.add(connection);
+ _selector.wakeup();
+
+ }
+ public void wakeup()
+ {
+ _selector.wakeup();
+ }
+ public void close()
+ {
+ _closed.set(true);
+ _selector.wakeup();
+ _scheduler.close();
+ }
+ boolean isIOThread()
+ {
+ return Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX);
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java Mon Mar 16 16:52:32 2015
@@ -79,7 +79,8 @@ class TCPandSSLTransport implements Acce
}
final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration();
- _networkTransport = new NonBlockingNetworkTransport();
+
+
final MultiVersionProtocolEngineFactory protocolEngineFactory =
new MultiVersionProtocolEngineFactory(
_port.getParent(Broker.class),
@@ -97,7 +98,9 @@ class TCPandSSLTransport implements Acce
{
encryptionSet.add(TransportEncryption.TLS);
}
- _networkTransport.accept(settings, protocolEngineFactory, _sslContext, encryptionSet);
+
+ _networkTransport = new NonBlockingNetworkTransport(settings, protocolEngineFactory, _sslContext, encryptionSet);
+ _networkTransport.start();
}
public int getAcceptingPort()
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Mon Mar 16 16:52:32 2015
@@ -592,12 +592,6 @@ public class MockConsumer implements Con
}
@Override
- public boolean isSessionNameUnique(byte[] name)
- {
- return false;
- }
-
- @Override
public String getRemoteAddressString()
{
return "remoteAddress:1234";
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Mon Mar 16 16:52:32 2015
@@ -540,11 +540,6 @@ public class ServerConnection extends Co
return _connectionId;
}
- public boolean isSessionNameUnique(byte[] name)
- {
- return !super.hasSessionWithName(name);
- }
-
public String getRemoteAddressString()
{
return String.valueOf(getRemoteAddress());
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Mon Mar 16 16:52:32 2015
@@ -371,12 +371,17 @@ public class ServerConnectionDelegate ex
while(connections.hasNext())
{
final AMQConnectionModel amqConnectionModel = connections.next();
- final String userName = amqConnectionModel.getAuthorizedPrincipal() == null
- ? ""
- : amqConnectionModel.getAuthorizedPrincipal().getName();
- if (userId.equals(userName) && !amqConnectionModel.isSessionNameUnique(name))
+ if (amqConnectionModel instanceof ServerConnection)
{
- return false;
+ ServerConnection otherConnection = (ServerConnection)amqConnectionModel;
+
+ final String userName = amqConnectionModel.getAuthorizedPrincipal() == null
+ ? ""
+ : amqConnectionModel.getAuthorizedPrincipal().getName();
+ if (userId.equals(userName) && otherConnection.hasSessionWithName(name))
+ {
+ return false;
+ }
}
}
return true;
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Mar 16 16:52:32 2015
@@ -857,9 +857,6 @@ public class AMQChannel
return false;
}
- /**
- * Called from the protocol session to close this channel and clean up. T
- */
@Override
public void close()
{
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Mon Mar 16 16:52:32 2015
@@ -93,7 +93,6 @@ import org.apache.qpid.transport.SenderC
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.util.BytesDataOutput;
public class AMQProtocolEngine implements ServerProtocolEngine,
AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
@@ -117,9 +116,8 @@ public class AMQProtocolEngine implement
// to save boxing the channelId and looking up in a map... cache in an array the low numbered
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
- private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
- public static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength";
- public static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80;
+ private static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength";
+ private static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80;
private static final long AWAIT_CLOSED_TIMEOUT = 60000;
private final AmqpPort<?> _port;
private final long _creationTime;
@@ -156,10 +154,8 @@ public class AMQProtocolEngine implement
private volatile boolean _closed;
- // maximum number of channels this session should have
private long _maxNoOfChannels;
- /* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion);
private final List<Action<? super AMQProtocolEngine>> _connectionCloseTaskList =
@@ -172,7 +168,7 @@ public class AMQProtocolEngine implement
private ProtocolOutputConverter _protocolOutputConverter;
private final Subject _authorizedSubject = new Subject();
- private final long _connectionID;
+ private final long _connectionId;
private Object _reference = new Object();
private LogSubject _logSubject;
@@ -190,7 +186,7 @@ public class AMQProtocolEngine implement
private ByteBufferSender _sender;
private volatile boolean _deferFlush;
- private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want?
+ private long _lastReceivedTime = System.currentTimeMillis();
private boolean _blocking;
private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis());
@@ -247,7 +243,7 @@ public class AMQProtocolEngine implement
_transport = transport;
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
_decoder = new BrokerDecoder(this);
- _connectionID = connectionId;
+ _connectionId = connectionId;
_logSubject = new ConnectionLogSubject(this);
_binaryDataLimit = _broker.getContextKeys(false).contains(BROKER_DEBUG_BINARY_DATA_LENGTH)
? _broker.getContextValue(Integer.class, BROKER_DEBUG_BINARY_DATA_LENGTH)
@@ -272,11 +268,11 @@ public class AMQProtocolEngine implement
return null;
}
});
-
- _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
- _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
- _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
- _dataReceived = new StatisticsCounter("data-received-" + getSessionID());
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered-" + _connectionId);
+ _dataDelivered = new StatisticsCounter("data-delivered-" + _connectionId);
+ _messagesReceived = new StatisticsCounter("messages-received-" + _connectionId);
+ _dataReceived = new StatisticsCounter("data-received-" + _connectionId);
_creationTime = System.currentTimeMillis();
}
@@ -323,11 +319,6 @@ public class AMQProtocolEngine implement
_sender = sender;
}
- public long getSessionID()
- {
- return _connectionID;
- }
-
public void setMaxFrameSize(int frameMax)
{
_maxFrameSize = frameMax;
@@ -368,7 +359,7 @@ public class AMQProtocolEngine implement
+ _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
+ "ms to establish identity. Closing as possible DoS.");
getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
- closeProtocolSession();
+ closeNetworkConnection();
}
_lastReceivedTime = arrivalTime;
_lastIoTime = arrivalTime;
@@ -382,37 +373,37 @@ public class AMQProtocolEngine implement
catch (ConnectionScopedRuntimeException e)
{
_logger.error("Unexpected exception", e);
- closeProtocolSession();
+ closeNetworkConnection();
}
catch (AMQProtocolVersionException e)
{
_logger.error("Unexpected protocol version", e);
- closeProtocolSession();
+ closeNetworkConnection();
}
catch (SenderClosedException e)
{
_logger.debug("Sender was closed abruptly, closing network.", e);
- closeProtocolSession();
+ closeNetworkConnection();
}
catch (SenderException e)
{
_logger.info("Unexpected exception on send, closing network.", e);
- closeProtocolSession();
+ closeNetworkConnection();
}
catch (TransportException e)
{
_logger.error("Unexpected transport exception", e);
- closeProtocolSession();
+ closeNetworkConnection();
}
catch (AMQFrameDecodingException e)
{
_logger.error("Frame decoding", e);
- closeProtocolSession();
+ closeNetworkConnection();
}
catch (IOException e)
{
_logger.error("I/O Exception", e);
- closeProtocolSession();
+ closeNetworkConnection();
}
catch (StoreException e)
{
@@ -484,7 +475,6 @@ public class AMQProtocolEngine implement
ProtocolVersion pv = pi.checkVersion(); // Fails if not correct
- // This sets the protocol version (and hence framing classes) for this session.
setProtocolVersion(pv);
StringBuilder mechanismBuilder = new StringBuilder();
@@ -538,15 +528,8 @@ public class AMQProtocolEngine implement
}
- private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
- private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
- /**
- * Convenience method that writes a frame to the protocol session. Equivalent to calling
- * getProtocolSession().write().
- *
- * @param frame the frame to write
- */
+
public synchronized void writeFrame(AMQDataBlock frame)
{
if(_logger.isDebugEnabled())
@@ -730,12 +713,7 @@ public class AMQProtocolEngine implement
sessionRemoved(session);
}
- /**
- * Initialise heartbeats on the session.
- *
- * @param delay delay in seconds (not ms)
- */
- public void initHeartbeats(int delay)
+ private void initHeartbeats(int delay)
{
if (delay > 0)
{
@@ -749,9 +727,6 @@ public class AMQProtocolEngine implement
}
}
- /**
- * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel.
- */
private void closeAllChannels()
{
try
@@ -792,7 +767,7 @@ public class AMQProtocolEngine implement
}
}
- public void closeSession(final boolean connectionDropped)
+ private void closeConnectionInternal(final boolean connectionDropped)
{
if(runningAsSubject())
@@ -823,7 +798,7 @@ public class AMQProtocolEngine implement
@Override
public Object run()
{
- closeSession(connectionDropped);
+ closeConnectionInternal(connectionDropped);
return null;
}
});
@@ -921,7 +896,7 @@ public class AMQProtocolEngine implement
try
{
markChannelAwaitingCloseOk(channelId);
- closeSession(false);
+ closeConnectionInternal(false);
}
finally
{
@@ -931,7 +906,7 @@ public class AMQProtocolEngine implement
}
finally
{
- closeProtocolSession();
+ closeNetworkConnection();
}
}
}
@@ -941,7 +916,7 @@ public class AMQProtocolEngine implement
}
}
- public void closeProtocolSession()
+ public void closeNetworkConnection()
{
_network.close();
}
@@ -951,19 +926,7 @@ public class AMQProtocolEngine implement
return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")");
}
- /** @return an object that can be used to identity */
- public Object getKey()
- {
- return getRemoteAddress();
- }
-
- /**
- * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may
- * be bound to multiple addresses this could vary depending on the acceptor this session was created from.
- *
- * @return a String FQDN
- */
- public String getLocalFQDN()
+ private String getLocalFQDN()
{
SocketAddress address = _network.getLocalAddress();
if (address instanceof InetSocketAddress)
@@ -1149,11 +1112,11 @@ public class AMQProtocolEngine implement
{
try
{
- closeSession(true);
+ closeConnectionInternal(true);
}
finally
{
- closeProtocolSession();
+ closeNetworkConnection();
}
}
catch (ConnectionScopedRuntimeException | TransportException e)
@@ -1311,7 +1274,7 @@ public class AMQProtocolEngine implement
public long getConnectionId()
{
- return getSessionID();
+ return _connectionId;
}
public String getAddress()
@@ -1452,12 +1415,6 @@ public class AMQProtocolEngine implement
_dataReceived.reset();
}
- public boolean isSessionNameUnique(byte[] name)
- {
- // 0-8/0-9/0-9-1 sessions don't have names
- return true;
- }
-
public String getRemoteAddressString()
{
return String.valueOf(getRemoteAddress());
@@ -1640,19 +1597,20 @@ public class AMQProtocolEngine implement
}
try
{
- closeSession(false);
+ closeConnectionInternal(false);
+
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
+ writeFrame(responseBody.generateFrame(0));
}
catch (Exception e)
{
- _logger.error("Error closing protocol session: " + e, e);
+ _logger.error("Error closing connection for " + getRemoteAddressString(), e);
+ }
+ finally
+ {
+ closeNetworkConnection();
}
-
- MethodRegistry methodRegistry = getMethodRegistry();
- ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
- writeFrame(responseBody.generateFrame(0));
-
- closeProtocolSession();
-
}
@Override
@@ -1667,12 +1625,17 @@ public class AMQProtocolEngine implement
try
{
- closeSession(false);
+ closeConnectionInternal(false);
}
catch (Exception e)
{
- _logger.error("Error closing protocol session: " + e, e);
+ _logger.error("Error closing connection: " + getRemoteAddressString(), e);
}
+ finally
+ {
+ closeNetworkConnection();
+ }
+
}
@Override
@@ -1692,7 +1655,7 @@ public class AMQProtocolEngine implement
SaslServer ss = getSaslServer();
if (ss == null)
{
- closeConnection(AMQConstant.INTERNAL_ERROR, "No SASL context set up in session",0 );
+ closeConnection(AMQConstant.INTERNAL_ERROR, "No SASL context set up in connection",0 );
}
MethodRegistry methodRegistry = getMethodRegistry();
SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response);
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Mon Mar 16 16:52:32 2015
@@ -377,7 +377,7 @@ public abstract class ConsumerTarget_0_8
{
String subscriber = "[channel=" + _channel +
", consumerTag=" + _consumerTag +
- ", session=" + getProtocolSession().getKey() ;
+ ", session=" + getConnection().getRemoteAddressString();
return subscriber + "]";
}
@@ -450,7 +450,7 @@ public abstract class ConsumerTarget_0_8
return _consumerTag;
}
- public AMQProtocolEngine getProtocolSession()
+ private AMQProtocolEngine getConnection()
{
return _channel.getConnection();
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java Mon Mar 16 16:52:32 2015
@@ -365,7 +365,7 @@ public class ProtocolOutputConverterImpl
return _underlyingBody.writePayload(sender);
}
- public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession)
throws AMQException
{
throw new AMQException("This block should never be dispatched!");
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Mon Mar 16 16:52:32 2015
@@ -218,7 +218,7 @@ public class InternalTestProtocolSession
}
}
- public void closeProtocolSession()
+ public void closeNetworkConnection()
{
// Override as we don't have a real IOSession to close.
// The alternative is to fully implement the TestIOSession to return a CloseFuture from close();
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java Mon Mar 16 16:52:32 2015
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.test.utils.QpidTestCase;
-/** Test class to test MBean operations for AMQMinaProtocolSession. */
public class MaxChannelsTest extends QpidTestCase
{
private AMQProtocolEngine _session;
@@ -62,7 +61,6 @@ public class MaxChannelsTest extends Qpi
try
{
_session.getVirtualHost().close();
- _session.closeSession(false);
}
finally
{
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Mon Mar 16 16:52:32 2015
@@ -324,12 +324,6 @@ public class Connection_1_0 implements C
}
@Override
- public boolean isSessionNameUnique(byte[] name)
- {
- return true; // TODO
- }
-
- @Override
public String getRemoteAddressString()
{
return String.valueOf(_conn.getRemoteAddress());
Modified: qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java Mon Mar 16 16:52:32 2015
@@ -212,7 +212,7 @@ public class HttpManagement extends Abst
if(port.getState() != State.ACTIVE)
{
- // TODO - RG
+ // TODO - RG - probably does nothing
port.startAsync();
}
Connector connector = null;
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java?rev=1667068&r1=1667067&r2=1667068&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java Mon Mar 16 16:52:32 2015
@@ -49,6 +49,21 @@ public class InternalBrokerHolder implem
@Override
public void start(BrokerOptions options) throws Exception
{
+ if (Thread.getDefaultUncaughtExceptionHandler() != null)
+ {
+ Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
+ {
+ @Override
+ public void uncaughtException(final Thread t, final Throwable e)
+ {
+ System.err.print("Thread terminated due to uncaught exception");
+ e.printStackTrace();
+
+ LOGGER.error("Uncaught exception from thread " + t.getName(), e);
+ }
+ });
+ }
+
LOGGER.info("Starting internal broker (same JVM)");
_broker = new Broker(new Action<Integer>()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org