You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/14 12:14:34 UTC

qpid-jms-amqp-0-x git commit: QPID-8074: [JMS AMQP 0-x][System Tests] Move more client specific tests from Broker-J sources

Repository: qpid-jms-amqp-0-x
Updated Branches:
  refs/heads/master bd4590471 -> 139f1c9f2


QPID-8074: [JMS AMQP 0-x][System Tests] Move more client specific tests from Broker-J sources


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/139f1c9f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/139f1c9f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/139f1c9f

Branch: refs/heads/master
Commit: 139f1c9f23681c2cf1e155b809a6bfb4f547170c
Parents: bd45904
Author: Keith Wall <kw...@apache.org>
Authored: Sun Jan 14 12:08:46 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Sun Jan 14 12:11:20 2018 +0000

----------------------------------------------------------------------
 .../org/apache/qpid/test/utils/TCPTunneler.java | 609 -------------------
 .../org/apache/qpid/test/utils/TCPTunneler.java | 609 +++++++++++++++++++
 .../connection/ConnectionFactoryTest.java       | 132 ++++
 .../connection/ExceptionListenerTest.java       | 174 ++++++
 .../qpid/systest/connection/HeartbeatTest.java  | 342 +++++++++++
 5 files changed, 1257 insertions(+), 609 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/139f1c9f/client/src/test/java/org/apache/qpid/test/utils/TCPTunneler.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/qpid/test/utils/TCPTunneler.java b/client/src/test/java/org/apache/qpid/test/utils/TCPTunneler.java
