You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/14 12:14:34 UTC
qpid-jms-amqp-0-x git commit: QPID-8074: [JMS AMQP 0-x][System Tests] Move more client specific tests from Broker-J sources
Repository: qpid-jms-amqp-0-x
Updated Branches:
refs/heads/master bd4590471 -> 139f1c9f2
QPID-8074: [JMS AMQP 0-x][System Tests] Move more client specific tests from Broker-J sources
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/139f1c9f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/139f1c9f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/139f1c9f
Branch: refs/heads/master
Commit: 139f1c9f23681c2cf1e155b809a6bfb4f547170c
Parents: bd45904
Author: Keith Wall <kw...@apache.org>
Authored: Sun Jan 14 12:08:46 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Sun Jan 14 12:11:20 2018 +0000
----------------------------------------------------------------------
.../org/apache/qpid/test/utils/TCPTunneler.java | 609 -------------------
.../org/apache/qpid/test/utils/TCPTunneler.java | 609 +++++++++++++++++++
.../connection/ConnectionFactoryTest.java | 132 ++++
.../connection/ExceptionListenerTest.java | 174 ++++++
.../qpid/systest/connection/HeartbeatTest.java | 342 +++++++++++
5 files changed, 1257 insertions(+), 609 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/139f1c9f/client/src/test/java/org/apache/qpid/test/utils/TCPTunneler.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/qpid/test/utils/TCPTunneler.java b/client/src/test/java/org/apache/qpid/test/utils/TCPTunneler.java
deleted file mode 100644
index 5e1ddad..0000000
--- a/client/src/test/java/org/apache/qpid/test/utils/TCPTunneler.java
+++ /dev/null
@@ -1,609 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.utils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A basic implementation of TCP traffic forwarder between ports.
- * It is intended to use in tests.
- */
-public class TCPTunneler implements AutoCloseable
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(TCPTunneler.class);
-
- private final TCPWorker _tcpWorker;
- private final ExecutorService _executor;
-
- public TCPTunneler(final int localPort, final String remoteHost,
- final int remotePort,
- final int numberOfConcurrentClients)
- {
- _executor = Executors.newFixedThreadPool(numberOfConcurrentClients * 2 + 1);
- _tcpWorker = new TCPWorker(localPort, remoteHost, remotePort, _executor);
- }
-
- public void start() throws IOException
- {
- _tcpWorker.start();
- }
-
- public void stopClientToServerForwarding(final InetSocketAddress clientAddress)
- {
- _tcpWorker.stopClientToServerForwarding(clientAddress);
- }
-
- public void stop()
- {
- try
- {
- _tcpWorker.stop();
- }
- finally
- {
- _executor.shutdown();
- }
- }
-
- public void addClientListener(TunnelListener listener)
- {
- _tcpWorker.addClientListener(listener);
- }
-
- public void removeClientListener(TunnelListener listener)
- {
- _tcpWorker.removeClientListener(listener);
- }
-
- public void disconnect(InetSocketAddress address)
- {
- LOGGER.info("Disconnecting {}", address);
- if (address != null)
- {
- _tcpWorker.disconnect(address);
- }
- }
-
- public int getLocalPort()
- {
- return _tcpWorker.getLocalPort();
- }
-
- @Override
- public void close() throws Exception
- {
- stop();
- }
-
- public interface TunnelListener
- {
- void clientConnected(InetSocketAddress clientAddress);
-
- void clientDisconnected(InetSocketAddress clientAddress);
- }
-
- private static class TCPWorker implements Runnable
- {
- private final String _remoteHost;
- private final int _remotePort;
- private final int _localPort;
- private final String _remoteHostPort;
- private final AtomicBoolean _closed;
- private final Collection<SocketTunnel> _tunnels;
- private final Collection<TunnelListener> _tunnelListeners;
- private final TunnelListener _notifyingListener;
- private volatile ServerSocket _serverSocket;
- private volatile ExecutorService _executor;
- private int _actualLocalPort;
-
- public TCPWorker(final int localPort,
- final String remoteHost,
- final int remotePort,
- final ExecutorService executor)
- {
- _closed = new AtomicBoolean();
- _remoteHost = remoteHost;
- _remotePort = remotePort;
- _localPort = localPort;
- _remoteHostPort = _remoteHost + ":" + _remotePort;
- _executor = executor;
- _tunnels = new CopyOnWriteArrayList<>();
- _tunnelListeners = new CopyOnWriteArrayList<>();
- _notifyingListener = new TunnelListener()
- {
- @Override
- public void clientConnected(final InetSocketAddress clientAddress)
- {
- notifyClientConnected(clientAddress);
- }
-
- @Override
- public void clientDisconnected(final InetSocketAddress clientAddress)
- {
- try
- {
- notifyClientDisconnected(clientAddress);
- }
- finally
- {
- removeTunnel(clientAddress);
- }
- }
- };
- }
-
- @Override
- public void run()
- {
- String threadName = Thread.currentThread().getName();
- try
- {
- Thread.currentThread().setName("TCPTunnelerAcceptingThread");
- while (!_closed.get())
- {
- Socket acceptedSocket = _serverSocket.accept();
- LOGGER.debug("Client opened socket {}", acceptedSocket);
-
- createTunnel(acceptedSocket);
- }
- }
- catch (IOException e)
- {
- if (!_closed.get())
- {
- LOGGER.error("Exception in accepting thread", e);
- }
- }
- finally
- {
- closeServerSocket();
- _closed.set(true);
- Thread.currentThread().setName(threadName);
- }
- }
-
- public void start()
- {
- _actualLocalPort = _localPort;
- try
- {
- _serverSocket = new ServerSocket(_localPort);
- _actualLocalPort = _serverSocket.getLocalPort();
- LOGGER.info ("Starting TCPTunneler forwarding from port {} to {}",
- _actualLocalPort, _remoteHostPort);
- _serverSocket.setReuseAddress(true);
- }
- catch (IOException e)
- {
- throw new RuntimeException("Cannot start TCPTunneler on port " + _actualLocalPort, e);
- }
-
- if (_serverSocket != null)
- {
- try
- {
- _executor.execute(this);
- }
- catch (Exception e)
- {
- try
- {
- closeServerSocket();
- }
- finally
- {
- throw new RuntimeException("Cannot start acceptor thread for TCPTunneler on port " + _actualLocalPort,
- e);
- }
- }
- }
- }
-
- public void stop()
- {
- if (_closed.compareAndSet(false, true))
- {
- LOGGER.info("Stopping TCPTunneler forwarding from port {} to {}",
- _actualLocalPort,
- _remoteHostPort);
- try
- {
- for (SocketTunnel tunnel : _tunnels)
- {
- tunnel.close();
- }
- }
- finally
- {
- closeServerSocket();
- }
-
- LOGGER.info("TCPTunneler forwarding from port {} to {} is stopped",
- _actualLocalPort,
- _remoteHostPort);
- }
- }
-
- public void addClientListener(TunnelListener listener)
- {
- _tunnelListeners.add(listener);
- for (SocketTunnel socketTunnel : _tunnels)
- {
- try
- {
- listener.clientConnected(socketTunnel.getClientAddress());
- }
- catch (Exception e)
- {
- LOGGER.warn("Exception on notifying client listener about connected client", e);
- }
- }
- }
-
- public void removeClientListener(TunnelListener listener)
- {
- _tunnelListeners.remove(listener);
- }
-
- public void disconnect(final InetSocketAddress address)
- {
- SocketTunnel client = removeTunnel(address);
- if (client != null && !client.isClosed())
- {
- client.close();
- LOGGER.info("Tunnel for {} is disconnected", address);
- }
- else
- {
- LOGGER.info("Tunnel for {} not found", address);
- }
- }
-
-
- private void createTunnel(final Socket localSocket)
- {
- Socket remoteSocket = null;
- try
- {
- LOGGER.debug("Opening socket to {} for {}", _remoteHostPort, localSocket);
- remoteSocket = new Socket(_remoteHost, _remotePort);
- LOGGER.debug("Opened socket to {} for {}", remoteSocket, localSocket);
- SocketTunnel tunnel = new SocketTunnel(localSocket, remoteSocket, _notifyingListener);
- LOGGER.debug("Socket tunnel is created from {} to {}", localSocket, remoteSocket);
- _tunnels.add(tunnel);
- tunnel.start(_executor);
- }
- catch (Exception e)
- {
- LOGGER.error("Cannot forward i/o traffic between {} and {}", localSocket, _remoteHostPort, e);
- SocketTunnel.closeSocket(localSocket);
- SocketTunnel.closeSocket(remoteSocket);
- }
- }
-
- private void notifyClientConnected(final InetSocketAddress clientAddress)
- {
- for (TunnelListener listener : _tunnelListeners)
- {
- try
- {
- listener.clientConnected(clientAddress);
- }
- catch (Exception e)
- {
- LOGGER.warn("Exception on notifying client listener about connected client", e);
- }
- }
- }
-
-
- private void notifyClientDisconnected(final InetSocketAddress clientAddress)
- {
- for (TunnelListener listener : _tunnelListeners)
- {
- try
- {
- listener.clientDisconnected(clientAddress);
- }
- catch (Exception e)
- {
- LOGGER.warn("Exception on notifying client listener about disconnected client", e);
- }
- }
- }
-
- public void stopClientToServerForwarding(final InetSocketAddress clientAddress)
- {
- SocketTunnel target = null;
- for (SocketTunnel tunnel : _tunnels)
- {
- if (tunnel.getClientAddress().equals(clientAddress))
- {
- target = tunnel;
- break;
- }
- }
- if (target != null)
- {
- LOGGER.debug("Stopping forwarding from client {} to server", clientAddress);
- target.stopClientToServerForwarding();
- }
- else
- {
- throw new IllegalArgumentException("Could not find tunnel for address " + clientAddress);
- }
- }
-
- private void closeServerSocket()
- {
- if (_serverSocket != null)
- {
- try
- {
- _serverSocket.close();
- }
- catch (IOException e)
- {
- LOGGER.warn("Exception on closing of accepting socket", e);
- }
- finally
- {
- _serverSocket = null;
- }
- }
- }
-
- private SocketTunnel removeTunnel(final InetSocketAddress clientAddress)
- {
- SocketTunnel client = null;
- for (SocketTunnel c : _tunnels)
- {
- if (c.isClientAddress(clientAddress))
- {
- client = c;
- break;
- }
- }
- if (client != null)
- {
- _tunnels.remove(client);
- }
- return client;
- }
-
- public int getLocalPort()
- {
- if (_serverSocket == null)
- {
- return -1;
- }
- return _serverSocket.getLocalPort();
- }
- }
-
- private static class SocketTunnel
- {
- private final Socket _clientSocket;
- private final Socket _serverSocket;
- private final TunnelListener _tunnelListener;
- private final AtomicBoolean _closed;
- private final AutoClosingStreamForwarder _clientToServer;
- private final AutoClosingStreamForwarder _serverToClient;
- private final InetSocketAddress _clientSocketAddress;
-
- public SocketTunnel(final Socket clientSocket,
- final Socket serverSocket,
- final TunnelListener tunnelListener) throws IOException
- {
- _clientSocket = clientSocket;
- _clientSocketAddress =
- new InetSocketAddress(clientSocket.getInetAddress().getHostName(), _clientSocket.getPort());
- _serverSocket = serverSocket;
- _closed = new AtomicBoolean();
- _tunnelListener = tunnelListener;
- _clientSocket.setKeepAlive(true);
- _serverSocket.setKeepAlive(true);
- _clientToServer = new AutoClosingStreamForwarder(new StreamForwarder(_clientSocket, _serverSocket));
- _serverToClient = new AutoClosingStreamForwarder(new StreamForwarder(_serverSocket, _clientSocket));
- }
-
- public void close()
- {
- if (_closed.compareAndSet(false, true))
- {
- try
- {
- closeSocket(_serverSocket);
- closeSocket(_clientSocket);
- }
- finally
- {
- _tunnelListener.clientDisconnected(getClientAddress());
- }
- }
- }
-
- public void start(Executor executor) throws IOException
- {
- executor.execute(_clientToServer);
- executor.execute(_serverToClient);
- _tunnelListener.clientConnected(getClientAddress());
- }
-
- public void stopClientToServerForwarding()
- {
- _clientToServer.stopForwarding();
- }
-
- public boolean isClosed()
- {
- return _closed.get();
- }
-
- public boolean isClientAddress(final InetSocketAddress clientAddress)
- {
- return getClientAddress().equals(clientAddress);
- }
-
- public InetSocketAddress getClientAddress()
- {
- return _clientSocketAddress;
- }
-
- private static void closeSocket(Socket socket)
- {
- if (socket != null)
- {
- try
- {
- socket.close();
- }
- catch (IOException e)
- {
- LOGGER.warn("Exception on closing of socket {}", socket, e);
- }
- }
- }
-
-
- private class AutoClosingStreamForwarder implements Runnable
- {
- private StreamForwarder _streamForwarder;
-
- public AutoClosingStreamForwarder(StreamForwarder streamForwarder)
- {
- _streamForwarder = streamForwarder;
- }
-
- @Override
- public void run()
- {
- Thread currentThread = Thread.currentThread();
- String originalThreadName = currentThread.getName();
- try
- {
- currentThread.setName(_streamForwarder.getName());
- _streamForwarder.run();
- }
- finally
- {
- close();
- currentThread.setName(originalThreadName);
- }
- }
-
- public void stopForwarding()
- {
- _streamForwarder.stopForwarding();
- }
- }
- }
-
- private static class StreamForwarder implements Runnable
- {
- private static final int BUFFER_SIZE = 4096;
-
- private final InputStream _inputStream;
- private final OutputStream _outputStream;
- private final String _name;
- private AtomicBoolean _stopForwarding = new AtomicBoolean();
-
- public StreamForwarder(Socket input, Socket output) throws IOException
- {
- _inputStream = input.getInputStream();
- _outputStream = output.getOutputStream();
- _name = "Forwarder-" + input.getLocalSocketAddress() + "->" + output.getRemoteSocketAddress();
- }
-
- @Override
- public void run()
- {
- byte[] buffer = new byte[BUFFER_SIZE];
- int bytesRead;
- try
- {
- while ((bytesRead = _inputStream.read(buffer)) != -1)
- {
- if (!_stopForwarding.get())
- {
- _outputStream.write(buffer, 0, bytesRead);
- _outputStream.flush();
- LOGGER.debug("Forwarded {} byte(s)", bytesRead);
- }
- else
- {
- LOGGER.debug("Discarded {} byte(s)", bytesRead);
- }
- }
- }
- catch (IOException e)
- {
- LOGGER.warn("Exception on forwarding data for {}: {}", _name, e.getMessage());
- }
- finally
- {
- try
- {
- _inputStream.close();
- }
- catch (IOException e)
- {
- // ignore
- }
-
- try
- {
- _outputStream.close();
- }
- catch (IOException e)
- {
- // ignore
- }
- }
- }
-
-
- public String getName()
- {
- return _name;
- }
-
- public void stopForwarding()
- {
- _stopForwarding.set(true);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/139f1c9f/systests/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java b/systests/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
new file mode 100644
index 0000000..5e1ddad
--- /dev/null
+++ b/systests/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
@@ -0,0 +1,609 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A basic implementation of TCP traffic forwarder between ports.
+ * It is intended for use in tests.
+ */
+public class TCPTunneler implements AutoCloseable
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(TCPTunneler.class);
+
+ private final TCPWorker _tcpWorker;
+ private final ExecutorService _executor;
+
+ public TCPTunneler(final int localPort, final String remoteHost,
+ final int remotePort,
+ final int numberOfConcurrentClients)
+ {
+ _executor = Executors.newFixedThreadPool(numberOfConcurrentClients * 2 + 1);
+ _tcpWorker = new TCPWorker(localPort, remoteHost, remotePort, _executor);
+ }
+
+ public void start() throws IOException
+ {
+ _tcpWorker.start();
+ }
+
+ public void stopClientToServerForwarding(final InetSocketAddress clientAddress)
+ {
+ _tcpWorker.stopClientToServerForwarding(clientAddress);
+ }
+
+ public void stop()
+ {
+ try
+ {
+ _tcpWorker.stop();
+ }
+ finally
+ {
+ _executor.shutdown();
+ }
+ }
+
+ public void addClientListener(TunnelListener listener)
+ {
+ _tcpWorker.addClientListener(listener);
+ }
+
+ public void removeClientListener(TunnelListener listener)
+ {
+ _tcpWorker.removeClientListener(listener);
+ }
+
+ public void disconnect(InetSocketAddress address)
+ {
+ LOGGER.info("Disconnecting {}", address);
+ if (address != null)
+ {
+ _tcpWorker.disconnect(address);
+ }
+ }
+
+ public int getLocalPort()
+ {
+ return _tcpWorker.getLocalPort();
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ stop();
+ }
+
+ public interface TunnelListener
+ {
+ void clientConnected(InetSocketAddress clientAddress);
+
+ void clientDisconnected(InetSocketAddress clientAddress);
+ }
+
+ private static class TCPWorker implements Runnable
+ {
+ private final String _remoteHost;
+ private final int _remotePort;
+ private final int _localPort;
+ private final String _remoteHostPort;
+ private final AtomicBoolean _closed;
+ private final Collection<SocketTunnel> _tunnels;
+ private final Collection<TunnelListener> _tunnelListeners;
+ private final TunnelListener _notifyingListener;
+ private volatile ServerSocket _serverSocket;
+ private volatile ExecutorService _executor;
+ private int _actualLocalPort;
+
+ public TCPWorker(final int localPort,
+ final String remoteHost,
+ final int remotePort,
+ final ExecutorService executor)
+ {
+ _closed = new AtomicBoolean();
+ _remoteHost = remoteHost;
+ _remotePort = remotePort;
+ _localPort = localPort;
+ _remoteHostPort = _remoteHost + ":" + _remotePort;
+ _executor = executor;
+ _tunnels = new CopyOnWriteArrayList<>();
+ _tunnelListeners = new CopyOnWriteArrayList<>();
+ _notifyingListener = new TunnelListener()
+ {
+ @Override
+ public void clientConnected(final InetSocketAddress clientAddress)
+ {
+ notifyClientConnected(clientAddress);
+ }
+
+ @Override
+ public void clientDisconnected(final InetSocketAddress clientAddress)
+ {
+ try
+ {
+ notifyClientDisconnected(clientAddress);
+ }
+ finally
+ {
+ removeTunnel(clientAddress);
+ }
+ }
+ };
+ }
+
+ @Override
+ public void run()
+ {
+ String threadName = Thread.currentThread().getName();
+ try
+ {
+ Thread.currentThread().setName("TCPTunnelerAcceptingThread");
+ while (!_closed.get())
+ {
+ Socket acceptedSocket = _serverSocket.accept();
+ LOGGER.debug("Client opened socket {}", acceptedSocket);
+
+ createTunnel(acceptedSocket);
+ }
+ }
+ catch (IOException e)
+ {
+ if (!_closed.get())
+ {
+ LOGGER.error("Exception in accepting thread", e);
+ }
+ }
+ finally
+ {
+ closeServerSocket();
+ _closed.set(true);
+ Thread.currentThread().setName(threadName);
+ }
+ }
+
+ public void start()
+ {
+ _actualLocalPort = _localPort;
+ try
+ {
+ _serverSocket = new ServerSocket(_localPort);
+ _actualLocalPort = _serverSocket.getLocalPort();
+ LOGGER.info ("Starting TCPTunneler forwarding from port {} to {}",
+ _actualLocalPort, _remoteHostPort);
+ _serverSocket.setReuseAddress(true);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Cannot start TCPTunneler on port " + _actualLocalPort, e);
+ }
+
+ if (_serverSocket != null)
+ {
+ try
+ {
+ _executor.execute(this);
+ }
+ catch (Exception e)
+ {
+ try
+ {
+ closeServerSocket();
+ }
+ finally
+ {
+ throw new RuntimeException("Cannot start acceptor thread for TCPTunneler on port " + _actualLocalPort,
+ e);
+ }
+ }
+ }
+ }
+
+ public void stop()
+ {
+ if (_closed.compareAndSet(false, true))
+ {
+ LOGGER.info("Stopping TCPTunneler forwarding from port {} to {}",
+ _actualLocalPort,
+ _remoteHostPort);
+ try
+ {
+ for (SocketTunnel tunnel : _tunnels)
+ {
+ tunnel.close();
+ }
+ }
+ finally
+ {
+ closeServerSocket();
+ }
+
+ LOGGER.info("TCPTunneler forwarding from port {} to {} is stopped",
+ _actualLocalPort,
+ _remoteHostPort);
+ }
+ }
+
+ public void addClientListener(TunnelListener listener)
+ {
+ _tunnelListeners.add(listener);
+ for (SocketTunnel socketTunnel : _tunnels)
+ {
+ try
+ {
+ listener.clientConnected(socketTunnel.getClientAddress());
+ }
+ catch (Exception e)
+ {
+ LOGGER.warn("Exception on notifying client listener about connected client", e);
+ }
+ }
+ }
+
+ public void removeClientListener(TunnelListener listener)
+ {
+ _tunnelListeners.remove(listener);
+ }
+
+ public void disconnect(final InetSocketAddress address)
+ {
+ SocketTunnel client = removeTunnel(address);
+ if (client != null && !client.isClosed())
+ {
+ client.close();
+ LOGGER.info("Tunnel for {} is disconnected", address);
+ }
+ else
+ {
+ LOGGER.info("Tunnel for {} not found", address);
+ }
+ }
+
+
+ private void createTunnel(final Socket localSocket)
+ {
+ Socket remoteSocket = null;
+ try
+ {
+ LOGGER.debug("Opening socket to {} for {}", _remoteHostPort, localSocket);
+ remoteSocket = new Socket(_remoteHost, _remotePort);
+ LOGGER.debug("Opened socket to {} for {}", remoteSocket, localSocket);
+ SocketTunnel tunnel = new SocketTunnel(localSocket, remoteSocket, _notifyingListener);
+ LOGGER.debug("Socket tunnel is created from {} to {}", localSocket, remoteSocket);
+ _tunnels.add(tunnel);
+ tunnel.start(_executor);
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Cannot forward i/o traffic between {} and {}", localSocket, _remoteHostPort, e);
+ SocketTunnel.closeSocket(localSocket);
+ SocketTunnel.closeSocket(remoteSocket);
+ }
+ }
+
+ private void notifyClientConnected(final InetSocketAddress clientAddress)
+ {
+ for (TunnelListener listener : _tunnelListeners)
+ {
+ try
+ {
+ listener.clientConnected(clientAddress);
+ }
+ catch (Exception e)
+ {
+ LOGGER.warn("Exception on notifying client listener about connected client", e);
+ }
+ }
+ }
+
+
+ private void notifyClientDisconnected(final InetSocketAddress clientAddress)
+ {
+ for (TunnelListener listener : _tunnelListeners)
+ {
+ try
+ {
+ listener.clientDisconnected(clientAddress);
+ }
+ catch (Exception e)
+ {
+ LOGGER.warn("Exception on notifying client listener about disconnected client", e);
+ }
+ }
+ }
+
+ public void stopClientToServerForwarding(final InetSocketAddress clientAddress)
+ {
+ SocketTunnel target = null;
+ for (SocketTunnel tunnel : _tunnels)
+ {
+ if (tunnel.getClientAddress().equals(clientAddress))
+ {
+ target = tunnel;
+ break;
+ }
+ }
+ if (target != null)
+ {
+ LOGGER.debug("Stopping forwarding from client {} to server", clientAddress);
+ target.stopClientToServerForwarding();
+ }
+ else
+ {
+ throw new IllegalArgumentException("Could not find tunnel for address " + clientAddress);
+ }
+ }
+
+ private void closeServerSocket()
+ {
+ if (_serverSocket != null)
+ {
+ try
+ {
+ _serverSocket.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.warn("Exception on closing of accepting socket", e);
+ }
+ finally
+ {
+ _serverSocket = null;
+ }
+ }
+ }
+
+ private SocketTunnel removeTunnel(final InetSocketAddress clientAddress)
+ {
+ SocketTunnel client = null;
+ for (SocketTunnel c : _tunnels)
+ {
+ if (c.isClientAddress(clientAddress))
+ {
+ client = c;
+ break;
+ }
+ }
+ if (client != null)
+ {
+ _tunnels.remove(client);
+ }
+ return client;
+ }
+
+ public int getLocalPort()
+ {
+ if (_serverSocket == null)
+ {
+ return -1;
+ }
+ return _serverSocket.getLocalPort();
+ }
+ }
+
+ private static class SocketTunnel
+ {
+ private final Socket _clientSocket;
+ private final Socket _serverSocket;
+ private final TunnelListener _tunnelListener;
+ private final AtomicBoolean _closed;
+ private final AutoClosingStreamForwarder _clientToServer;
+ private final AutoClosingStreamForwarder _serverToClient;
+ private final InetSocketAddress _clientSocketAddress;
+
+ public SocketTunnel(final Socket clientSocket,
+ final Socket serverSocket,
+ final TunnelListener tunnelListener) throws IOException
+ {
+ _clientSocket = clientSocket;
+ _clientSocketAddress =
+ new InetSocketAddress(clientSocket.getInetAddress().getHostName(), _clientSocket.getPort());
+ _serverSocket = serverSocket;
+ _closed = new AtomicBoolean();
+ _tunnelListener = tunnelListener;
+ _clientSocket.setKeepAlive(true);
+ _serverSocket.setKeepAlive(true);
+ _clientToServer = new AutoClosingStreamForwarder(new StreamForwarder(_clientSocket, _serverSocket));
+ _serverToClient = new AutoClosingStreamForwarder(new StreamForwarder(_serverSocket, _clientSocket));
+ }
+
+ public void close()
+ {
+ if (_closed.compareAndSet(false, true))
+ {
+ try
+ {
+ closeSocket(_serverSocket);
+ closeSocket(_clientSocket);
+ }
+ finally
+ {
+ _tunnelListener.clientDisconnected(getClientAddress());
+ }
+ }
+ }
+
+ public void start(Executor executor) throws IOException
+ {
+ executor.execute(_clientToServer);
+ executor.execute(_serverToClient);
+ _tunnelListener.clientConnected(getClientAddress());
+ }
+
+ public void stopClientToServerForwarding()
+ {
+ _clientToServer.stopForwarding();
+ }
+
+ public boolean isClosed()
+ {
+ return _closed.get();
+ }
+
+ public boolean isClientAddress(final InetSocketAddress clientAddress)
+ {
+ return getClientAddress().equals(clientAddress);
+ }
+
+ public InetSocketAddress getClientAddress()
+ {
+ return _clientSocketAddress;
+ }
+
+ private static void closeSocket(Socket socket)
+ {
+ if (socket != null)
+ {
+ try
+ {
+ socket.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.warn("Exception on closing of socket {}", socket, e);
+ }
+ }
+ }
+
+
+ private class AutoClosingStreamForwarder implements Runnable
+ {
+ private StreamForwarder _streamForwarder;
+
+ public AutoClosingStreamForwarder(StreamForwarder streamForwarder)
+ {
+ _streamForwarder = streamForwarder;
+ }
+
+ @Override
+ public void run()
+ {
+ Thread currentThread = Thread.currentThread();
+ String originalThreadName = currentThread.getName();
+ try
+ {
+ currentThread.setName(_streamForwarder.getName());
+ _streamForwarder.run();
+ }
+ finally
+ {
+ close();
+ currentThread.setName(originalThreadName);
+ }
+ }
+
+ public void stopForwarding()
+ {
+ _streamForwarder.stopForwarding();
+ }
+ }
+ }
+
+ private static class StreamForwarder implements Runnable
+ {
+ private static final int BUFFER_SIZE = 4096;
+
+ private final InputStream _inputStream;
+ private final OutputStream _outputStream;
+ private final String _name;
+ private AtomicBoolean _stopForwarding = new AtomicBoolean();
+
+ public StreamForwarder(Socket input, Socket output) throws IOException
+ {
+ _inputStream = input.getInputStream();
+ _outputStream = output.getOutputStream();
+ _name = "Forwarder-" + input.getLocalSocketAddress() + "->" + output.getRemoteSocketAddress();
+ }
+
+ @Override
+ public void run()
+ {
+ byte[] buffer = new byte[BUFFER_SIZE];
+ int bytesRead;
+ try
+ {
+ while ((bytesRead = _inputStream.read(buffer)) != -1)
+ {
+ if (!_stopForwarding.get())
+ {
+ _outputStream.write(buffer, 0, bytesRead);
+ _outputStream.flush();
+ LOGGER.debug("Forwarded {} byte(s)", bytesRead);
+ }
+ else
+ {
+ LOGGER.debug("Discarded {} byte(s)", bytesRead);
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ LOGGER.warn("Exception on forwarding data for {}: {}", _name, e.getMessage());
+ }
+ finally
+ {
+ try
+ {
+ _inputStream.close();
+ }
+ catch (IOException e)
+ {
+ // ignore
+ }
+
+ try
+ {
+ _outputStream.close();
+ }
+ catch (IOException e)
+ {
+ // ignore
+ }
+ }
+ }
+
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public void stopForwarding()
+ {
+ _stopForwarding.set(true);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/139f1c9f/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionFactoryTest.java b/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionFactoryTest.java
new file mode 100644
index 0000000..c9eb034
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionFactoryTest.java
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.connection;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.InetSocketAddress;
+
+import javax.jms.Connection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.systest.core.BrokerAdmin;
+import org.apache.qpid.systest.core.JmsTestBase;
+
+/**
+ * System tests for {@link AMQConnectionFactory}: creating connections with the credentials
+ * embedded in the connection URL, with explicitly supplied credentials, and from a factory
+ * configured through its connection URL setter.
+ */
+public class ConnectionFactoryTest extends JmsTestBase
+{
+    private static final String CONNECTION_URL = "amqp://guest:guest@clientID/?brokerlist='tcp://%s:%d'";
+    private String _url;
+
+    /** Builds the connection URL from the AMQP address reported by the broker admin. */
+    @Before
+    public void setUp()
+    {
+        final InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        _url = String.format(CONNECTION_URL,
+                             brokerAddress.getHostString(),
+                             brokerAddress.getPort());
+    }
+
+    /**
+     * Credentials passed to {@code createConnection(username, password)} apply to that
+     * connection only: they must not replace the defaults specified in the URL, which are
+     * expected to be used again by a subsequent {@code createConnection()} call.
+     */
+    @Test
+    public void testCreateConnectionWithUsernamePassword() throws Exception
+    {
+        AMQConnectionFactory factory = new AMQConnectionFactory(_url);
+
+        // 1. No explicit credentials: the URL defaults are used.
+        AMQConnection con = null;
+        try
+        {
+            con = factory.createConnection();
+            assertEquals("Username used is different from the one in URL","guest",con.getConnectionURL().getUsername());
+            assertEquals("Password used is different from the one in URL","guest",con.getConnectionURL().getPassword());
+        }
+        finally
+        {
+            if (con != null)
+            {
+                con.close();
+            }
+        }
+
+        // 2. Explicit credentials: this connection reports the supplied values.
+        AMQConnection con2 = null;
+        try
+        {
+            con2 = factory.createConnection("user", "pass");
+            assertEquals("Username used is different from the one passed to createConnection","user",con2.getConnectionURL().getUsername());
+            assertEquals("Password used is different from the one passed to createConnection","pass",con2.getConnectionURL().getPassword());
+        }
+        catch(Exception ignored)
+        {
+            // Deliberately ignored. NOTE(review): presumably the broker may reject the
+            // "user"/"pass" credentials, in which case only step 3 is meaningful - confirm.
+            // Assertion failures are Errors and are therefore not swallowed here.
+        }
+        finally
+        {
+            if (con2 != null)
+            {
+                con2.close();
+            }
+        }
+
+        // 3. No explicit credentials again: the URL defaults must still be in force.
+        AMQConnection con3 = null;
+        try
+        {
+            con3 = factory.createConnection();
+            assertEquals("Username used is different from the one in URL","guest",con3.getConnectionURL().getUsername());
+            assertEquals("Password used is different from the one in URL","guest",con3.getConnectionURL().getPassword());
+        }
+        finally
+        {
+            if (con3 != null)
+            {
+                con3.close();
+            }
+        }
+    }
+
+    /**
+     * Verifies that a connection can be made using an instance of AMQConnectionFactory created with the
+     * default constructor and provided with the connection url via setter.
+     */
+    @Test
+    public void testCreatingConnectionWithInstanceMadeUsingDefaultConstructor() throws Exception
+    {
+        AMQConnectionFactory factory = new AMQConnectionFactory();
+        factory.setConnectionURLString(_url);
+
+        Connection con = null;
+        try
+        {
+            // Success criterion is simply that no exception is thrown.
+            con = factory.createConnection();
+        }
+        finally
+        {
+            if (con != null)
+            {
+                con.close();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/139f1c9f/systests/src/test/java/org/apache/qpid/systest/connection/ExceptionListenerTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/connection/ExceptionListenerTest.java b/systests/src/test/java/org/apache/qpid/systest/connection/ExceptionListenerTest.java
new file mode 100644
index 0000000..d913b8f
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/connection/ExceptionListenerTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.systest.connection;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeThat;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.systest.core.BrokerAdmin;
+import org.apache.qpid.systest.core.JmsTestBase;
+import org.apache.qpid.systest.core.util.Utils;
+
+/**
+ * System test covering the interaction between a JMS {@link ExceptionListener} that closes the
+ * connection and a {@link MessageListener} that concurrently creates sessions on it.
+ */
+public class ExceptionListenerTest extends JmsTestBase
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(ExceptionListenerTest.class);
+
+    /**
+     * This test reproduces a deadlock that was the subject of a support call. A Spring based
+     * application was using SingleConnectionFactory. It installed an ExceptionListener that
+     * stops and closes the connection in response to any exception. On receipt of a message
+     * the application would create a new session then send a response message (within onMessage).
+     * It appears that a misconfiguration in the application meant that some of these messages
+     * were bounced (no-route). Bounces are treated like connection exceptions and are passed
+     * back to the application via the ExceptionListener. The deadlock occurred between the
+     * ExceptionListener's call to stop() and the MessageListener's attempt to create a new
+     * session.
+     */
+    @Test
+    public void testExceptionListenerConnectionStopDeadlock() throws Exception
+    {
+        assumeThat("Feature 'close on no route' is only implemented by Broker-J",
+                   getBrokerAdmin().getBrokerType(),
+                   is(equalTo(BrokerAdmin.BrokerType.BROKERJ)));
+
+        assumeThat("Feature 'close on no route' is only implemented for AMQP 0-8..0-9-1",
+                   getProtocol(),
+                   is(not(equalTo("0-10"))));
+
+        Map<String, String> options = new HashMap<>();
+        options.put(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, Boolean.toString(false));
+
+        final Connection connection = getConnection(options);
+
+        try
+        {
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue messageQueue = session.createQueue(getTestQueueName());
+            session.createConsumer(messageQueue).close(); // Create queue by side-effect
+
+            // Put 10 messages onto messageQueue
+            Utils.sendMessages(session, messageQueue, 10);
+            // Install an exception listener that stops/closes the connection on receipt of 2nd AMQNoRouteException.
+            // (Triggering on the 2nd (rather than 1st) seems to increase the probability that the test ends in deadlock,
+            // at least on my machine).
+            final CountDownLatch exceptionReceivedLatch = new CountDownLatch(2);
+            // Guards the stop/close so it runs once even if further exceptions arrive.
+            final AtomicBoolean doneClosed = new AtomicBoolean();
+            final CountDownLatch connectionClosedAttemptLatch = new CountDownLatch(1);
+            final AtomicReference<Exception> connectionCloseException = new AtomicReference<>();
+            final ExceptionListener listener = new ExceptionListener()
+            {
+                @Override
+                public void onException(JMSException exception)
+                {
+                    exceptionReceivedLatch.countDown();
+                    if (exceptionReceivedLatch.getCount() == 0)
+                    {
+                        try
+                        {
+                            if (doneClosed.compareAndSet(false, true))
+                            {
+                                connection.stop();
+                                connection.close();
+                            }
+                        }
+                        catch (Exception e)
+                        {
+                            // We expect no exception to be caught
+                            connectionCloseException.set(e);
+                        }
+                        finally
+                        {
+                            // Signal the main thread that the close attempt finished (a deadlock never gets here).
+                            connectionClosedAttemptLatch.countDown();
+                        }
+
+                    }
+                }
+            };
+            connection.setExceptionListener(listener);
+
+            // Create a message listener that receives from testQueue and tries to forward them to unknown queue (thus
+            // provoking AMQNoRouteException exceptions to be delivered to the ExceptionListener).
+            final Queue unknownQueue = session.createQueue(getTestQueueName() + "_unknown");
+            MessageListener redirectingMessageListener = new MessageListener()
+            {
+                @Override
+                public void onMessage(Message msg)
+                {
+                    try
+                    {
+                        Session mlSession = connection.createSession(true, Session.SESSION_TRANSACTED);  // ** Deadlock
+                        mlSession.createProducer(unknownQueue).send(msg);  // will cause async AMQNoRouteException;
+                        mlSession.commit();
+                    }
+                    catch (JMSException je)
+                    {
+                        // Connection is closed by the listener, so exceptions here are expected.
+                        LOGGER.debug("Expected exception - message listener got exception", je);
+                    }
+                }
+            };
+
+            MessageConsumer consumer = session.createConsumer(messageQueue);
+            consumer.setMessageListener(redirectingMessageListener);
+            connection.start();
+
+            // Await the two exceptions that trigger the close
+            boolean exceptionReceived = exceptionReceivedLatch.await(10, TimeUnit.SECONDS);
+            assertTrue("Exception listener did not hear the expected two exceptions within timeout", exceptionReceived);
+
+            // Await the exception listener's attempt to close the connection
+            boolean closeAttemptedReceived = connectionClosedAttemptLatch.await(10, TimeUnit.SECONDS);
+            assertTrue("Exception listener did not try to close the connection within timeout", closeAttemptedReceived);
+            assertNull("Exception listener should not have experienced an exception : " + connectionCloseException.get(), connectionCloseException.get());
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/139f1c9f/systests/src/test/java/org/apache/qpid/systest/connection/HeartbeatTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/connection/HeartbeatTest.java b/systests/src/test/java/org/apache/qpid/systest/connection/HeartbeatTest.java
new file mode 100644
index 0000000..2123d93
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/connection/HeartbeatTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systest.connection;
+
+import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.HeartbeatListener;
+import org.apache.qpid.systest.core.BrokerAdmin;
+import org.apache.qpid.systest.core.JmsTestBase;
+import org.apache.qpid.test.utils.TCPTunneler;
+
+public class HeartbeatTest extends JmsTestBase
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatTest.class);
+ private static final String CONNECTION_URL_WITH_HEARTBEAT = "amqp://guest:guest@clientid/?brokerlist='localhost:%d?heartbeat='%d''";
+ private static final int MAXIMUM_WAIT_TIME = 2900;
+ private final TestListener _listener = new TestListener("listener", 2, 2);
+
+ @Test
+ public void testHeartbeatsEnabledUsingUrl() throws Exception
+ {
+ final String url = String.format(CONNECTION_URL_WITH_HEARTBEAT, getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP).getPort(), 1);
+ AMQConnection conn = new AMQConnection(url);
+ try
+ {
+ setHeartbeatListener(conn, _listener);
+ conn.start();
+
+ _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
+
+ assertTrue("Too few heartbeats received: "+_listener.getHeartbeatsReceived() +" (expected at least 2)", _listener.getHeartbeatsReceived() >=2);
+ assertTrue("Too few heartbeats sent "+_listener.getHeartbeatsSent() +" (expected at least 2)", _listener.getHeartbeatsSent() >=2);
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testHeartbeatsEnabledUsingSystemProperty() throws Exception
+ {
+ System.setProperty(QPID_HEARTBEAT_INTERVAL, "1");
+ AMQConnection conn = (AMQConnection) getConnection();
+ try
+ {
+ setHeartbeatListener(conn, _listener);
+ conn.start();
+
+ _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
+
+ assertTrue("Too few heartbeats received: "+_listener.getHeartbeatsReceived() +" (expected at least 2)", _listener.getHeartbeatsReceived() >=2);
+ assertTrue("Too few heartbeats sent "+_listener.getHeartbeatsSent() +" (expected at least 2)", _listener.getHeartbeatsSent() >=2);
+ }
+ finally
+ {
+ System.clearProperty(QPID_HEARTBEAT_INTERVAL);
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testHeartbeatsDisabledUsingSystemProperty() throws Exception
+ {
+ System.setProperty(QPID_HEARTBEAT_INTERVAL, "0");
+ AMQConnection conn = (AMQConnection) getConnection();
+ try
+ {
+ setHeartbeatListener(conn, _listener);
+ conn.start();
+
+ _listener.awaitExpectedHeartbeats(2000);
+
+ assertEquals("Heartbeats unexpectedly received", 0, _listener.getHeartbeatsReceived());
+ assertEquals("Heartbeats unexpectedly sent ", 0, _listener.getHeartbeatsSent());
+ }
+ finally
+ {
+ System.clearProperty(QPID_HEARTBEAT_INTERVAL);
+ conn.close();
+ }
+
+ }
+
+ /**
+ * This test carefully arranges message flow so that bytes flow only from producer to broker
+ * on the producer side and broker to consumer on the consumer side, deliberately leaving the
+ * reverse path quiet so heartbeats will flow.
+ */
+ @Test
+ public void testUnidirectionalHeartbeating() throws Exception
+ {
+ System.setProperty(QPID_HEARTBEAT_INTERVAL,"1");
+ AMQConnection receiveConn = (AMQConnection) getConnection();
+ AMQConnection sendConn = (AMQConnection) getConnection();
+ try
+ {
+ TestListener receiveListener = new TestListener("receiverListener", 2, 0);
+ TestListener sendListener = new TestListener("senderListener", 0, 2);
+
+ Session receiveSession = receiveConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Session senderSession = sendConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Destination destination = receiveSession.createQueue(getTestQueueName());
+
+ MessageConsumer consumer = receiveSession.createConsumer(destination);
+ MessageProducer producer = senderSession.createProducer(destination);
+
+ setHeartbeatListener(receiveConn, receiveListener);
+ setHeartbeatListener(sendConn, sendListener);
+ receiveConn.start();
+
+ // Start the flow of messages to the consumer
+ consumer.receiveNoWait();
+
+ for(int i = 0; i < 5; i++)
+ {
+ producer.send(senderSession.createTextMessage("Msg " + i));
+ Thread.sleep(500);
+ assertNotNull("Expected to receive message within " + getReceiveTimeout() + "ms.", consumer.receive(getReceiveTimeout()));
+ // Consumer does not ack the message in order that no bytes flow from consumer connection back to Broker
+ }
+
+ assertTrue("Too few heartbeats sent "+ receiveListener.getHeartbeatsSent() +" (expected at least 2)", receiveListener.getHeartbeatsSent()>=2);
+ assertEquals("Unexpected number of heartbeats sent by the sender: ",0,sendListener.getHeartbeatsSent());
+
+ assertTrue("Too few heartbeats received at the sender "+ sendListener.getHeartbeatsReceived() +" (expected at least 2)", sendListener.getHeartbeatsReceived()>=2);
+ assertEquals("Unexpected number of heartbeats received by the receiver: ",0,receiveListener.getHeartbeatsReceived());
+ }
+ finally
+ {
+ System.clearProperty(QPID_HEARTBEAT_INTERVAL);
+ receiveConn.close();
+ sendConn.close();
+ }
+
+ }
+
+
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testHeartbeatsEnabledUsingAmqjLegacySystemProperty() throws Exception
+ {
+ System.setProperty("amqj.heartbeat.delay", "1");
+ AMQConnection conn = (AMQConnection) getConnection();
+ try
+ {
+ setHeartbeatListener(conn, _listener);
+ conn.start();
+
+ _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
+
+ assertTrue("Too few heartbeats received: " + _listener.getHeartbeatsReceived() + " (expected at least 2)",
+ _listener.getHeartbeatsReceived() >= 2);
+ assertTrue("Too few heartbeats sent " + _listener.getHeartbeatsSent() + " (expected at least 2)",
+ _listener.getHeartbeatsSent() >= 2);
+ }
+ finally
+ {
+ System.clearProperty("amqj.heartbeat.delay");
+ conn.close();
+ }
+
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testHeartbeatsEnabledUsingOlderLegacySystemProperty() throws Exception
+ {
+ System.setProperty("idle_timeout", "1000");
+ AMQConnection conn = (AMQConnection) getConnection();
+ try
+ {
+ setHeartbeatListener(conn, _listener);
+ conn.start();
+
+ _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
+
+ assertTrue("Too few heartbeats received: " + _listener.getHeartbeatsReceived() + " (expected at least 2)",
+ _listener.getHeartbeatsReceived() >= 2);
+ assertTrue("Too few heartbeats sent " + _listener.getHeartbeatsSent() + " (expected at least 2)",
+ _listener.getHeartbeatsSent() >= 2);
+ }
+ finally
+ {
+ System.clearProperty("idle_timeout");
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testClientStopsSendingHeartbeats_BrokerClosesConnection() throws Exception
+ {
+ try(TCPTunneler tcpTunneler = new TCPTunneler(0, "localhost", getBrokerAdmin().getBrokerAddress(
+ BrokerAdmin.PortType.AMQP).getPort(), 1))
+ {
+ tcpTunneler.start();
+
+ final AtomicReference<InetSocketAddress> clientAddressRef = new AtomicReference<>();
+ tcpTunneler.addClientListener(new TCPTunneler.TunnelListener()
+ {
+ @Override
+ public void clientConnected(final InetSocketAddress clientAddress)
+ {
+ clientAddressRef.set(clientAddress);
+ }
+
+ @Override
+ public void clientDisconnected(final InetSocketAddress clientAddress)
+ {
+ }
+ });
+
+ final CountDownLatch exceptionLatch = new CountDownLatch(1);
+ final String url = String.format(CONNECTION_URL_WITH_HEARTBEAT, tcpTunneler.getLocalPort(), 1);
+ AMQConnection conn = new AMQConnection(url);
+ setHeartbeatListener(conn, _listener);
+ conn.setExceptionListener(new ExceptionListener()
+ {
+ @Override
+ public void onException(final JMSException exception)
+ {
+ LOGGER.debug("Exception listener got exception", exception);
+ exceptionLatch.countDown();
+ }
+ });
+ conn.start();
+
+ assertNotNull(clientAddressRef.get());
+
+ _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
+
+ assertTrue("Too few heartbeats received: "+_listener.getHeartbeatsReceived() +" (expected at least 2)", _listener.getHeartbeatsReceived() >=2);
+ assertTrue("Too few heartbeats sent "+_listener.getHeartbeatsSent() +" (expected at least 2)", _listener.getHeartbeatsSent() >=2);
+
+ tcpTunneler.stopClientToServerForwarding(clientAddressRef.get());
+
+ exceptionLatch.await(5, TimeUnit.SECONDS);
+ assertTrue("Connection should be disconnected within timeout", conn.isConnected());
+ }
+ }
+
+ private void setHeartbeatListener(final AMQConnection conn, final TestListener listener) throws Exception
+ {
+ final Method setHeartbeatListener = conn.getClass().getDeclaredMethod("setHeartbeatListener", HeartbeatListener.class);
+ setHeartbeatListener.setAccessible(true);
+ setHeartbeatListener.invoke(conn, listener);
+ }
+
+ private class TestListener implements HeartbeatListener
+ {
+
+ private final String _name;
+ private final AtomicInteger _heartbeatsReceived = new AtomicInteger(0);
+ private final AtomicInteger _heartbeatsSent = new AtomicInteger(0);
+ private final CountDownLatch _expectedReceivedHeartbeats;
+ private final CountDownLatch _expectedSentHeartbeats;
+
+ TestListener(String name, int expectedSentHeartbeats, int expectedReceivedHeartbeats)
+ {
+ _name = name;
+ _expectedReceivedHeartbeats = new CountDownLatch(expectedReceivedHeartbeats);
+ _expectedSentHeartbeats = new CountDownLatch(expectedSentHeartbeats);
+ }
+
+ @Override
+ public void heartbeatReceived()
+ {
+ LOGGER.debug(_name + " heartbeat received");
+ _heartbeatsReceived.incrementAndGet();
+ _expectedReceivedHeartbeats.countDown();
+ }
+
+ int getHeartbeatsReceived()
+ {
+ return _heartbeatsReceived.get();
+ }
+
+ @Override
+ public void heartbeatSent()
+ {
+ LOGGER.debug(_name + " heartbeat sent");
+ _heartbeatsSent.incrementAndGet();
+ _expectedSentHeartbeats.countDown();
+ }
+
+ int getHeartbeatsSent()
+ {
+ return _heartbeatsSent.get();
+ }
+
+ void awaitExpectedHeartbeats(final long maximumWaitTime) throws InterruptedException
+ {
+ long startTime = System.currentTimeMillis();
+ _expectedSentHeartbeats.await(maximumWaitTime, TimeUnit.MILLISECONDS);
+
+ long remainingTime = maximumWaitTime - (System.currentTimeMillis() - startTime);
+ if (remainingTime > 0)
+ {
+ _expectedReceivedHeartbeats.await(remainingTime, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org