You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/05 22:14:00 UTC

activemq-artemis git commit: NO-JIRA Adding a test playing with network disconnects and failover

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 0d9f5eb2c -> 05ce7c6ec


NO-JIRA Adding a test playing with network disconnects and failover


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/05ce7c6e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/05ce7c6e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/05ce7c6e

Branch: refs/heads/master
Commit: 05ce7c6ecd1c70fc571764af9027767f04538ccd
Parents: 0d9f5eb
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Sep 5 18:09:18 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Sep 5 18:13:35 2018 -0400

----------------------------------------------------------------------
 .../artemis/tests/util/ActiveMQTestBase.java    |   5 +-
 .../failover/NetworkFailureFailoverTest.java    | 688 +++++++++++++++++++
 .../artemis/tests/util/network/NetUtil.java     | 226 ++++++
 .../tests/util/network/NetUtilResource.java     |  29 +
 4 files changed, 946 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/05ce7c6e/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 530c399..2cd5d56 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -1219,7 +1219,7 @@ public abstract class ActiveMQTestBase extends Assert {
       return params;
    }
 
-   protected static final TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live) {
+   protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live) {
       if (live) {
          return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY);
       }
@@ -1231,7 +1231,7 @@ public abstract class ActiveMQTestBase extends Assert {
       return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
    }
 