deleted file mode 100644
index 5e1ddad..0000000
--- a/client/src/test/java/org/apache/qpid/test/utils/TCPTunneler.java
+++ /dev/null
@@ -1,609 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.utils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A basic implementation of TCP traffic forwarder between ports.
- * It is intended to use in tests.
- */
-public class TCPTunneler implements AutoCloseable
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(TCPTunneler.class);
-
-    private final TCPWorker _tcpWorker;
-    private final ExecutorService _executor;
-
-    public TCPTunneler(final int localPort, final String remoteHost,
-                       final int remotePort,
-                       final int numberOfConcurrentClients)
-    {
-        _executor = Executors.newFixedThreadPool(numberOfConcurrentClients * 2 + 1);
-        _tcpWorker = new TCPWorker(localPort, remoteHost, remotePort, _executor);
-    }
-
-    public void start() throws IOException
-    {
-        _tcpWorker.start();
-    }
-
-    public void stopClientToServerForwarding(final InetSocketAddress clientAddress)
-    {
-        _tcpWorker.stopClientToServerForwarding(clientAddress);
-    }
-
-    public void stop()
-    {
-        try
-        {
-            _tcpWorker.stop();
-        }
-        finally
-        {
-            _executor.shutdown();
-        }
-    }
-
-    public void addClientListener(TunnelListener listener)
-    {
-        _tcpWorker.addClientListener(listener);
-    }
-
-    public void removeClientListener(TunnelListener listener)
-    {
-        _tcpWorker.removeClientListener(listener);
-    }
-
-    public void disconnect(InetSocketAddress address)
-    {
-        LOGGER.info("Disconnecting {}", address);
-        if (address != null)
-        {
-            _tcpWorker.disconnect(address);
-        }
-    }
-
-    public int getLocalPort()
-    {
-        return _tcpWorker.getLocalPort();
-    }
-
-    @Override
-    public void close() throws Exception
-    {
-        stop();
-    }
-
-    public interface TunnelListener
-    {
-        void clientConnected(InetSocketAddress clientAddress);
-
-        void clientDisconnected(InetSocketAddress clientAddress);
-    }
-
-    private static class TCPWorker implements Runnable
-    {
-        private final String _remoteHost;
-        private final int _remotePort;
-        private final int _localPort;
-        private final String _remoteHostPort;
-        private final AtomicBoolean _closed;
-        private final Collection<SocketTunnel> _tunnels;
-        private final Collection<TunnelListener> _tunnelListeners;
-        private final TunnelListener _notifyingListener;
-        private volatile ServerSocket _serverSocket;
-        private volatile ExecutorService _executor;
-        private int _actualLocalPort;
-
-        public TCPWorker(final int localPort,
-                         final String remoteHost,
-                         final int remotePort,
-                         final ExecutorService executor)
-        {
-            _closed = new AtomicBoolean();
-            _remoteHost = remoteHost;
-            _remotePort = remotePort;
-            _localPort = localPort;
-            _remoteHostPort = _remoteHost + ":" + _remotePort;
-            _executor = executor;
-            _tunnels = new CopyOnWriteArrayList<>();
-            _tunnelListeners = new CopyOnWriteArrayList<>();
-            _notifyingListener = new TunnelListener()
-            {
-                @Override
-                public void clientConnected(final InetSocketAddress clientAddress)
-                {
-                    notifyClientConnected(clientAddress);
-                }
-
-                @Override
-                public void clientDisconnected(final InetSocketAddress clientAddress)
-                {
-                    try
-                    {
-                        notifyClientDisconnected(clientAddress);
-                    }
-                    finally
-                    {
-                        removeTunnel(clientAddress);
-                    }
-                }
-            };
-        }
-
-        @Override
-        public void run()
-        {
-            String threadName = Thread.currentThread().getName();
-            try
-            {
-                Thread.currentThread().setName("TCPTunnelerAcceptingThread");
-                while (!_closed.get())
-                {
-                    Socket acceptedSocket = _serverSocket.accept();
-                    LOGGER.debug("Client opened socket {}", acceptedSocket);
-
-                    createTunnel(acceptedSocket);
-                }
-            }
-            catch (IOException e)
-            {
-                if (!_closed.get())
-                {
-                    LOGGER.error("Exception in accepting thread", e);
-                }
-            }
-            finally
-            {
-                closeServerSocket();
-                _closed.set(true);
-                Thread.currentThread().setName(threadName);
-            }
-        }
-
-        public void start()
-        {
-            _actualLocalPort = _localPort;
-            try
-            {
-                _serverSocket = new ServerSocket(_localPort);
-                _actualLocalPort = _serverSocket.getLocalPort();
-                LOGGER.info                                  ("Starting TCPTunneler forwarding from port {} to {}",
-                            _actualLocalPort, _remoteHostPort);
-                _serverSocket.setReuseAddress(true);
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException("Cannot start TCPTunneler on port " + _actualLocalPort, e);
-            }
-
-            if (_serverSocket != null)
-            {
-                try
-                {
-                    _executor.execute(this);
-                }
-                catch (Exception e)
-                {
-                    try
-                    {
-                        closeServerSocket();
-                    }
-                    finally
-                    {
-                        throw new RuntimeException("Cannot start acceptor thread for TCPTunneler on port " + _actualLocalPort,
-                                                   e);
-                    }
-                }
-            }
-        }
-
-        public void stop()
-        {
-            if (_closed.compareAndSet(false, true))
-            {
-                LOGGER.info("Stopping TCPTunneler forwarding from port {} to {}",
-                            _actualLocalPort,
-                            _remoteHostPort);
-                try
-                {
-                    for (SocketTunnel tunnel : _tunnels)
-                    {
-                        tunnel.close();
-                    }
-                }
-                finally
-                {
-                    closeServerSocket();
-                }
-
-                LOGGER.info("TCPTunneler forwarding from port {} to {} is stopped",
-                            _actualLocalPort,
-                            _remoteHostPort);
-            }
-        }
-
-        public void addClientListener(TunnelListener listener)
-        {
-            _tunnelListeners.add(listener);
-            for (SocketTunnel socketTunnel : _tunnels)
-            {
-                try
-                {
-                    listener.clientConnected(socketTunnel.getClientAddress());
-                }
-                catch (Exception e)
-                {
-                    LOGGER.warn("Exception on notifying client listener about connected client", e);
-                }
-            }
-        }
-
-        public void removeClientListener(TunnelListener listener)
-        {
-            _tunnelListeners.remove(listener);
-        }
-
-        public void disconnect(final InetSocketAddress address)
-        {
-            SocketTunnel client = removeTunnel(address);
-            if (client != null && !client.isClosed())
-            {
-                client.close();
-                LOGGER.info("Tunnel for {} is disconnected", address);
-            }
-            else
-            {
-                LOGGER.info("Tunnel for {} not found", address);
-            }
-        }
-
-
-        private void createTunnel(final Socket localSocket)
-        {
-            Socket remoteSocket = null;
-            try
-            {
-                LOGGER.debug("Opening socket to {} for {}", _remoteHostPort, localSocket);
-                remoteSocket = new Socket(_remoteHost, _remotePort);
-                LOGGER.debug("Opened socket to {} for {}", remoteSocket, localSocket);
-                SocketTunnel tunnel = new SocketTunnel(localSocket, remoteSocket, _notifyingListener);
-                LOGGER.debug("Socket tunnel is created from {} to {}", localSocket, remoteSocket);
-                _tunnels.add(tunnel);
-                tunnel.start(_executor);
-            }
-            catch (Exception e)
-            {
-                LOGGER.error("Cannot forward i/o traffic between {} and {}", localSocket, _remoteHostPort, e);
-                SocketTunnel.closeSocket(localSocket);
-                SocketTunnel.closeSocket(remoteSocket);
-            }
-        }
-
-        private void notifyClientConnected(final InetSocketAddress clientAddress)
-        {
-            for (TunnelListener listener : _tunnelListeners)
-            {
-                try
-                {
-                    listener.clientConnected(clientAddress);
-                }
-                catch (Exception e)
-                {
-                    LOGGER.warn("Exception on notifying client listener about connected client", e);
-                }
-            }
-        }
-
-
-        private void notifyClientDisconnected(final InetSocketAddress clientAddress)
-        {
-            for (TunnelListener listener : _tunnelListeners)
-            {
-                try
-                {
-                    listener.clientDisconnected(clientAddress);
-                }
-                catch (Exception e)
-                {
-                    LOGGER.warn("Exception on notifying client listener about disconnected client", e);
-                }
-            }
-        }
-
-        public void stopClientToServerForwarding(final InetSocketAddress clientAddress)
-        {
-            SocketTunnel target = null;
-            for (SocketTunnel tunnel : _tunnels)
-            {
-                if (tunnel.getClientAddress().equals(clientAddress))
-                {
-                    target = tunnel;
-                    break;
-                }
-            }
-            if (target != null)
-            {
-                LOGGER.debug("Stopping forwarding from client {} to server", clientAddress);
-                target.stopClientToServerForwarding();
-            }
-            else
-            {
-                throw new IllegalArgumentException("Could not find tunnel for address " + clientAddress);
-            }
-        }
-
-        private void closeServerSocket()
-        {
-            if (_serverSocket != null)
-            {
-                try
-                {
-                    _serverSocket.close();
-                }
-                catch (IOException e)
-                {
-                    LOGGER.warn("Exception on closing of accepting socket", e);
-                }
-                finally
-                {
-                    _serverSocket = null;
-                }
-            }
-        }
-
-        private SocketTunnel removeTunnel(final InetSocketAddress clientAddress)
-        {
-            SocketTunnel client = null;
-            for (SocketTunnel c : _tunnels)
-            {
-                if (c.isClientAddress(clientAddress))
-                {
-                    client = c;
-                    break;
-                }
-            }
-            if (client != null)
-            {
-                _tunnels.remove(client);
-            }
-            return client;
-        }
-
-        public int getLocalPort()
-        {
-            if (_serverSocket == null)
-            {
-                return -1;
-            }
-            return _serverSocket.getLocalPort();
-        }
-    }
-
-    private static class SocketTunnel
-    {
-        private final Socket _clientSocket;
-        private final Socket _serverSocket;
-        private final TunnelListener _tunnelListener;
-        private final AtomicBoolean _closed;
-        private final AutoClosingStreamForwarder _clientToServer;
-        private final AutoClosingStreamForwarder _serverToClient;
-        private final InetSocketAddress _clientSocketAddress;
-
-        public SocketTunnel(final Socket clientSocket,
-                            final Socket serverSocket,
-                            final TunnelListener tunnelListener) throws IOException
-        {
-            _clientSocket = clientSocket;
-            _clientSocketAddress =
-                    new InetSocketAddress(clientSocket.getInetAddress().getHostName(), _clientSocket.getPort());
-            _serverSocket = serverSocket;
-            _closed = new AtomicBoolean();
-            _tunnelListener = tunnelListener;
-            _clientSocket.setKeepAlive(true);
-            _serverSocket.setKeepAlive(true);
-            _clientToServer = new AutoClosingStreamForwarder(new StreamForwarder(_clientSocket, _serverSocket));
-            _serverToClient = new AutoClosingStreamForwarder(new StreamForwarder(_serverSocket, _clientSocket));
-        }
-
-        public void close()
-        {
-            if (_closed.compareAndSet(false, true))
-            {
-                try
-                {
-                    closeSocket(_serverSocket);
-                    closeSocket(_clientSocket);
-                }
-                finally
-                {
-                    _tunnelListener.clientDisconnected(getClientAddress());
-                }
-            }
-        }
-
-        public void start(Executor executor) throws IOException
-        {
-            executor.execute(_clientToServer);
-            executor.execute(_serverToClient);
-            _tunnelListener.clientConnected(getClientAddress());
-        }
-
-        public void stopClientToServerForwarding()
-        {
-            _clientToServer.stopForwarding();
-        }
-
-        public boolean isClosed()
-        {
-            return _closed.get();
-        }
-
-        public boolean isClientAddress(final InetSocketAddress clientAddress)
-        {
-            return getClientAddress().equals(clientAddress);
-        }
-
-        public InetSocketAddress getClientAddress()
-        {
-            return _clientSocketAddress;
-        }
-
-        private static void closeSocket(Socket socket)
-        {
-            if (socket != null)
-            {
-                try
-                {
-                    socket.close();
-                }
-                catch (IOException e)
-                {
-                    LOGGER.warn("Exception on closing of socket {}", socket, e);
-                }
-            }
-        }
-
-
-        private class AutoClosingStreamForwarder implements Runnable
-        {
-            private StreamForwarder _streamForwarder;
-
-            public AutoClosingStreamForwarder(StreamForwarder streamForwarder)
-            {
-                _streamForwarder = streamForwarder;
-            }
-
-            @Override
-            public void run()
-            {
-                Thread currentThread = Thread.currentThread();
-                String originalThreadName = currentThread.getName();
-                try
-                {
-                    currentThread.setName(_streamForwarder.getName());
-                    _streamForwarder.run();
-                }
-                finally
-                {
-                    close();
-                    currentThread.setName(originalThreadName);
-                }
-            }
-
-            public void stopForwarding()
-            {
-                _streamForwarder.stopForwarding();
-            }
-        }
-    }
-
-    private static class StreamForwarder implements Runnable
-    {
-        private static final int BUFFER_SIZE = 4096;
-
-        private final InputStream _inputStream;
-        private final OutputStream _outputStream;
-        private final String _name;
-        private AtomicBoolean _stopForwarding = new AtomicBoolean();
-
-        public StreamForwarder(Socket input, Socket output) throws IOException
-        {
-            _inputStream = input.getInputStream();
-            _outputStream = output.getOutputStream();
-            _name = "Forwarder-" + input.getLocalSocketAddress() + "->" + output.getRemoteSocketAddress();
-        }
-
-        @Override
-        public void run()
-        {
-            byte[] buffer = new byte[BUFFER_SIZE];
-            int bytesRead;
-            try
-            {
-                while ((bytesRead = _inputStream.read(buffer)) != -1)
-                {
-                    if (!_stopForwarding.get())
-                    {
-                        _outputStream.write(buffer, 0, bytesRead);
-                        _outputStream.flush();
-                        LOGGER.debug("Forwarded {} byte(s)", bytesRead);
-                    }
-                    else
-                    {
-                        LOGGER.debug("Discarded {} byte(s)", bytesRead);
-                    }
-                }
-            }
-            catch (IOException e)
-            {
-                LOGGER.warn("Exception on forwarding data for {}: {}", _name, e.getMessage());
-            }
-            finally
-            {
-                try
-                {
-                    _inputStream.close();
-                }
-                catch (IOException e)
-                {
-                    // ignore
-                }
-
-                try
-                {
-                    _outputStream.close();
-                }
-                catch (IOException e)
-                {
-                    // ignore
-                }
-            }
-        }
-
-
-        public String getName()
-        {
-            return _name;
-        }
-
-        public void stopForwarding()
-        {
-            _stopForwarding.set(true);
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/139f1c9f/systests/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java b/systests/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
new file mode 100644
index 0000000..5e1ddad
--- /dev/null
+++ b/systests/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
@@ -0,0 +1,609 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A basic implementation of TCP traffic forwarder between ports.
+ * It is intended to use in tests.
+ */
+public class TCPTunneler implements AutoCloseable
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(TCPTunneler.class);
+
+    private final TCPWorker _tcpWorker;
+    private final ExecutorService _executor;
+
+    public TCPTunneler(final int localPort, final String remoteHost,
+                       final int remotePort,
+                       final int numberOfConcurrentClients)
+    {
+        _executor = Executors.newFixedThreadPool(numberOfConcurrentClients * 2 + 1);
+        _tcpWorker = new TCPWorker(localPort, remoteHost, remotePort, _executor);
+    }
+
+    public void start() throws IOException
+    {
+        _tcpWorker.start();
+    }
+
+    public void stopClientToServerForwarding(final InetSocketAddress clientAddress)
+    {
+        _tcpWorker.stopClientToServerForwarding(clientAddress);
+    }
+
+    public void stop()
+    {
+        try
+        {
+            _tcpWorker.stop();
+        }
+        finally
+        {
+            _executor.shutdown();
+        }
+    }
+
+    public void addClientListener(TunnelListener listener)
+    {
+        _tcpWorker.addClientListener(listener);
+    }
+
+    public void removeClientListener(TunnelListener listener)
+    {
+        _tcpWorker.removeClientListener(listener);
+    }
+
+    public void disconnect(InetSocketAddress address)
+    {
+        LOGGER.info("Disconnecting {}", address);
+        if (address != null)
+        {
+            _tcpWorker.disconnect(address);
+        }
+    }
+
+    public int getLocalPort()
+    {
+        return _tcpWorker.getLocalPort();
+    }
+
+    @Override
+    public void close() throws Exception
+    {
+        stop();
+    }
+
+    public interface TunnelListener
+    {
+        void clientConnected(InetSocketAddress clientAddress);
+
+        void clientDisconnected(InetSocketAddress clientAddress);
+    }
+
+    private static class TCPWorker implements Runnable
+    {
+        private final String _remoteHost;
+        private final int _remotePort;
+        private final int _localPort;
+        private final String _remoteHostPort;
+        private final AtomicBoolean _closed;
+        private final Collection<SocketTunnel> _tunnels;
+        private final Collection<TunnelListener> _tunnelListeners;
+        private final TunnelListener _notifyingListener;
+        private volatile ServerSocket _serverSocket;
+        private volatile ExecutorService _executor;
+        private int _actualLocalPort;
+
+        public TCPWorker(final int localPort,
+                         final String remoteHost,
+                         final int remotePort,
+                         final ExecutorService executor)
+        {
+            _closed = new AtomicBoolean();
+            _remoteHost = remoteHost;
+            _remotePort = remotePort;
+            _localPort = localPort;
+            _remoteHostPort = _remoteHost + ":" + _remotePort;
+            _executor = executor;
+            _tunnels = new CopyOnWriteArrayList<>();
+            _tunnelListeners = new CopyOnWriteArrayList<>();
+            _notifyingListener = new TunnelListener()
+            {
+                @Override
+                public void clientConnected(final InetSocketAddress clientAddress)
+                {
+                    notifyClientConnected(clientAddress);
+                }
+
+                @Override
+                public void clientDisconnected(final InetSocketAddress clientAddress)
+                {
+                    try
+                    {
+                        notifyClientDisconnected(clientAddress);
+                    }
+                    finally
+                    {
+                        removeTunnel(clientAddress);
+                    }
+                }
+            };
+        }
+
+        @Override
+        public void run()
+        {
+            String threadName = Thread.currentThread().getName();
+            try
+            {
+                Thread.currentThread().setName("TCPTunnelerAcceptingThread");
+                while (!_closed.get())
+                {
+                    Socket acceptedSocket = _serverSocket.accept();
+                    LOGGER.debug("Client opened socket {}", acceptedSocket);
+
+                    createTunnel(acceptedSocket);
+                }
+            }
+            catch (IOException e)
+            {
+                if (!_closed.get())
+                {
+                    LOGGER.error("Exception in accepting thread", e);
+                }
+            }
+            finally
+            {
+                closeServerSocket();
+                _closed.set(true);
+                Thread.currentThread().setName(threadName);
+            }
+        }
+
+        public void start()
+        {
+            _actualLocalPort = _localPort;
+            try
+            {
+                _serverSocket = new ServerSocket(_localPort);
+                _actualLocalPort = _serverSocket.getLocalPort();
+                LOGGER.info                                  ("Starting TCPTunneler forwarding from port {} to {}",
+                            _actualLocalPort, _remoteHostPort);
+                _serverSocket.setReuseAddress(true);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException("Cannot start TCPTunneler on port " + _actualLocalPort, e);
+            }
+
+            if (_serverSocket != null)
+            {
+                try
+                {
+                    _executor.execute(this);
+                }
+                catch (Exception e)
+                {
+                    try
+                    {
+                        closeServerSocket();
+                    }
+                    finally
+                    {
+                        throw new RuntimeException("Cannot start acceptor thread for TCPTunneler on port " + _actualLocalPort,
+                                                   e);
+                    }
+                }
+            }
+        }
+
+        public void stop()
+        {
+            if (_closed.compareAndSet(false, true))
+            {
+                LOGGER.info("Stopping TCPTunneler forwarding from port {} to {}",
+                            _actualLocalPort,
+                            _remoteHostPort);
+                try
+                {
+                    for (SocketTunnel tunnel : _tunnels)
+                    {
+                        tunnel.close();
+                    }
+                }
+                finally
+                {
+                    closeServerSocket();
+                }
+
+                LOGGER.info("TCPTunneler forwarding from port {} to {} is stopped",
+                            _actualLocalPort,
+                            _remoteHostPort);
+            }
+        }
+
+        public void addClientListener(TunnelListener listener)
+        {
+            _tunnelListeners.add(listener);
+            for (SocketTunnel socketTunnel : _tunnels)
+            {
+                try
+                {
+                    listener.clientConnected(socketTunnel.getClientAddress());
+                }
+                catch (Exception e)
+                {
+                    LOGGER.warn("Exception on notifying client listener about connected client", e);
+                }
+            }
+        }
+
+        public void removeClientListener(TunnelListener listener)
+        {
+            _tunnelListeners.remove(listener);
+        }
+
+        public void disconnect(final InetSocketAddress address)
+        {
+            SocketTunnel client = removeTunnel(address);
+            if (client != null && !client.isClosed())
+            {
+                client.close();
+                LOGGER.info("Tunnel for {} is disconnected", address);
+            }
+            else
+            {
+                LOGGER.info("Tunnel for {} not found", address);
+            }
+        }
+
+
+        private void createTunnel(final Socket localSocket)
+        {
+            Socket remoteSocket = null;
+            try
+            {
+                LOGGER.debug("Opening socket to {} for {}", _remoteHostPort, localSocket);
+                remoteSocket = new Socket(_remoteHost, _remotePort);
+                LOGGER.debug("Opened socket to {} for {}", remoteSocket, localSocket);
+                SocketTunnel tunnel = new SocketTunnel(localSocket, remoteSocket, _notifyingListener);
+                LOGGER.debug("Socket tunnel is created from {} to {}", localSocket, remoteSocket);
+                _tunnels.add(tunnel);
+                tunnel.start(_executor);
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Cannot forward i/o traffic between {} and {}", localSocket, _remoteHostPort, e);
+                SocketTunnel.closeSocket(localSocket);
+                SocketTunnel.closeSocket(remoteSocket);
+            }
+        }
+
+        private void notifyClientConnected(final InetSocketAddress clientAddress)
+        {
+            for (TunnelListener listener : _tunnelListeners)
+            {
+                try
+                {
+                    listener.clientConnected(clientAddress);
+                }
+                catch (Exception e)
+                {
+                    LOGGER.warn("Exception on notifying client listener about connected client", e);
+                }
+            }
+        }
+
+
+        private void notifyClientDisconnected(final InetSocketAddress clientAddress)
+        {
+            for (TunnelListener listener : _tunnelListeners)
+            {
+                try
+                {
+                    listener.clientDisconnected(clientAddress);
+                }
+                catch (Exception e)
+                {
+                    LOGGER.warn("Exception on notifying client listener about disconnected client", e);
+                }
+            }
+        }
+
+        public void stopClientToServerForwarding(final InetSocketAddress clientAddress)
+        {
+            SocketTunnel target = null;
+            for (SocketTunnel tunnel : _tunnels)
+            {
+                if (tunnel.getClientAddress().equals(clientAddress))
+                {
+                    target = tunnel;
+                    break;
+                }
+            }
+            if (target != null)
+            {
+                LOGGER.debug("Stopping forwarding from client {} to server", clientAddress);
+                target.stopClientToServerForwarding();
+            }
+            else
+            {
+                throw new IllegalArgumentException("Could not find tunnel for address " + clientAddress);
+            }
+        }
+
+        private void closeServerSocket()
+        {
+            if (_serverSocket != null)
+            {
+                try
+                {
+                    _serverSocket.close();
+                }
+                catch (IOException e)
+                {
+                    LOGGER.warn("Exception on closing of accepting socket", e);
+                }
+                finally
+                {
+                    _serverSocket = null;
+                }
+            }
+        }
+
+        private SocketTunnel removeTunnel(final InetSocketAddress clientAddress)
+        {
+            SocketTunnel client = null;
+            for (SocketTunnel c : _tunnels)
+            {
+                if (c.isClientAddress(clientAddress))
+                {
+                    client = c;
+                    break;
+                }
+            }
+            if (client != null)
+            {
+                _tunnels.remove(client);
+            }
+            return client;
+        }
+
+        public int getLocalPort()
+        {
+            if (_serverSocket == null)
+            {
+                return -1;
+            }
+            return _serverSocket.getLocalPort();
+        }
+    }
+
+    private static class SocketTunnel
+    {
+        private final Socket _clientSocket;
+        private final Socket _serverSocket;
+        private final TunnelListener _tunnelListener;
+        private final AtomicBoolean _closed;
+        private final AutoClosingStreamForwarder _clientToServer;
+        private final AutoClosingStreamForwarder _serverToClient;
+        private final InetSocketAddress _clientSocketAddress;
+
+        public SocketTunnel(final Socket clientSocket,
+                            final Socket serverSocket,
+                            final TunnelListener tunnelListener) throws IOException
+        {
+            _clientSocket = clientSocket;
+            _clientSocketAddress =
+                    new InetSocketAddress(clientSocket.getInetAddress().getHostName(), _clientSocket.getPort());
+            _serverSocket = serverSocket;
+            _closed = new AtomicBoolean();
+            _tunnelListener = tunnelListener;
+            _clientSocket.setKeepAlive(true);
+            _serverSocket.setKeepAlive(true);
+            _clientToServer = new AutoClosingStreamForwarder(new StreamForwarder(_clientSocket, _serverSocket));
+            _serverToClient = new AutoClosingStreamForwarder(new StreamForwarder(_serverSocket, _clientSocket));
+        }
+
+        public void close()
+        {
+            if (_closed.compareAndSet(false, true))
+            {
+                try
+                {
+                    closeSocket(_serverSocket);
+                    closeSocket(_clientSocket);
+                }
+                finally
+                {
+                    _tunnelListener.clientDisconnected(getClientAddress());
+                }
+            }
+        }
+
+        public void start(Executor executor) throws IOException
+        {
+            executor.execute(_clientToServer);
+            executor.execute(_serverToClient);
+            _tunnelListener.clientConnected(getClientAddress());
+        }
+
+        public void stopClientToServerForwarding()
+        {
+            _clientToServer.stopForwarding();
+        }
+
+        public boolean isClosed()
+        {
+            return _closed.get();
+        }
+
+        public boolean isClientAddress(final InetSocketAddress clientAddress)
+        {
+            return getClientAddress().equals(clientAddress);
+        }
+
+        public InetSocketAddress getClientAddress()
+        {
+            return _clientSocketAddress;
+        }
+
+        private static void closeSocket(Socket socket)
+        {
+            if (socket != null)
+            {
+                try
+                {
+                    socket.close();
+                }
+                catch (IOException e)
+                {
+                    LOGGER.warn("Exception on closing of socket {}", socket, e);
+                }
+            }
+        }
+
+
+        private class AutoClosingStreamForwarder implements Runnable
+        {
+            private StreamForwarder _streamForwarder;
+
+            public AutoClosingStreamForwarder(StreamForwarder streamForwarder)
+            {
+                _streamForwarder = streamForwarder;
+            }
+
+            @Override
+            public void run()
+            {
+                Thread currentThread = Thread.currentThread();
+                String originalThreadName = currentThread.getName();
+                try
+                {
+                    currentThread.setName(_streamForwarder.getName());
+                    _streamForwarder.run();
+                }
+                finally
+                {
+                    close();
+                    currentThread.setName(originalThreadName);
+                }
+            }
+
+            public void stopForwarding()
+            {
+                _streamForwarder.stopForwarding();
+            }
+        }
+    }
+
+    private static class StreamForwarder implements Runnable
+    {
+        private static final int BUFFER_SIZE = 4096;
+
+        private final InputStream _inputStream;
+        private final OutputStream _outputStream;
+        private final String _name;
+        private AtomicBoolean _stopForwarding = new AtomicBoolean();
+
+        public StreamForwarder(Socket input, Socket output) throws IOException
+        {
+            _inputStream = input.getInputStream();
+            _outputStream = output.getOutputStream();
+            _name = "Forwarder-" + input.getLocalSocketAddress() + "->" + output.getRemoteSocketAddress();
+        }
+
+        @Override
+        public void run()
+        {
+            byte[] buffer = new byte[BUFFER_SIZE];
+            int bytesRead;
+            try
+            {
+                while ((bytesRead = _inputStream.read(buffer)) != -1)
+                {
+                    if (!_stopForwarding.get())
+                    {
+                        _outputStream.write(buffer, 0, bytesRead);
+                        _outputStream.flush();
+                        LOGGER.debug("Forwarded {} byte(s)", bytesRead);
+                    }
+                    else
+                    {
+                        LOGGER.debug("Discarded {} byte(s)", bytesRead);
+                    }
+                }
+            }
+            catch (IOException e)
+            {
+                LOGGER.warn("Exception on forwarding data for {}: {}", _name, e.getMessage());
+            }
+            finally
+            {
+                try
+                {
+                    _inputStream.close();
+                }
+                catch (IOException e)
+                {
+                    // ignore
+                }
+
+                try
+                {
+                    _outputStream.close();
+                }
+                catch (IOException e)
+                {
+                    // ignore
+                }
+            }
+        }
+
+
+        public String getName()
+        {
+            return _name;
+        }
+
+        public void stopForwarding()
+        {
+            _stopForwarding.set(true);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/139f1c9f/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionFactoryTest.java b/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionFactoryTest.java
new file mode 100644
index 0000000..c9eb034
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionFactoryTest.java
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.connection;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.InetSocketAddress;
+
+import javax.jms.Connection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.systest.core.BrokerAdmin;
+import org.apache.qpid.systest.core.JmsTestBase;
+
+public class ConnectionFactoryTest extends JmsTestBase
+{
+    private static final String CONNECTION_URL = "amqp://guest:guest@clientID/?brokerlist='tcp://%s:%d'";
+    private String _url;
+
+    @Before
+    public void setUp()
+    {
+        final InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        _url = String.format(CONNECTION_URL,
+                             brokerAddress.getHostString(),
+                             brokerAddress.getPort());
+    }
+    /**
+     * The username & password specified should not override the default
+     * specified in the URL.
+     */
+    @Test
+    public void testCreateConnectionWithUsernamePassword() throws Exception
+    {
+        AMQConnectionFactory factory = new AMQConnectionFactory(_url);
+
+        AMQConnection con = null;
+        try
+        {
+            con = factory.createConnection();
+            assertEquals("Usernames used is different from the one in URL","guest",con.getConnectionURL().getUsername());
+            assertEquals("Password used is different from the one in URL","guest",con.getConnectionURL().getPassword());
+        }
+        finally
+        {
+            if (con != null)
+            {
+                con.close();
+            }
+        }
+
+        AMQConnection con2 = null;
+        try
+        {
+            con2 = factory.createConnection("user", "pass");
+            assertEquals("Usernames used is different from the one in URL","user",con2.getConnectionURL().getUsername());
+            assertEquals("Password used is different from the one in URL","pass",con2.getConnectionURL().getPassword());
+        }
+        catch(Exception e)
+        {
+            // ignore
+        }
+        finally
+        {
+            if (con2 != null)
+            {
+                con2.close();
+            }
+        }
+
+        AMQConnection con3 = null;
+        try
+        {
+            con3 = factory.createConnection();
+            assertEquals("Usernames used is different from the one in URL","guest",con3.getConnectionURL().getUsername());
+            assertEquals("Password used is different from the one in URL","guest",con3.getConnectionURL().getPassword());
+        }
+        finally
+        {
+            if (con3 != null)
+            {
+                con3.close();
+            }
+        }
+    }
+
+    /**
+     * Verifies that a connection can be made using an instance of AMQConnectionFactory created with the
+     * default constructor and provided with the connection url via setter.
+     */
+    @Test
+    public void testCreatingConnectionWithInstanceMadeUsingDefaultConstructor() throws Exception
+    {
+        AMQConnectionFactory factory = new AMQConnectionFactory();
+        factory.setConnectionURLString(_url);
+
+        Connection con = null;
+        try
+        {
+            con = factory.createConnection();
+        }
+        finally
+        {
+            if (con != null)
+            {
+                con.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/139f1c9f/systests/src/test/java/org/apache/qpid/systest/connection/ExceptionListenerTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/connection/ExceptionListenerTest.java b/systests/src/test/java/org/apache/qpid/systest/connection/ExceptionListenerTest.java
new file mode 100644
index 0000000..d913b8f
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/connection/ExceptionListenerTest.java
@@ -0,0 +1,174 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+package org.apache.qpid.systest.connection;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeThat;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.systest.core.BrokerAdmin;
+import org.apache.qpid.systest.core.JmsTestBase;
+import org.apache.qpid.systest.core.util.Utils;
+
+public class ExceptionListenerTest extends JmsTestBase
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(ExceptionListenerTest.class);
+
+    /**
+     * This test reproduces a deadlock that was the subject of a support call. A Spring based
+     * application was using SingleConnectionFactory.  It installed an ExceptionListener that
+     * stops and closes the connection in response to any exception.  On receipt of a message
+     * the application would create a new session then send a response message (within onMessage).
+     * It appears that a misconfiguration in the application meant that some of these messages
+     * were bounced (no-route). Bounces are treated like connection exceptions and are passed
+     * back to the application via the ExceptionListener.  The deadlock occurred between the
+     * ExceptionListener's call to stop() and the MessageListener's attempt to create a new
+     * session.
+     */
+    @Test
+    public void testExceptionListenerConnectionStopDeadlock() throws  Exception
+    {
+        assumeThat("Feature 'close on no route' is only implemented Broker-J",
+                   getBrokerAdmin().getBrokerType(),
+                   is(equalTo(BrokerAdmin.BrokerType.BROKERJ)));
+
+        assumeThat("Feature 'close on no route' is only implemented for AMQP 0-8..0-9-1",
+                   getProtocol(),
+                   is(not(equalTo("0-10"))));
+
+        Map<String, String> options = new HashMap<>();
+        options.put(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, Boolean.toString(false));
+
+        final Connection connection = getConnection(options);
+
+        try
+        {
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue messageQueue = session.createQueue(getTestQueueName());
+            session.createConsumer(messageQueue).close(); // Create queue by side-effect
+
+            // Put 10 messages onto messageQueue
+            Utils.sendMessages(session, messageQueue, 10);
+            // Install an exception listener that stops/closes the connection on receipt of 2nd AMQNoRouteException.
+            // (Triggering on the 2nd (rather than 1st) seems to increase the probability that the test ends in deadlock,
+            // at least on my machine).
+            final CountDownLatch exceptionReceivedLatch = new CountDownLatch(2);
+            final AtomicBoolean doneClosed = new AtomicBoolean();
+            final CountDownLatch connectionClosedAttemptLatch = new CountDownLatch(1);
+            final AtomicReference<Exception> connectionCloseException = new AtomicReference<>();
+            final ExceptionListener listener = new ExceptionListener()
+            {
+                @Override
+                public void onException(JMSException exception)
+                {
+                    exceptionReceivedLatch.countDown();
+                    if (exceptionReceivedLatch.getCount() == 0)
+                    {
+                        try
+                        {
+                            if (doneClosed.compareAndSet(false, true))
+                            {
+                                connection.stop();
+                                connection.close();
+                            }
+                        }
+                        catch (Exception e)
+                        {
+                            // We expect no exception to be caught
+                            connectionCloseException.set(e);
+                        }
+                        finally
+                        {
+                            connectionClosedAttemptLatch.countDown();
+                        }
+
+                    }
+                }
+            };
+            connection.setExceptionListener(listener);
+
+            // Create a message listener that receives from testQueue and tries to forward them to unknown queue (thus
+            // provoking AMQNoRouteException exceptions to be delivered to the ExceptionListener).
+            final Queue unknownQueue = session.createQueue(getTestQueueName() + "_unknown");
+            MessageListener redirectingMessageListener = new MessageListener()
+            {
+                @Override
+                public void onMessage(Message msg)
+                {
+                    try
+                    {
+                        Session mlSession = connection.createSession(true, Session.SESSION_TRANSACTED);  // ** Deadlock
+                        mlSession.createProducer(unknownQueue).send(msg);  // will cause async AMQNoRouteException;
+                        mlSession.commit();
+                    }
+                    catch (JMSException je)
+                    {
+                        // Connection is closed by the listener, so exceptions here are expected.
+                        LOGGER.debug("Expected exception - message listener got exception", je);
+                    }
+                }
+            };
+
+            MessageConsumer consumer = session.createConsumer(messageQueue);
+            consumer.setMessageListener(redirectingMessageListener);
+            connection.start();
+
+            // Await an exception
+            boolean exceptionReceived = exceptionReceivedLatch.await(10, TimeUnit.SECONDS);
+            assertTrue("Exception listener did not hear at least one exception within timeout", exceptionReceived);
+
+            // Await the connection listener to close the connection
+            boolean closeAttemptedReceived = connectionClosedAttemptLatch.await(10, TimeUnit.SECONDS);
+            assertTrue("Exception listener did not try to close the exception within timeout", closeAttemptedReceived);
+            assertNull("Exception listener should not have had experienced an exception : " + connectionCloseException.get(), connectionCloseException.get());
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/139f1c9f/systests/src/test/java/org/apache/qpid/systest/connection/HeartbeatTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/connection/HeartbeatTest.java b/systests/src/test/java/org/apache/qpid/systest/connection/HeartbeatTest.java
new file mode 100644
index 0000000..2123d93
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/connection/HeartbeatTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systest.connection;
+
+import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.HeartbeatListener;
+import org.apache.qpid.systest.core.BrokerAdmin;
+import org.apache.qpid.systest.core.JmsTestBase;
+import org.apache.qpid.test.utils.TCPTunneler;
+
+public class HeartbeatTest extends JmsTestBase
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatTest.class);
+    private static final String CONNECTION_URL_WITH_HEARTBEAT = "amqp://guest:guest@clientid/?brokerlist='localhost:%d?heartbeat='%d''";
+    private static final int MAXIMUM_WAIT_TIME = 2900;
+    private final TestListener _listener = new TestListener("listener", 2, 2);
+
+    @Test
+    public void testHeartbeatsEnabledUsingUrl() throws Exception
+    {
+        final String url = String.format(CONNECTION_URL_WITH_HEARTBEAT, getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP).getPort(), 1);
+        AMQConnection conn = new AMQConnection(url);
+        try
+        {
+            setHeartbeatListener(conn, _listener);
+            conn.start();
+
+            _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
+
+            assertTrue("Too few heartbeats received: "+_listener.getHeartbeatsReceived() +" (expected at least 2)", _listener.getHeartbeatsReceived() >=2);
+            assertTrue("Too few heartbeats sent "+_listener.getHeartbeatsSent() +" (expected at least 2)", _listener.getHeartbeatsSent() >=2);
+        }
+        finally
+        {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testHeartbeatsEnabledUsingSystemProperty() throws Exception
+    {
+        System.setProperty(QPID_HEARTBEAT_INTERVAL, "1");
+        AMQConnection conn = (AMQConnection) getConnection();
+        try
+        {
+            setHeartbeatListener(conn, _listener);
+            conn.start();
+
+            _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
+
+            assertTrue("Too few heartbeats received: "+_listener.getHeartbeatsReceived() +" (expected at least 2)", _listener.getHeartbeatsReceived() >=2);
+            assertTrue("Too few heartbeats sent "+_listener.getHeartbeatsSent() +" (expected at least 2)", _listener.getHeartbeatsSent() >=2);
+        }
+        finally
+        {
+            System.clearProperty(QPID_HEARTBEAT_INTERVAL);
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testHeartbeatsDisabledUsingSystemProperty() throws Exception
+    {
+        System.setProperty(QPID_HEARTBEAT_INTERVAL, "0");
+        AMQConnection conn = (AMQConnection) getConnection();
+        try
+        {
+            setHeartbeatListener(conn, _listener);
+            conn.start();
+
+            _listener.awaitExpectedHeartbeats(2000);
+
+            assertEquals("Heartbeats unexpectedly received", 0, _listener.getHeartbeatsReceived());
+            assertEquals("Heartbeats unexpectedly sent ", 0, _listener.getHeartbeatsSent());
+        }
+        finally
+        {
+            System.clearProperty(QPID_HEARTBEAT_INTERVAL);
+            conn.close();
+        }
+
+    }
+
+    /**
+     * This test carefully arranges message flow so that bytes flow only from producer to broker
+     * on the producer side and broker to consumer on the consumer side, deliberately leaving the
+     * reverse path quiet so heartbeats will flow.
+     */
+    @Test
+    public void testUnidirectionalHeartbeating() throws Exception
+    {
+        System.setProperty(QPID_HEARTBEAT_INTERVAL,"1");
+        AMQConnection receiveConn = (AMQConnection) getConnection();
+        AMQConnection sendConn = (AMQConnection) getConnection();
+        try
+        {
+            TestListener receiveListener = new TestListener("receiverListener", 2, 0);
+            TestListener sendListener = new TestListener("senderListener", 0, 2);
+
+            Session receiveSession = receiveConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Session senderSession = sendConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Destination destination = receiveSession.createQueue(getTestQueueName());
+
+            MessageConsumer consumer = receiveSession.createConsumer(destination);
+            MessageProducer producer = senderSession.createProducer(destination);
+
+            setHeartbeatListener(receiveConn, receiveListener);
+            setHeartbeatListener(sendConn, sendListener);
+            receiveConn.start();
+
+            // Start the flow of messages to the consumer
+            consumer.receiveNoWait();
+
+            for(int i = 0; i < 5; i++)
+            {
+                producer.send(senderSession.createTextMessage("Msg " + i));
+                Thread.sleep(500);
+                assertNotNull("Expected to receive message within " + getReceiveTimeout() + "ms.", consumer.receive(getReceiveTimeout()));
+                // Consumer does not ack the message in order that no bytes flow from consumer connection back to Broker
+            }
+
+            assertTrue("Too few heartbeats sent "+ receiveListener.getHeartbeatsSent() +" (expected at least 2)", receiveListener.getHeartbeatsSent()>=2);
+            assertEquals("Unexpected number of heartbeats sent by the sender: ",0,sendListener.getHeartbeatsSent());
+
+            assertTrue("Too few heartbeats received at the sender "+ sendListener.getHeartbeatsReceived() +" (expected at least 2)", sendListener.getHeartbeatsReceived()>=2);
+            assertEquals("Unexpected number of heartbeats received by the receiver: ",0,receiveListener.getHeartbeatsReceived());
+        }
+        finally
+        {
+            System.clearProperty(QPID_HEARTBEAT_INTERVAL);
+            receiveConn.close();
+            sendConn.close();
+        }
+
+    }
+
+
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testHeartbeatsEnabledUsingAmqjLegacySystemProperty() throws Exception
+    {
+        System.setProperty("amqj.heartbeat.delay", "1");
+        AMQConnection conn = (AMQConnection) getConnection();
+        try
+        {
+            setHeartbeatListener(conn, _listener);
+            conn.start();
+
+            _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
+
+            assertTrue("Too few heartbeats received: " + _listener.getHeartbeatsReceived() + " (expected at least 2)",
+                       _listener.getHeartbeatsReceived() >= 2);
+            assertTrue("Too few heartbeats sent " + _listener.getHeartbeatsSent() + " (expected at least 2)",
+                       _listener.getHeartbeatsSent() >= 2);
+        }
+        finally
+        {
+            System.clearProperty("amqj.heartbeat.delay");
+            conn.close();
+        }
+
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testHeartbeatsEnabledUsingOlderLegacySystemProperty() throws Exception
+    {
+        System.setProperty("idle_timeout", "1000");
+        AMQConnection conn = (AMQConnection) getConnection();
+        try
+        {
+            setHeartbeatListener(conn, _listener);
+            conn.start();
+
+            _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
+
+            assertTrue("Too few heartbeats received: " + _listener.getHeartbeatsReceived() + " (expected at least 2)",
+                       _listener.getHeartbeatsReceived() >= 2);
+            assertTrue("Too few heartbeats sent " + _listener.getHeartbeatsSent() + " (expected at least 2)",
+                       _listener.getHeartbeatsSent() >= 2);
+        }
+        finally
+        {
+            System.clearProperty("idle_timeout");
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testClientStopsSendingHeartbeats_BrokerClosesConnection() throws Exception
+    {
+        try(TCPTunneler tcpTunneler = new TCPTunneler(0, "localhost", getBrokerAdmin().getBrokerAddress(
+                BrokerAdmin.PortType.AMQP).getPort(), 1))
+        {
+            tcpTunneler.start();
+
+            final AtomicReference<InetSocketAddress> clientAddressRef = new AtomicReference<>();
+            tcpTunneler.addClientListener(new TCPTunneler.TunnelListener()
+            {
+                @Override
+                public void clientConnected(final InetSocketAddress clientAddress)
+                {
+                    clientAddressRef.set(clientAddress);
+                }
+
+                @Override
+                public void clientDisconnected(final InetSocketAddress clientAddress)
+                {
+                }
+            });
+
+            final CountDownLatch exceptionLatch = new CountDownLatch(1);
+            final String url = String.format(CONNECTION_URL_WITH_HEARTBEAT,  tcpTunneler.getLocalPort(), 1);
+            AMQConnection conn = new AMQConnection(url);
+            setHeartbeatListener(conn, _listener);
+            conn.setExceptionListener(new ExceptionListener()
+            {
+                @Override
+                public void onException(final JMSException exception)
+                {
+                    LOGGER.debug("Exception listener got exception", exception);
+                    exceptionLatch.countDown();
+                }
+            });
+            conn.start();
+
+            assertNotNull(clientAddressRef.get());
+
+            _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
+
+            assertTrue("Too few heartbeats received: "+_listener.getHeartbeatsReceived() +" (expected at least 2)", _listener.getHeartbeatsReceived() >=2);
+            assertTrue("Too few heartbeats sent "+_listener.getHeartbeatsSent() +" (expected at least 2)", _listener.getHeartbeatsSent() >=2);
+
+            tcpTunneler.stopClientToServerForwarding(clientAddressRef.get());
+
+            exceptionLatch.await(5, TimeUnit.SECONDS);
+            assertTrue("Connection should be disconnected within timeout", conn.isConnected());
+        }
+    }
+
+    private void setHeartbeatListener(final AMQConnection conn, final TestListener listener) throws Exception
+    {
+        final Method setHeartbeatListener = conn.getClass().getDeclaredMethod("setHeartbeatListener", HeartbeatListener.class);
+        setHeartbeatListener.setAccessible(true);
+        setHeartbeatListener.invoke(conn, listener);
+    }
+
+    private class TestListener implements HeartbeatListener
+    {
+
+        private final String _name;
+        private final AtomicInteger _heartbeatsReceived = new AtomicInteger(0);
+        private final AtomicInteger _heartbeatsSent = new AtomicInteger(0);
+        private final CountDownLatch _expectedReceivedHeartbeats;
+        private final CountDownLatch _expectedSentHeartbeats;
+
+        TestListener(String name, int expectedSentHeartbeats, int expectedReceivedHeartbeats)
+        {
+            _name = name;
+            _expectedReceivedHeartbeats = new CountDownLatch(expectedReceivedHeartbeats);
+            _expectedSentHeartbeats = new CountDownLatch(expectedSentHeartbeats);
+        }
+
+        @Override
+        public void heartbeatReceived()
+        {
+            LOGGER.debug(_name + " heartbeat received");
+            _heartbeatsReceived.incrementAndGet();
+            _expectedReceivedHeartbeats.countDown();
+        }
+
+        int getHeartbeatsReceived()
+        {
+            return _heartbeatsReceived.get();
+        }
+
+        @Override
+        public void heartbeatSent()
+        {
+            LOGGER.debug(_name + " heartbeat sent");
+            _heartbeatsSent.incrementAndGet();
+            _expectedSentHeartbeats.countDown();
+        }
+
+        int getHeartbeatsSent()
+        {
+            return _heartbeatsSent.get();
+        }
+
+        void awaitExpectedHeartbeats(final long maximumWaitTime) throws InterruptedException
+        {
+            long startTime = System.currentTimeMillis();
+            _expectedSentHeartbeats.await(maximumWaitTime, TimeUnit.MILLISECONDS);
+
+            long remainingTime = maximumWaitTime - (System.currentTimeMillis() - startTime);
+            if (remainingTime > 0)
+            {
+                _expectedReceivedHeartbeats.await(remainingTime, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org