-   protected static final TransportConfiguration getNettyConnectorTransportConfiguration(final boolean live) {
+   protected TransportConfiguration getNettyConnectorTransportConfiguration(final boolean live) {
       if (live) {
          return new TransportConfiguration(NETTY_CONNECTOR_FACTORY);
       }
@@ -1239,6 +1239,7 @@ public abstract class ActiveMQTestBase extends Assert {
       Map<String, Object> server1Params = new HashMap<>();
 
       server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+      server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
       return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/05ce7c6e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java
new file mode 100644
index 0000000..1011502
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java
@@ -0,0 +1,688 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
+import org.apache.activemq.artemis.api.core.client.TopologyMember;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.client.impl.Topology;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.junit.Wait;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.util.network.NetUtil;
+import org.apache.activemq.artemis.tests.util.network.NetUtilResource;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class NetworkFailureFailoverTest extends FailoverTestBase {
+
+   @Rule
+   public NetUtilResource netUtilResource = new NetUtilResource();
+
+   @BeforeClass
+   public static void start() {
+      NetUtil.assumeSudo(); // NOTE(review): presumably a JUnit assumption that skips this class when sudo is unavailable — confirm in NetUtil
+   }
+
+   // 192.0.2.0 is part of 192.0.2.0/24 (TEST-NET-1), which RFC 5737 reserves for documentation, so it should not be assigned on any real system.
+   private static final String LIVE_IP = "192.0.2.0";
+
+   private int beforeTime; // only referenced by the commented-out NettyConnection lock-timeout code in setUp()/tearDown()
+
+   @Override
+   public void setUp() throws Exception {
+      //      beforeTime = NettyConnection.getLockTimeout();
+      //      NettyConnection.setLockTimeout(1000);
+      NetUtil.netUp(LIVE_IP); // bring LIVE_IP up before super.setUp() runs; the live acceptor in this class is configured with that host
+      super.setUp();
+   }
+
+   @Override
+   public void tearDown() throws Exception {
+      super.tearDown(); // NOTE(review): LIVE_IP is not taken down here — presumably the NetUtilResource rule cleans up; confirm
+      //      NettyConnection.setLockTimeout(beforeTime);
+   }
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
+      return getNettyAcceptorTransportConfiguration(live); // always Netty; resolves to this class's override (LIVE_IP for live, localhost otherwise)
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
+      return getNettyConnectorTransportConfiguration(live); // always Netty; resolves to this class's override, which also sets NETTY_CONNECT_TIMEOUT
+   }
+
+   protected ClientSession createSession(ClientSessionFactory sf1,
+                                         boolean autoCommitSends,
+                                         boolean autoCommitAcks,
+                                         int ackBatchSize) throws Exception {
+      return addClientSession(sf1.createSession(autoCommitSends, autoCommitAcks, ackBatchSize)); // NOTE(review): addClientSession is inherited; presumably it tracks the session for cleanup — confirm
+   }
+
+   protected ClientSession createSession(ClientSessionFactory sf1,
+                                         boolean autoCommitSends,
+                                         boolean autoCommitAcks) throws Exception {
+      return addClientSession(sf1.createSession(autoCommitSends, autoCommitAcks)); // same wrapper, factory overload without an ack batch size
+   }
+
+   protected ClientSession createSession(ClientSessionFactory sf1) throws Exception {
+      return addClientSession(sf1.createSession()); // same wrapper, factory's no-argument overload
+   }
+
+   protected ClientSession createSession(ClientSessionFactory sf1,
+                                         boolean xa,
+                                         boolean autoCommitSends,
+                                         boolean autoCommitAcks) throws Exception {
+      return addClientSession(sf1.createSession(xa, autoCommitSends, autoCommitAcks)); // same wrapper, overload that also takes the xa flag
+   }
+
+   @Override
+   protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live) {
+      Map<String, Object> server1Params = new HashMap<>();
+
+      if (live) {
+         server1Params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT);
+         server1Params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
+      } else {
+         server1Params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT);
+         server1Params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+      }
+
+      return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
+   }
+
+   @Override
+   protected TransportConfiguration getNettyConnectorTransportConfiguration(final boolean live) {
+      Map<String, Object> server1Params = new HashMap<>();
+
+      if (live) {
+         server1Params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT);
+         server1Params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
+      } else {
+         server1Params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT);
+         server1Params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+      }
+
+      server1Params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
+
+      return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
+   }
+
+   @Test
+   public void testFailoverAfterNetFailure() throws Exception { // LIVE_IP goes down and live crashes on the 500th send; all 2001 messages must still be received
+      final AtomicInteger sentMessages = new AtomicInteger(0);
+      final AtomicInteger blockedAt = new AtomicInteger(0); // never set in this test, so the compensation loop near the end runs zero times
+
+      Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
+      Map<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
+      params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
+      TransportConfiguration tc = createTransportConfiguration(true, false, params);
+
+      final AtomicInteger countSent = new AtomicInteger(0); // counts SessionSendMessage packets seen by the live server's interceptor
+
+      liveServer.addInterceptor(new Interceptor() {
+         @Override
+         public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
+            //System.out.println("Received " + packet);
+            if (packet instanceof SessionSendMessage) {
+
+               if (countSent.incrementAndGet() == 500) { // exactly once, on the 500th send packet
+                  try {
+                     NetUtil.netDown(LIVE_IP); // take the address down first, then crash the live server from inside the interceptor
+                     System.out.println("Blocking traffic");
+                     // Thread.sleep(3000); // this is important to let stuff to block
+                     liveServer.crash(true, false);
+                  } catch (Exception e) {
+                     e.printStackTrace();
+                  }
+                  new Thread() { // NOTE(review): this thread only prints a message; it does not stop anything
+                     @Override
+                     public void run() {
+                        try {
+                           System.err.println("Stopping server");
+                        } catch (Exception e) {
+                           e.printStackTrace();
+                        }
+                     }
+                  }.start();
+               }
+            }
+            return true; // the packet is never vetoed
+         }
+      });
+
+      ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc));
+
+      locator.setBlockOnNonDurableSend(false);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(false);
+      locator.setReconnectAttempts(-1);
+      locator.setConfirmationWindowSize(-1);
+      locator.setProducerWindowSize(-1);
+      locator.setClientFailureCheckPeriod(100);
+      locator.setConnectionTTL(1000);
+      ClientSessionFactoryInternal sfProducer = createSessionFactoryAndWaitForTopology(locator, 2);
+      sfProducer.addFailureListener(new SessionFailureListener() { // all three callbacks are intentionally empty
+         @Override
+         public void beforeReconnect(ActiveMQException exception) {
+         }
+
+         @Override
+         public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+
+         }
+
+         @Override
+         public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+
+         }
+      });
+
+      ClientSession sessionProducer = createSession(sfProducer, true, true, 0);
+
+      sessionProducer.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+      ClientProducer producer = sessionProducer.createProducer(FailoverTestBase.ADDRESS);
+
+      final int numMessages = 2001;
+      final CountDownLatch latchReceived = new CountDownLatch(numMessages); // one count per message the consumer thread receives
+
+      ClientSessionFactoryInternal sfConsumer = createSessionFactoryAndWaitForTopology(locator, 2);
+
+      final ClientSession sessionConsumer = createSession(sfConsumer, true, true, 0);
+      final ClientConsumer consumer = sessionConsumer.createConsumer(FailoverTestBase.ADDRESS);
+
+      sessionConsumer.start();
+
+      final AtomicBoolean running = new AtomicBoolean(true);
+
+      final Thread t = new Thread() { // consumer thread: runs concurrently with the send loop below
+         @Override
+         public void run() {
+            int received = 0;
+            int errors = 0;
+            while (running.get() && received < numMessages) {
+               try {
+                  ClientMessage msgReceived = consumer.receive(500);
+                  if (msgReceived != null) {
+                     latchReceived.countDown();
+                     msgReceived.acknowledge();
+                     if (received++ % 100 == 0) { // post-increment: true for the 1st, 101st, 201st, ... message
+                        System.out.println("Received " + received);
+                        sessionConsumer.commit();
+                     }
+                  } else {
+                     System.out.println("Null");
+                  }
+               } catch (Throwable e) {
+                  errors++;
+                  if (errors > 10) { // give up after the 11th failure
+                     break;
+                  }
+                  e.printStackTrace();
+               }
+            }
+         }
+      };
+
+      t.start();
+
+      for (sentMessages.set(0); sentMessages.get() < numMessages; sentMessages.incrementAndGet()) {
+         do { // retry the send until it succeeds
+            try {
+               if (sentMessages.get() % 100 == 0) {
+                  System.out.println("Sent " + sentMessages.get());
+               }
+               producer.send(createMessage(sessionProducer, sentMessages.get(), true));
+               break;
+            } catch (Exception e) {
+               sentMessages.decrementAndGet(); // NOTE(review): the retry does not re-increment, so every failed send moves the message index back by one
+               new Exception("Exception on ending", e).printStackTrace();
+            }
+         }
+         while (true);
+      }
+
+      // these may never be received: count the latch down by the value recorded in blockedAt.
+      for (int i = 0; i < blockedAt.get(); i++) {
+         latchReceived.countDown();
+      }
+
+      Assert.assertTrue(latchReceived.await(1, TimeUnit.MINUTES));
+
+      running.set(false);
+
+      t.join();
+   }
+
+
+   /**
+    * Counts the servers known to {@code topology}: one per member, plus one more for every
+    * member that also reports a backup.
+    */
+   private int countTopologyMembers(Topology topology) {
+      int servers = 0;
+      for (TopologyMember member : topology.getMembers()) {
+         servers += (member.getBackup() != null) ? 2 : 1;
+      }
+      return servers;
+   }
+
+   @Test
+   public void testNetFailureConsume() throws Exception { // sends 2001 messages first, then consumes; LIVE_IP goes down and live crashes after 300 received
+      final AtomicInteger sentMessages = new AtomicInteger(0);
+      final AtomicInteger blockedAt = new AtomicInteger(0); // never set in this test, so the compensation loop near the end runs zero times
+
+      Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
+      Map<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
+      params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
+      TransportConfiguration tc = createTransportConfiguration(true, false, params);
+
+      final AtomicInteger countSent = new AtomicInteger(0); // unused in this test
+
+      ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc));
+
+      locator.setBlockOnNonDurableSend(false);
+      locator.setBlockOnDurableSend(false);
+      locator.setBlockOnAcknowledge(false);
+      locator.setReconnectAttempts(-1);
+      locator.setConfirmationWindowSize(-1);
+      locator.setProducerWindowSize(-1);
+      locator.setClientFailureCheckPeriod(100);
+      locator.setConnectionTTL(1000);
+      ClientSessionFactoryInternal sfProducer = createSessionFactoryAndWaitForTopology(locator, 2);
+
+      Wait.assertEquals(2, () -> countTopologyMembers(locator.getTopology())); // waits for the count of members plus backups to reach 2
+
+      sfProducer.addFailureListener(new SessionFailureListener() { // all three callbacks are intentionally empty
+         @Override
+         public void beforeReconnect(ActiveMQException exception) {
+         }
+
+         @Override
+         public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+
+         }
+
+         @Override
+         public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+
+         }
+      });
+
+      ClientSession sessionProducer = createSession(sfProducer, true, true, 0);
+
+      sessionProducer.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+      ClientProducer producer = sessionProducer.createProducer(FailoverTestBase.ADDRESS);
+
+      final int numMessages = 2001;
+      final CountDownLatch latchReceived = new CountDownLatch(numMessages); // one count per message the consumer thread receives
+
+      ClientSessionFactoryInternal sfConsumer = createSessionFactoryAndWaitForTopology(locator, 2);
+
+      final ClientSession sessionConsumer = createSession(sfConsumer, true, true, 0);
+      final ClientConsumer consumer = sessionConsumer.createConsumer(FailoverTestBase.ADDRESS);
+
+      sessionConsumer.start();
+
+      final AtomicBoolean running = new AtomicBoolean(true);
+
+      final Thread t = new Thread() { // consumer thread: only started after the send loop below has finished
+         @Override
+         public void run() {
+            int received = 0;
+            int errors = 0;
+            while (running.get() && received < numMessages) {
+               try {
+                  ClientMessage msgReceived = consumer.receive(500);
+                  if (msgReceived != null) {
+                     latchReceived.countDown();
+                     msgReceived.acknowledge();
+                     if (++received % 100 == 0) { // pre-increment: true for every 100th message
+
+                        if (received == 300) { // once, after 300 messages: take LIVE_IP down and crash the live server
+                           System.out.println("Shutting down IP");
+                           NetUtil.netDown(LIVE_IP);
+                           liveServer.crash(true, false);
+                        }
+                        System.out.println("Received " + received);
+                        sessionConsumer.commit();
+                     }
+                  } else {
+                     System.out.println("Null");
+                  }
+               } catch (Throwable e) {
+                  errors++;
+                  if (errors > 10) { // give up after the 11th failure
+                     break;
+                  }
+                  e.printStackTrace();
+               }
+            }
+         }
+      };
+
+
+      for (sentMessages.set(0); sentMessages.get() < numMessages; sentMessages.incrementAndGet()) {
+         do { // retry the send until it succeeds
+            try {
+               if (sentMessages.get() % 100 == 0) {
+                  System.out.println("Sent " + sentMessages.get());
+               }
+               producer.send(createMessage(sessionProducer, sentMessages.get(), true));
+               break;
+            } catch (Exception e) {
+               sentMessages.decrementAndGet(); // NOTE(review): the retry does not re-increment, so every failed send moves the message index back by one
+               new Exception("Exception on ending", e).printStackTrace();
+            }
+         }
+         while (true);
+      }
+
+      sessionProducer.close();
+
+
+      t.start(); // consumption begins only now, after all sends and the producer session close
+
+      // these may never be received: count the latch down by the value recorded in blockedAt.
+      for (int i = 0; i < blockedAt.get(); i++) {
+         latchReceived.countDown();
+      }
+
+      Assert.assertTrue(latchReceived.await(1, TimeUnit.MINUTES));
+
+      running.set(false);
+
+      t.join();
+   }
+
+   @Test
+   public void testFailoverCreateSessionOnFailure() throws Exception { // LIVE_IP drops on the 50th CreateSessionMessage; 100 sessions must still get created once live is crashed
+      final AtomicInteger sentMessages = new AtomicInteger(0); // never incremented in this test, so blockedAt is always set to 0
+      final AtomicInteger blockedAt = new AtomicInteger(0);
+
+      Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
+      Map<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
+      params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
+      TransportConfiguration tc = createTransportConfiguration(true, false, params);
+
+      final AtomicInteger countSent = new AtomicInteger(0); // counts CreateSessionMessage packets seen by the live server's interceptor
+
+      final CountDownLatch latchDown = new CountDownLatch(1); // released once the interceptor has taken LIVE_IP down
+
+      liveServer.addInterceptor(new Interceptor() {
+         @Override
+         public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
+            //System.out.println("Received " + packet);
+            if (packet instanceof CreateSessionMessage) {
+
+               if (countSent.incrementAndGet() == 50) { // exactly once, on the 50th session-creation packet
+                  try {
+                     NetUtil.netDown(LIVE_IP);
+                     System.out.println("Blocking traffic");
+                     Thread.sleep(3000); // this is important to let stuff to block
+                     blockedAt.set(sentMessages.get());
+                     latchDown.countDown();
+                  } catch (Exception e) {
+                     e.printStackTrace();
+                  }
+               }
+            }
+            return true; // the packet is never vetoed
+         }
+      });
+
+      ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc));
+      //locator.setDebugReconnects("CF_retry");
+
+      locator.setBlockOnNonDurableSend(false);
+      locator.setBlockOnDurableSend(false);
+      locator.setBlockOnAcknowledge(false);
+      locator.setReconnectAttempts(-1);
+      locator.setConfirmationWindowSize(-1);
+      locator.setProducerWindowSize(-1);
+      locator.setClientFailureCheckPeriod(100);
+      locator.setConnectionTTL(1000);
+      final ClientSessionFactoryInternal sessionFactory = createSessionFactoryAndWaitForTopology(locator, 2);
+      final AtomicInteger failed = new AtomicInteger(0);
+      sessionFactory.addFailureListener(new SessionFailureListener() {
+         @Override
+         public void beforeReconnect(ActiveMQException exception) {
+            if (failed.incrementAndGet() == 1) { // first reconnect only: set the interrupt flag on the thread running the callback
+               Thread.currentThread().interrupt();
+            }
+            new Exception("producer before reconnect", exception).printStackTrace();
+         }
+
+         @Override
+         public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+
+         }
+
+         @Override
+         public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+
+         }
+      });
+      final int numSessions = 100;
+      final CountDownLatch latchCreated = new CountDownLatch(numSessions); // one count per successfully created and closed session
+
+      final AtomicBoolean running = new AtomicBoolean(true);
+
+      final Thread t = new Thread("session-creator") {
+         @Override
+         public void run() {
+            int created = 0;
+            // failures are only logged and retried: session creation is expected to fail while LIVE_IP is down
+            while (running.get() && created < numSessions) {
+               try {
+                  ClientSession session = sessionFactory.createSession();
+                  System.out.println("Creating session, currentLatch = " + latchCreated.getCount());
+                  session.close();
+                  latchCreated.countDown();
+                  created++; // count successes so the loop bound actually stops after numSessions sessions
+               } catch (Throwable e) {
+                  e.printStackTrace();
+               }
+            }
+         }
+      };
+
+      t.start();
+
+      Assert.assertTrue(latchDown.await(1, TimeUnit.MINUTES));
+
+      Thread.sleep(1000);
+
+      System.out.println("Server crashed now!!!");
+
+      liveServer.crash(true, false);
+
+      try {
+         Assert.assertTrue(latchCreated.await(5, TimeUnit.MINUTES));
+
+      } finally {
+         running.set(false);
+
+         t.join(TimeUnit.SECONDS.toMillis(30)); // bounded wait so a stuck creator thread cannot hang the test
+      }
+   }
+
+   @Test
+   public void testInterruptFailingThread() throws Exception {
+      final AtomicInteger sentMessages = new AtomicInteger(0);
+      final AtomicInteger blockedAt = new AtomicInteger(0);
+
+      Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
+      Map<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
+      params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
+      TransportConfiguration tc = createTransportConfiguration(true, false, params);
+
+      final AtomicInteger countSent = new AtomicInteger(0);
+
+      final CountDownLatch latchBlocked = new CountDownLatch(1);
+
+      liveServer.addInterceptor(new Interceptor() {
+         @Override
+         public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
+            //System.out.println("Received " + packet);
+            if (packet instanceof SessionSendMessage) {
+
+               if (countSent.incrementAndGet() == 50) {
+                  try {
+                     NetUtil.netDown(LIVE_IP);
+                     System.out.println("Blocking traffic");
+                     Thread.sleep(3000); // this is important to let stuff to block
+                     blockedAt.set(sentMessages.get());
+                     latchBlocked.countDown();
+                  } catch (Exception e) {
+                     e.printStackTrace();
+                  }
+                  //                  new Thread()
+                  //                  {
+                  //                     public void run()
+                  //                     {
+                  //                        try
+                  //                        {
+                  //                           System.err.println("Stopping server");
+                  //                           // liveServer.stop();
+                  //                           liveServer.crash(true, false);
+                  //                        }
+                  //                        catch (Exception e)
+                  //                        {
+                  //                           e.printStackTrace();
+                  //                        }
+                  //                     }
+                  //                  }.start();
+               }
+            }
+            return true;
+         }
+      });
+
+      final CountDownLatch failing = new CountDownLatch(1);
+      final HashSet<Thread> setThread = new HashSet<>();
+
+      ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc));
+
+      locator.setBlockOnNonDurableSend(false);
+      locator.setBlockOnDurableSend(false);
+      locator.setBlockOnAcknowledge(false);
+      locator.setReconnectAttempts(-1);
+      locator.setConfirmationWindowSize(-1);
+      locator.setProducerWindowSize(-1);
+      locator.setClientFailureCheckPeriod(100);
+      locator.setConnectionTTL(1000);
+      ClientSessionFactoryInternal sfProducer = createSessionFactoryAndWaitForTopology(locator, 2);
+      sfProducer.addFailureListener(new SessionFailureListener() {
+         @Override
+         public void beforeReconnect(ActiveMQException exception) {
+            setThread.add(Thread.currentThread());
+            failing.countDown();
+         }
+
+         @Override
+         public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+
+         }
+
+         @Override
+         public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+
+         }
+      });
+
+      final ClientSession sessionProducer = createSession(sfProducer, true, true, 0);
+
+      sessionProducer.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+      final ClientProducer producer = sessionProducer.createProducer(FailoverTestBase.ADDRESS);
+
+      final int numMessages = 10000;
+
+      final AtomicBoolean running = new AtomicBoolean(true);
+      final CountDownLatch messagesSentlatch = new CountDownLatch(numMessages);
+
+      Thread t = new Thread("sendingThread") {
+         @Override
+         public void run() {
+
+            while (sentMessages.get() < numMessages && running.get()) {
+               try {
+                  if (sentMessages.get() % 10 == 0) {
+                     System.out.println("Sent " + sentMessages.get());
+                  }
+                  producer.send(createMessage(sessionProducer, sentMessages.get(), true));
+                  sentMessages.incrementAndGet();
+                  messagesSentlatch.countDown();
+               } catch (Throwable e) {
+                  e.printStackTrace();
+               }
+            }
+
+         }
+      };
+
+      t.start();
+
+      Assert.assertTrue(latchBlocked.await(1, TimeUnit.MINUTES));
+
+      Assert.assertTrue(failing.await(1, TimeUnit.MINUTES));
+
+      for (int i = 0; i < 5; i++) {
+         for (Thread tint : setThread) {
+            tint.interrupt();
+         }
+         Thread.sleep(500);
+      }
+
+      liveServer.crash(true, false);
+
+      Assert.assertTrue(messagesSentlatch.await(3, TimeUnit.MINUTES));
+
+      running.set(false);
+
+      t.join();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/05ce7c6e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtil.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtil.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtil.java
new file mode 100644
index 0000000..1d29b38
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtil.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.util.network;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class NetUtil {
+
+   /**
+    * Resolves {@code ip} and asks a {@link NetworkHealthCheck} whether the address passes its check.
+    *
+    * @param ip host name or textual IP address to check
+    * @return the result of the health check, or {@code false} when the address cannot be resolved
+    * @throws Exception declared for callers; presumably propagated from the health check (to be confirmed)
+    */
+   public static boolean checkIP(String ip) throws Exception {
+      final InetAddress resolved;
+      try {
+         resolved = InetAddress.getByName(ip);
+      } catch (Exception e) {
+         e.printStackTrace(); // not supposed to happen
+         return false;
+      }
+      // NOTE(review): the two 100 arguments look like period/timeout values - confirm against NetworkHealthCheck
+      return new NetworkHealthCheck(null, 100, 100).check(resolved);
+   }
+
+   // Counter used by netUp to build the next "lo:N" device name; reset to 0 by cleanup().
+   // final: the holder is mutated in place and must never be swapped for another instance.
+   public static final AtomicInteger nextDevice = new AtomicInteger(0);
+
+   // IP / device (device being only valid on linux)
+   public static final Map<String, String> networks = new ConcurrentHashMap<>();
+
+   // Operating systems for which this class knows how to invoke ifconfig.
+   private enum OS {
+      MAC, LINUX, NON_SUPORTED
+   }
+
+   // OS detected once, at class-load time, from the "os.name" system property.
+   static final OS osUsed;
+   // Name of the current user; printed in the sudoers hint by assumeSudo().
+   static final String user = System.getProperty("user.name");
+
+   static {
+      // Locale.ROOT keeps the upper-casing locale independent: with a Turkish default locale
+      // "Linux".toUpperCase() yields a dotted capital I and would never match "LINUX".
+      final String osName = System.getProperty("os.name").toUpperCase(java.util.Locale.ROOT);
+
+      if (osName.contains("MAC")) {
+         osUsed = OS.MAC;
+      } else if (osName.contains("LINUX")) {
+         osUsed = OS.LINUX;
+      } else {
+         osUsed = OS.NON_SUPORTED;
+      }
+   }
+
+   /**
+    * JUnit assumption that the test can run: the OS must be supported and {@link #canSudo()} must succeed.
+    * When sudo is not available, prints the sudoers line to add and fails the assumption.
+    */
+   public static void assumeSudo() {
+      Assume.assumeTrue("non supported OS", osUsed != OS.NON_SUPORTED);
+      if (canSudo()) {
+         return;
+      }
+
+      final String separator = "# ------------------------------------------------------- ";
+      System.out.println("Add the following at the end of your /etc/sudoers (use the visudo command)");
+      System.out.println(separator);
+      System.out.println(user + " ALL = NOPASSWD: /sbin/ifconfig");
+      System.out.println(separator);
+      Assume.assumeFalse(true);
+   }
+
+   /**
+    * Resets the device counter and takes down every alias still registered in {@link #networks}.
+    * <p>
+    * Each entry is removed from the map before its alias is taken down, so a later call does not
+    * retry aliases that are already gone. A failure on one alias is printed and does not prevent
+    * the remaining aliases from being processed.
+    */
+   public static void cleanup() {
+      nextDevice.set(0);
+
+      Set<Map.Entry<String, String>> entrySet = networks.entrySet();
+      Iterator<Map.Entry<String, String>> iter = entrySet.iterator();
+      while (iter.hasNext()) {
+         Map.Entry<String, String> entry = iter.next();
+         String ip = entry.getKey();
+         String device = entry.getValue();
+         // forget the alias first: whatever happens below, it must not be retried by the next cleanup
+         iter.remove();
+         try {
+            netDown(ip, device);
+         } catch (Exception | AssertionError e) {
+            // netDown reports a failed command through Assert.fail, which throws an AssertionError
+            e.printStackTrace();
+         }
+      }
+   }
+
+   /**
+    * Adds {@code ip} as an alias of the loopback interface through {@code sudo ifconfig} and
+    * records it in {@link #networks} so it can be taken down later.
+    *
+    * @param ip the address to bring up
+    * @throws Exception if the command cannot be executed
+    */
+   public static void netUp(String ip) throws Exception {
+      // the counter advances on every call, whatever the OS
+      final String linuxAlias = "lo:" + nextDevice.incrementAndGet();
+
+      final String device;
+      final String[] command;
+      if (osUsed == OS.MAC) {
+         device = "lo0";
+         command = new String[] {"sudo", "-n", "ifconfig", "lo0", "alias", ip};
+      } else if (osUsed == OS.LINUX) {
+         device = linuxAlias;
+         command = new String[] {"sudo", "-n", "ifconfig", linuxAlias, ip, "netmask", "255.0.0.0"};
+      } else {
+         Assert.fail("OS not supported");
+         return; // not reached, Assert.fail always throws
+      }
+
+      if (runCommand(command) != 0) {
+         Assert.fail("Cannot sudo ifconfig for ip " + ip);
+      }
+      networks.put(ip, device);
+   }
+
+   /**
+    * Takes down the alias previously registered for {@code ip} and forgets it.
+    * Fails the test when the address is not present in {@link #networks}.
+    *
+    * @param ip the address to take down
+    * @throws Exception if the command cannot be executed
+    */
+   public static void netDown(String ip) throws Exception {
+      String device = networks.remove(ip);
+      // the separating space was missing, producing messages such as "ip 1.2.3.4wasn't set up before"
+      Assert.assertNotNull("ip " + ip + " wasn't set up before", device);
+      netDown(ip, device);
+
+   }
+
+   /**
+    * Runs the OS specific {@code sudo ifconfig} command that removes an alias; does not touch {@link #networks}.
+    *
+    * @param ip     the aliased address (used by the Mac command and in the failure message)
+    * @param device the device name (used by the Linux command)
+    * @throws Exception if the command cannot be executed
+    */
+   private static void netDown(String ip, String device) throws Exception {
+      final String[] command;
+      if (osUsed == OS.MAC) {
+         command = new String[] {"sudo", "-n", "ifconfig", "lo0", "-alias", ip};
+      } else if (osUsed == OS.LINUX) {
+         command = new String[] {"sudo", "-n", "ifconfig", device, "down"};
+      } else {
+         Assert.fail("OS not supported");
+         return; // not reached, Assert.fail always throws
+      }
+
+      if (runCommand(command) != 0) {
+         Assert.fail("Cannot sudo ifconfig for ip " + ip);
+      }
+   }
+
+   private static final Logger logger = Logger.getLogger(NetUtil.class);
+
+   /**
+    * Runs {@code command} through {@link #runCommand(long, TimeUnit, String...)} with a 10 second timeout.
+    *
+    * @param command the executable followed by its arguments
+    * @return the exit value of the process
+    * @throws Exception if the command cannot be executed
+    */
+   public static int runCommand(String... command) throws Exception {
+      final long defaultTimeoutSeconds = 10;
+      return runCommand(defaultTimeoutSeconds, TimeUnit.SECONDS, command);
+   }
+
+   /**
+    * Runs an external command and waits for it to finish, draining its output while it runs.
+    * <p>
+    * Standard output is passed to {@code readStream} as regular output and standard error as
+    * error output. The timeout is not applied to the process itself ({@link Process#waitFor()}
+    * blocks until it exits); it only bounds how long each reader thread is waited for afterwards.
+    *
+    * @param timeout     maximum time to wait for each output reader thread once the process has exited
+    * @param timeoutUnit unit of {@code timeout}
+    * @param command     the executable followed by its arguments
+    * @return the exit value of the process
+    * @throws Exception if the process cannot be started or the calling thread is interrupted
+    */
+   public static int runCommand(long timeout, TimeUnit timeoutUnit, String... command) throws Exception {
+
+      logCommand(command);
+
+      ProcessBuilder processBuilder = new ProcessBuilder(command);
+      final Process process = processBuilder.start();
+
+      // Each stream is consumed on its own thread: a child that fills an unread pipe blocks on
+      // write and would never let waitFor() return.
+      Thread outputReader = new Thread() {
+         @Override
+         public void run() {
+            try {
+               // stdout is regular output, not an error stream
+               readStream(process.getInputStream(), false);
+            } catch (Exception dontCare) {
+               // best effort: the output is only read for logging
+            }
+         }
+      };
+      Thread errorReader = new Thread() {
+         @Override
+         public void run() {
+            try {
+               readStream(process.getErrorStream(), true);
+            } catch (Exception dontCare) {
+               // best effort: the output is only read for logging
+            }
+         }
+      };
+      // the stdout reader used to be created but never started
+      outputReader.start();
+      errorReader.start();
+
+      int value = process.waitFor();
+
+      outputReader.join(timeoutUnit.toMillis(timeout));
+      Assert.assertFalse(outputReader.isAlive());
+      errorReader.join(timeoutUnit.toMillis(timeout));
+
+      return value;
+   }
+
+   /**
+    * Prints the command to standard output, each argument followed by a single space.
+    *
+    * @param command the executable followed by its arguments
+    */
+   private static void logCommand(String[] command) {
+      final StringBuilder line = new StringBuilder("NetUTIL command::");
+      for (int i = 0; i < command.length; i++) {
+         line.append(command[i]).append(" ");
+      }
+      System.out.println(line.toString());
+   }
+
+   /**
+    * Checks whether {@code sudo -n ifconfig} can be executed and exits with 0.
+    *
+    * @return {@code true} when the command exits with 0; {@code false} on any other exit value
+    *         or when running it throws an exception (the stack trace is printed)
+    */
+   public static boolean canSudo() {
+      boolean allowed;
+      try {
+         final int exitValue = runCommand("sudo", "-n", "ifconfig");
+         allowed = exitValue == 0;
+      } catch (Exception e) {
+         e.printStackTrace();
+         allowed = false;
+      }
+      return allowed;
+   }
+
+   /**
+    * Test entry point asserting that {@link #canSudo()} returns {@code true} on this machine.
+    */
+   @Test
+   public void testCanSudo() throws Exception {
+      final boolean sudoAvailable = canSudo();
+      Assert.assertTrue(sudoAvailable);
+   }
+
+   /**
+    * Logs every line read from {@code stream} until end of stream, then closes it.
+    *
+    * @param stream the stream to drain
+    * @param error  {@code true} to log each line at warn level, {@code false} to log it at trace level
+    * @throws IOException if reading from the stream fails
+    */
+   private static void readStream(InputStream stream, boolean error) throws IOException {
+      // try-with-resources closes the reader (and the wrapped stream) even when readLine() throws
+      try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
+         String inputLine;
+         while ((inputLine = reader.readLine()) != null) {
+            if (error) {
+               logger.warn(inputLine);
+            } else {
+               logger.trace(inputLine);
+            }
+         }
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/05ce7c6e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtilResource.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtilResource.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtilResource.java
new file mode 100644
index 0000000..0f2abd9
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtilResource.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.util.network;
+
+import org.junit.rules.ExternalResource;
+
+/**
+ * JUnit {@link ExternalResource} that calls {@link NetUtil#cleanup()} when the resource is torn down.
+ */
+public class NetUtilResource extends ExternalResource {
+
+   @Override
+   protected void after() {
+      super.after();
+      // take down whatever NetUtil still has registered
+      NetUtil.cleanup();
+   }
+}