You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/05 22:14:00 UTC
activemq-artemis git commit: NO-JIRA Adding a test playing with
network disconnects and failover
Repository: activemq-artemis
Updated Branches:
refs/heads/master 0d9f5eb2c -> 05ce7c6ec
NO-JIRA Adding a test playing with network disconnects and failover
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/05ce7c6e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/05ce7c6e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/05ce7c6e
Branch: refs/heads/master
Commit: 05ce7c6ecd1c70fc571764af9027767f04538ccd
Parents: 0d9f5eb
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Sep 5 18:09:18 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Sep 5 18:13:35 2018 -0400
----------------------------------------------------------------------
.../artemis/tests/util/ActiveMQTestBase.java | 5 +-
.../failover/NetworkFailureFailoverTest.java | 688 +++++++++++++++++++
.../artemis/tests/util/network/NetUtil.java | 226 ++++++
.../tests/util/network/NetUtilResource.java | 29 +
4 files changed, 946 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/05ce7c6e/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 530c399..2cd5d56 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -1219,7 +1219,7 @@ public abstract class ActiveMQTestBase extends Assert {
return params;
}
- protected static final TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live) {
+ protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live) {
if (live) {
return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY);
}
@@ -1231,7 +1231,7 @@ public abstract class ActiveMQTestBase extends Assert {
return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
}
- protected static final TransportConfiguration getNettyConnectorTransportConfiguration(final boolean live) {
+ protected TransportConfiguration getNettyConnectorTransportConfiguration(final boolean live) {
if (live) {
return new TransportConfiguration(NETTY_CONNECTOR_FACTORY);
}
@@ -1239,6 +1239,7 @@ public abstract class ActiveMQTestBase extends Assert {
Map<String, Object> server1Params = new HashMap<>();
server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+ server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/05ce7c6e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java
new file mode 100644
index 0000000..1011502
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java
@@ -0,0 +1,688 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
+import org.apache.activemq.artemis.api.core.client.TopologyMember;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.client.impl.Topology;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.junit.Wait;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.util.network.NetUtil;
+import org.apache.activemq.artemis.tests.util.network.NetUtilResource;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class NetworkFailureFailoverTest extends FailoverTestBase {
+
+ @Rule
+ public NetUtilResource netUtilResource = new NetUtilResource();
+ /**
+ * Skips every test in this class (through JUnit assumptions) unless {@link NetUtil#assumeSudo()} finds a
+ * supported OS and is able to run {@code sudo -n ifconfig}.
+ */
+ @BeforeClass
+ public static void start() {
+ NetUtil.assumeSudo();
+ }
+
+ // 192.0.2.0 is reserved for documentation, so I'm pretty sure this won't exist on any system. (It shouldn't at least)
+ private static final String LIVE_IP = "192.0.2.0";
+
+ private int beforeTime;
+ /**
+ * Brings {@code LIVE_IP} up through {@link NetUtil#netUp(String)} before delegating to the base set-up; the
+ * live acceptor and connector configured by this class use that address as their host.
+ */
+ @Override
+ public void setUp() throws Exception {
+ // beforeTime = NettyConnection.getLockTimeout();
+ // NettyConnection.setLockTimeout(1000);
+ NetUtil.netUp(LIVE_IP);
+ super.setUp();
+ }
+ /**
+ * Delegates to the base tear-down. {@code LIVE_IP} is not taken down here; the {@code NetUtilResource} rule
+ * declared on this class calls {@code NetUtil.cleanup()} for that.
+ */
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ // NettyConnection.setLockTimeout(beforeTime);
+ }
+
+ @Override
+ protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
+ return getNettyAcceptorTransportConfiguration(live);
+ }
+
+ @Override
+ protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
+ return getNettyConnectorTransportConfiguration(live);
+ }
+
+ protected ClientSession createSession(ClientSessionFactory sf1,
+ boolean autoCommitSends,
+ boolean autoCommitAcks,
+ int ackBatchSize) throws Exception {
+ return addClientSession(sf1.createSession(autoCommitSends, autoCommitAcks, ackBatchSize));
+ }
+
+ protected ClientSession createSession(ClientSessionFactory sf1,
+ boolean autoCommitSends,
+ boolean autoCommitAcks) throws Exception {
+ return addClientSession(sf1.createSession(autoCommitSends, autoCommitAcks));
+ }
+
+ protected ClientSession createSession(ClientSessionFactory sf1) throws Exception {
+ return addClientSession(sf1.createSession());
+ }
+
+ protected ClientSession createSession(ClientSessionFactory sf1,
+ boolean xa,
+ boolean autoCommitSends,
+ boolean autoCommitAcks) throws Exception {
+ return addClientSession(sf1.createSession(xa, autoCommitSends, autoCommitAcks));
+ }
+
+ @Override
+ protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live) {
+ Map<String, Object> server1Params = new HashMap<>();
+
+ if (live) {
+ server1Params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT);
+ server1Params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
+ } else {
+ server1Params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT);
+ server1Params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+ }
+
+ return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
+ }
+
+ @Override
+ protected TransportConfiguration getNettyConnectorTransportConfiguration(final boolean live) {
+ Map<String, Object> server1Params = new HashMap<>();
+
+ if (live) {
+ server1Params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT);
+ server1Params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
+ } else {
+ server1Params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT);
+ server1Params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+ }
+
+ server1Params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
+
+ return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
+ }
+ /**
+ * Sends {@code numMessages} messages while a consumer thread receives them. An interceptor on the live server
+ * takes {@code LIVE_IP} down and crashes the live server when the 500th {@code SessionSendMessage} arrives; the
+ * test passes if the received-message latch reaches zero within one minute.
+ */
+ @Test
+ public void testFailoverAfterNetFailure() throws Exception {
+ final AtomicInteger sentMessages = new AtomicInteger(0);
+ final AtomicInteger blockedAt = new AtomicInteger(0);
+
+ Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
+ Map<String, Object> params = new HashMap<>();
+ params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
+ params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
+ TransportConfiguration tc = createTransportConfiguration(true, false, params);
+ // Counts the SessionSendMessage packets seen by the interceptor on the live server.
+ final AtomicInteger countSent = new AtomicInteger(0);
+
+ liveServer.addInterceptor(new Interceptor() {
+ @Override
+ public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
+ //System.out.println("Received " + packet);
+ if (packet instanceof SessionSendMessage) {
+
+ if (countSent.incrementAndGet() == 500) {
+ try {
+ NetUtil.netDown(LIVE_IP);
+ System.out.println("Blocking traffic");
+ // Thread.sleep(3000); // this is important to let stuff to block
+ liveServer.crash(true, false);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ new Thread() { // NOTE(review): this thread only prints a message; the crash was already issued above
+ @Override
+ public void run() {
+ try {
+ System.err.println("Stopping server");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+ }
+ }
+ return true;
+ }
+ });
+
+ ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc));
+ // Only durable sends block; reconnect attempts are unlimited (-1), failure check period 100, connection TTL 1000.
+ locator.setBlockOnNonDurableSend(false);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(false);
+ locator.setReconnectAttempts(-1);
+ locator.setConfirmationWindowSize(-1);
+ locator.setProducerWindowSize(-1);
+ locator.setClientFailureCheckPeriod(100);
+ locator.setConnectionTTL(1000);
+ ClientSessionFactoryInternal sfProducer = createSessionFactoryAndWaitForTopology(locator, 2);
+ sfProducer.addFailureListener(new SessionFailureListener() { // all callbacks are intentionally empty
+ @Override
+ public void beforeReconnect(ActiveMQException exception) {
+ }
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+
+ }
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+
+ }
+ });
+
+ ClientSession sessionProducer = createSession(sfProducer, true, true, 0);
+
+ sessionProducer.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+ ClientProducer producer = sessionProducer.createProducer(FailoverTestBase.ADDRESS);
+
+ final int numMessages = 2001;
+ final CountDownLatch latchReceived = new CountDownLatch(numMessages);
+ // The consumer gets its own session factory from the same locator.
+ ClientSessionFactoryInternal sfConsumer = createSessionFactoryAndWaitForTopology(locator, 2);
+
+ final ClientSession sessionConsumer = createSession(sfConsumer, true, true, 0);
+ final ClientConsumer consumer = sessionConsumer.createConsumer(FailoverTestBase.ADDRESS);
+
+ sessionConsumer.start();
+ // Flag that lets the main thread stop the receiving loop.
+ final AtomicBoolean running = new AtomicBoolean(true);
+ // Receiver: counts the latch down and acknowledges each message; gives up after more than 10 errors.
+ final Thread t = new Thread() {
+ @Override
+ public void run() {
+ int received = 0;
+ int errors = 0;
+ while (running.get() && received < numMessages) {
+ try {
+ ClientMessage msgReceived = consumer.receive(500);
+ if (msgReceived != null) {
+ latchReceived.countDown();
+ msgReceived.acknowledge();
+ if (received++ % 100 == 0) {
+ System.out.println("Received " + received);
+ sessionConsumer.commit();
+ }
+ } else {
+ System.out.println("Null");
+ }
+ } catch (Throwable e) {
+ errors++;
+ if (errors > 10) {
+ break;
+ }
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+
+ t.start();
+ // Send loop: on a failed send the counter is stepped back and the send is retried until one succeeds.
+ for (sentMessages.set(0); sentMessages.get() < numMessages; sentMessages.incrementAndGet()) {
+ do {
+ try {
+ if (sentMessages.get() % 100 == 0) {
+ System.out.println("Sent " + sentMessages.get());
+ }
+ producer.send(createMessage(sessionProducer, sentMessages.get(), true));
+ break;
+ } catch (Exception e) {
+ sentMessages.decrementAndGet();
+ new Exception("Exception on ending", e).printStackTrace();
+ }
+ }
+ while (true);
+ }
+ // NOTE(review): blockedAt is never set in this test, so the loop below counts nothing down.
+ // these may never be received. doing the count down where we blocked.
+ for (int i = 0; i < blockedAt.get(); i++) {
+ latchReceived.countDown();
+ }
+
+ Assert.assertTrue(latchReceived.await(1, TimeUnit.MINUTES));
+
+ running.set(false);
+
+ t.join();
+ }
+
+
+ /**
+ * Counts the servers listed in a topology: every member counts once, plus once more when it has a backup.
+ *
+ * @param topology the topology to inspect
+ * @return the number of members plus the number of members that report a backup
+ */
+ private int countTopologyMembers(Topology topology) {
+    int servers = 0;
+    for (TopologyMember member : topology.getMembers()) {
+       servers += member.getBackup() == null ? 1 : 2;
+    }
+    return servers;
+ }
+ /**
+ * Sends {@code numMessages} messages up front, closes the producer session and only then starts a consumer
+ * thread. That thread takes {@code LIVE_IP} down and crashes the live server once it has received 300 messages;
+ * the test passes if the received-message latch reaches zero within one minute.
+ */
+ @Test
+ public void testNetFailureConsume() throws Exception {
+ final AtomicInteger sentMessages = new AtomicInteger(0);
+ final AtomicInteger blockedAt = new AtomicInteger(0);
+
+ Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
+ Map<String, Object> params = new HashMap<>();
+ params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
+ params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
+ TransportConfiguration tc = createTransportConfiguration(true, false, params);
+
+ final AtomicInteger countSent = new AtomicInteger(0); // NOTE(review): not used anywhere else in this test
+
+ ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc));
+ // No blocking sends or acks; reconnect attempts are unlimited (-1), failure check period 100, connection TTL 1000.
+ locator.setBlockOnNonDurableSend(false);
+ locator.setBlockOnDurableSend(false);
+ locator.setBlockOnAcknowledge(false);
+ locator.setReconnectAttempts(-1);
+ locator.setConfirmationWindowSize(-1);
+ locator.setProducerWindowSize(-1);
+ locator.setClientFailureCheckPeriod(100);
+ locator.setConnectionTTL(1000);
+ ClientSessionFactoryInternal sfProducer = createSessionFactoryAndWaitForTopology(locator, 2);
+ // Waits until countTopologyMembers reports two servers in the locator's topology.
+ Wait.assertEquals(2, () -> countTopologyMembers(locator.getTopology()));
+
+ sfProducer.addFailureListener(new SessionFailureListener() { // all callbacks are intentionally empty
+ @Override
+ public void beforeReconnect(ActiveMQException exception) {
+ }
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+
+ }
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+
+ }
+ });
+
+ ClientSession sessionProducer = createSession(sfProducer, true, true, 0);
+
+ sessionProducer.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+ ClientProducer producer = sessionProducer.createProducer(FailoverTestBase.ADDRESS);
+
+ final int numMessages = 2001;
+ final CountDownLatch latchReceived = new CountDownLatch(numMessages);
+ // The consumer gets its own session factory from the same locator.
+ ClientSessionFactoryInternal sfConsumer = createSessionFactoryAndWaitForTopology(locator, 2);
+
+ final ClientSession sessionConsumer = createSession(sfConsumer, true, true, 0);
+ final ClientConsumer consumer = sessionConsumer.createConsumer(FailoverTestBase.ADDRESS);
+
+ sessionConsumer.start();
+ // Flag that lets the main thread stop the receiving loop.
+ final AtomicBoolean running = new AtomicBoolean(true);
+ // Receiver (started only after all sends): triggers the network failure itself at 300 received messages.
+ final Thread t = new Thread() {
+ @Override
+ public void run() {
+ int received = 0;
+ int errors = 0;
+ while (running.get() && received < numMessages) {
+ try {
+ ClientMessage msgReceived = consumer.receive(500);
+ if (msgReceived != null) {
+ latchReceived.countDown();
+ msgReceived.acknowledge();
+ if (++received % 100 == 0) {
+
+ if (received == 300) {
+ System.out.println("Shutting down IP");
+ NetUtil.netDown(LIVE_IP);
+ liveServer.crash(true, false);
+ }
+ System.out.println("Received " + received);
+ sessionConsumer.commit();
+ }
+ } else {
+ System.out.println("Null");
+ }
+ } catch (Throwable e) {
+ errors++;
+ if (errors > 10) {
+ break;
+ }
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+
+ // Send loop: on a failed send the counter is stepped back and the send is retried until one succeeds.
+ for (sentMessages.set(0); sentMessages.get() < numMessages; sentMessages.incrementAndGet()) {
+ do {
+ try {
+ if (sentMessages.get() % 100 == 0) {
+ System.out.println("Sent " + sentMessages.get());
+ }
+ producer.send(createMessage(sessionProducer, sentMessages.get(), true));
+ break;
+ } catch (Exception e) {
+ sentMessages.decrementAndGet();
+ new Exception("Exception on ending", e).printStackTrace();
+ }
+ }
+ while (true);
+ }
+
+ sessionProducer.close();
+
+
+ t.start();
+ // NOTE(review): blockedAt is never set in this test, so the loop below counts nothing down.
+ // these may never be received. doing the count down where we blocked.
+ for (int i = 0; i < blockedAt.get(); i++) {
+ latchReceived.countDown();
+ }
+
+ Assert.assertTrue(latchReceived.await(1, TimeUnit.MINUTES));
+
+ running.set(false);
+
+ t.join();
+ }
+
+ @Test
+ public void testFailoverCreateSessionOnFailure() throws Exception {
+ final AtomicInteger sentMessages = new AtomicInteger(0);
+ final AtomicInteger blockedAt = new AtomicInteger(0);
+
+ Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
+ Map<String, Object> params = new HashMap<>();
+ params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
+ params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
+ TransportConfiguration tc = createTransportConfiguration(true, false, params);
+
+ final AtomicInteger countSent = new AtomicInteger(0);
+
+ final CountDownLatch latchDown = new CountDownLatch(1);
+
+ liveServer.addInterceptor(new Interceptor() {
+ @Override
+ public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
+ //System.out.println("Received " + packet);
+ if (packet instanceof CreateSessionMessage) {
+
+ if (countSent.incrementAndGet() == 50) {
+ try {
+ NetUtil.netDown(LIVE_IP);
+ System.out.println("Blocking traffic");
+ Thread.sleep(3000); // this is important to let stuff to block
+ blockedAt.set(sentMessages.get());
+ latchDown.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ return true;
+ }
+ });
+
+ ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc));
+ //locator.setDebugReconnects("CF_retry");
+
+ locator.setBlockOnNonDurableSend(false);
+ locator.setBlockOnDurableSend(false);
+ locator.setBlockOnAcknowledge(false);
+ locator.setReconnectAttempts(-1);
+ locator.setConfirmationWindowSize(-1);
+ locator.setProducerWindowSize(-1);
+ locator.setClientFailureCheckPeriod(100);
+ locator.setConnectionTTL(1000);
+ final ClientSessionFactoryInternal sessionFactory = createSessionFactoryAndWaitForTopology(locator, 2);
+ final AtomicInteger failed = new AtomicInteger(0);
+ sessionFactory.addFailureListener(new SessionFailureListener() {
+ @Override
+ public void beforeReconnect(ActiveMQException exception) {
+ if (failed.incrementAndGet() == 1) {
+ Thread.currentThread().interrupt();
+ }
+ new Exception("producer before reconnect", exception).printStackTrace();
+ }
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+
+ }
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+
+ }
+ });
+ final int numSessions = 100;
+ final CountDownLatch latchCreated = new CountDownLatch(numSessions);
+
+ final AtomicBoolean running = new AtomicBoolean(true);
+
+ final Thread t = new Thread("session-creator") {
+ @Override
+ public void run() {
+ int received = 0;
+ int errors = 0;
+ while (running.get() && received < numSessions) {
+ try {
+ ClientSession session = sessionFactory.createSession();
+ System.out.println("Creating session, currentLatch = " + latchCreated.getCount());
+ session.close();
+ latchCreated.countDown();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ errors++;
+ }
+ }
+ }
+ };
+
+ t.start();
+
+ Assert.assertTrue(latchDown.await(1, TimeUnit.MINUTES));
+
+ Thread.sleep(1000);
+
+ System.out.println("Server crashed now!!!");
+
+ liveServer.crash(true, false);
+
+ try {
+ Assert.assertTrue(latchCreated.await(5, TimeUnit.MINUTES));
+
+ } finally {
+ running.set(false);
+
+ t.join(TimeUnit.SECONDS.toMillis(30));
+ }
+ }
+ /**
+ * Sends messages from a separate thread while an interceptor takes {@code LIVE_IP} down at the 50th
+ * {@code SessionSendMessage}. Every thread seen in {@code beforeReconnect} is interrupted five times, half a
+ * second apart; the live server is then crashed and the test expects all {@code numMessages} sends to complete
+ * within three minutes.
+ */
+ @Test
+ public void testInterruptFailingThread() throws Exception {
+ final AtomicInteger sentMessages = new AtomicInteger(0);
+ final AtomicInteger blockedAt = new AtomicInteger(0);
+
+ Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
+ Map<String, Object> params = new HashMap<>();
+ params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
+ params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
+ TransportConfiguration tc = createTransportConfiguration(true, false, params);
+ // Counts the SessionSendMessage packets seen by the interceptor on the live server.
+ final AtomicInteger countSent = new AtomicInteger(0);
+ // Released by the interceptor once the IP has been taken down.
+ final CountDownLatch latchBlocked = new CountDownLatch(1);
+
+ liveServer.addInterceptor(new Interceptor() {
+ @Override
+ public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
+ //System.out.println("Received " + packet);
+ if (packet instanceof SessionSendMessage) {
+
+ if (countSent.incrementAndGet() == 50) {
+ try {
+ NetUtil.netDown(LIVE_IP);
+ System.out.println("Blocking traffic");
+ Thread.sleep(3000); // this is important to let stuff to block
+ blockedAt.set(sentMessages.get()); // NOTE(review): blockedAt is written here but never read in this test
+ latchBlocked.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ // new Thread()
+ // {
+ // public void run()
+ // {
+ // try
+ // {
+ // System.err.println("Stopping server");
+ // // liveServer.stop();
+ // liveServer.crash(true, false);
+ // }
+ // catch (Exception e)
+ // {
+ // e.printStackTrace();
+ // }
+ // }
+ // }.start();
+ }
+ }
+ return true;
+ }
+ });
+ // 'failing' is released on the first beforeReconnect callback; 'setThread' collects the calling threads.
+ final CountDownLatch failing = new CountDownLatch(1);
+ final HashSet<Thread> setThread = new HashSet<>(); // NOTE(review): plain HashSet filled by the listener and iterated by the test thread
+
+ ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc));
+ // No blocking sends or acks; reconnect attempts are unlimited (-1), failure check period 100, connection TTL 1000.
+ locator.setBlockOnNonDurableSend(false);
+ locator.setBlockOnDurableSend(false);
+ locator.setBlockOnAcknowledge(false);
+ locator.setReconnectAttempts(-1);
+ locator.setConfirmationWindowSize(-1);
+ locator.setProducerWindowSize(-1);
+ locator.setClientFailureCheckPeriod(100);
+ locator.setConnectionTTL(1000);
+ ClientSessionFactoryInternal sfProducer = createSessionFactoryAndWaitForTopology(locator, 2);
+ sfProducer.addFailureListener(new SessionFailureListener() {
+ @Override
+ public void beforeReconnect(ActiveMQException exception) {
+ setThread.add(Thread.currentThread());
+ failing.countDown();
+ }
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+
+ }
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+
+ }
+ });
+
+ final ClientSession sessionProducer = createSession(sfProducer, true, true, 0);
+
+ sessionProducer.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+ final ClientProducer producer = sessionProducer.createProducer(FailoverTestBase.ADDRESS);
+
+ final int numMessages = 10000;
+
+ final AtomicBoolean running = new AtomicBoolean(true);
+ final CountDownLatch messagesSentlatch = new CountDownLatch(numMessages);
+ // Sender: a failed send is printed and retried, because the counter only advances after a successful send.
+ Thread t = new Thread("sendingThread") {
+ @Override
+ public void run() {
+
+ while (sentMessages.get() < numMessages && running.get()) {
+ try {
+ if (sentMessages.get() % 10 == 0) {
+ System.out.println("Sent " + sentMessages.get());
+ }
+ producer.send(createMessage(sessionProducer, sentMessages.get(), true));
+ sentMessages.incrementAndGet();
+ messagesSentlatch.countDown();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+ };
+
+ t.start();
+
+ Assert.assertTrue(latchBlocked.await(1, TimeUnit.MINUTES));
+
+ Assert.assertTrue(failing.await(1, TimeUnit.MINUTES));
+ // Interrupt every thread recorded by the failure listener, five rounds with 500 ms between rounds.
+ for (int i = 0; i < 5; i++) {
+ for (Thread tint : setThread) {
+ tint.interrupt();
+ }
+ Thread.sleep(500);
+ }
+
+ liveServer.crash(true, false);
+
+ Assert.assertTrue(messagesSentlatch.await(3, TimeUnit.MINUTES));
+
+ running.set(false);
+
+ t.join();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/05ce7c6e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtil.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtil.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtil.java
new file mode 100644
index 0000000..1d29b38
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtil.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.util.network;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+/**
+ * Test utility that simulates network failures by adding and removing IP addresses on the loopback device.
+ * <p>
+ * Every change is made by running {@code sudo -n ifconfig ...}, so only Mac and Linux are supported and the
+ * current user must be allowed to run {@code ifconfig} through sudo without a password (see
+ * {@link #assumeSudo()}).
+ */
+public class NetUtil {
+
+   private static final Logger logger = Logger.getLogger(NetUtil.class);
+
+   /** Counter used to build the next Linux alias device name ({@code lo:N}). */
+   public static AtomicInteger nextDevice = new AtomicInteger(0);
+
+   // IP / device (device being only valid on linux)
+   public static Map<String, String> networks = new ConcurrentHashMap<>();
+
+   private enum OS {
+      MAC, LINUX, NON_SUPORTED;
+   }
+
+   static final OS osUsed;
+   static final String user = System.getProperty("user.name");
+
+   static {
+      OS osTmp;
+
+      // Locale.ROOT keeps the comparison stable: with a Turkish default locale "linux" upper-cases to a
+      // dotted capital I and would no longer contain "LINUX".
+      String propOS = System.getProperty("os.name").toUpperCase(java.util.Locale.ROOT);
+
+      if (propOS.contains("MAC")) {
+         osTmp = OS.MAC;
+      } else if (propOS.contains("LINUX")) {
+         osTmp = OS.LINUX;
+      } else {
+         osTmp = OS.NON_SUPORTED;
+      }
+
+      osUsed = osTmp;
+   }
+
+   /**
+    * Resolves {@code ip} and asks a {@link NetworkHealthCheck} whether the address can be reached.
+    *
+    * @param ip the address (or host name) to check
+    * @return the result of the health check, or {@code false} when the address cannot be resolved
+    */
+   public static boolean checkIP(String ip) throws Exception {
+      InetAddress ipAddress = null;
+      try {
+         ipAddress = InetAddress.getByName(ip);
+      } catch (Exception e) {
+         e.printStackTrace(); // not supposed to happen
+         return false;
+      }
+      NetworkHealthCheck healthCheck = new NetworkHealthCheck(null, 100, 100);
+      return healthCheck.check(ipAddress);
+   }
+
+   /**
+    * Turns the calling test into an ignored one (JUnit assumption) when the OS is not supported or when
+    * {@code sudo -n ifconfig} cannot be executed; in the latter case the required sudoers entry is printed.
+    */
+   public static void assumeSudo() {
+      Assume.assumeTrue("non supported OS", osUsed != OS.NON_SUPORTED);
+      if (!canSudo()) {
+         System.out.println("Add the following at the end of your /etc/sudoers (use the visudo command)");
+         System.out.println("# ------------------------------------------------------- ");
+         System.out.println(user + " ALL = NOPASSWD: /sbin/ifconfig");
+         System.out.println("# ------------------------------------------------------- ");
+         Assume.assumeTrue("sudo -n ifconfig is not allowed for user " + user, false);
+      }
+   }
+
+   /**
+    * Resets the device counter and takes down every address still registered in {@link #networks}.
+    * <p>
+    * Each entry is removed from the map once the attempt was made, so that a later call does not try to take
+    * down an address that is already gone.
+    */
+   public static void cleanup() {
+      nextDevice.set(0);
+
+      Set<Map.Entry<String, String>> entrySet = networks.entrySet();
+      Iterator<Map.Entry<String, String>> iter = entrySet.iterator();
+      while (iter.hasNext()) {
+         Map.Entry<String, String> entry = iter.next();
+         try {
+            netDown(entry.getKey(), entry.getValue());
+         } catch (Exception e) {
+            e.printStackTrace();
+         }
+         iter.remove();
+      }
+   }
+
+   /**
+    * Adds {@code ip} to the loopback device and records it in {@link #networks}.
+    * <p>
+    * On Mac the address becomes an alias of {@code lo0}; on Linux a new {@code lo:N} device is configured with
+    * netmask {@code 255.0.0.0}. Fails the test when the command does not exit with 0 or the OS is unsupported.
+    *
+    * @param ip the address to bring up
+    */
+   public static void netUp(String ip) throws Exception {
+      String deviceID = "lo:" + nextDevice.incrementAndGet();
+      if (osUsed == OS.MAC) {
+         if (runCommand("sudo", "-n", "ifconfig", "lo0", "alias", ip) != 0) {
+            Assert.fail("Cannot sudo ifconfig for ip " + ip);
+         }
+         networks.put(ip, "lo0");
+      } else if (osUsed == OS.LINUX) {
+         if (runCommand("sudo", "-n", "ifconfig", deviceID, ip, "netmask", "255.0.0.0") != 0) {
+            Assert.fail("Cannot sudo ifconfig for ip " + ip);
+         }
+         networks.put(ip, deviceID);
+      } else {
+         Assert.fail("OS not supported");
+      }
+   }
+
+   /**
+    * Takes down an address previously brought up with {@link #netUp(String)} and forgets it.
+    * Fails the test when the address is not registered.
+    *
+    * @param ip the address to take down
+    */
+   public static void netDown(String ip) throws Exception {
+      String device = networks.remove(ip);
+      Assert.assertNotNull("ip " + ip + " wasn't set up before", device);
+      netDown(ip, device);
+
+   }
+
+   /** Runs the OS specific {@code ifconfig} command that removes {@code ip} (Mac) or downs {@code device} (Linux). */
+   private static void netDown(String ip, String device) throws Exception {
+      if (osUsed == OS.MAC) {
+         if (runCommand("sudo", "-n", "ifconfig", "lo0", "-alias", ip) != 0) {
+            Assert.fail("Cannot sudo ifconfig for ip " + ip);
+         }
+      } else if (osUsed == OS.LINUX) {
+         if (runCommand("sudo", "-n", "ifconfig", device, "down") != 0) {
+            Assert.fail("Cannot sudo ifconfig for ip " + ip);
+         }
+      } else {
+         Assert.fail("OS not supported");
+      }
+   }
+
+   /**
+    * Runs {@code command}, allowing 10 seconds for its output readers to finish.
+    *
+    * @return the exit code of the process
+    */
+   public static int runCommand(String... command) throws Exception {
+      return runCommand(10, TimeUnit.SECONDS, command);
+   }
+
+   /**
+    * Runs {@code command} and waits for it to exit while its standard output (logged at trace level) and
+    * standard error (logged at warn level) are drained on separate threads.
+    *
+    * @param timeout     how long to wait for each output reader after the process has exited
+    * @param timeoutUnit unit of {@code timeout}
+    * @param command     the program and its arguments
+    * @return the exit code of the process
+    */
+   public static int runCommand(long timeout, TimeUnit timeoutUnit, String... command) throws Exception {
+
+      logCommand(command);
+
+      ProcessBuilder processBuilder = new ProcessBuilder(command);
+      final Process process = processBuilder.start();
+
+      // Both streams have to be consumed. The stdout reader used to be created but never started, which left
+      // stdout undrained and made the isAlive() assertion below meaningless.
+      Thread outputReader = startReader(process.getInputStream(), false);
+      Thread errorReader = startReader(process.getErrorStream(), true);
+
+      int value = process.waitFor();
+
+      long joinMillis = timeoutUnit.toMillis(timeout);
+      outputReader.join(joinMillis);
+      Assert.assertFalse(outputReader.isAlive());
+      errorReader.join(joinMillis);
+
+      return value;
+   }
+
+   /** Starts a thread that logs every line of {@code stream}; read failures are ignored (logging only). */
+   private static Thread startReader(final InputStream stream, final boolean error) {
+      Thread reader = new Thread() {
+         @Override
+         public void run() {
+            try {
+               readStream(stream, error);
+            } catch (Exception dontCare) {
+               // best effort: the output is only read to be logged
+            }
+         }
+      };
+      reader.start();
+      return reader;
+   }
+
+   /** Prints the command line, each argument followed by a space, to standard output. */
+   private static void logCommand(String[] command) {
+      StringBuilder logCommand = new StringBuilder();
+      for (String c : command) {
+         logCommand.append(c).append(' ');
+      }
+      System.out.println("NetUTIL command::" + logCommand.toString());
+   }
+
+   /**
+    * @return {@code true} when {@code sudo -n ifconfig} exits with 0, {@code false} when it exits with
+    * another code or cannot be run
+    */
+   public static boolean canSudo() {
+      try {
+         return runCommand("sudo", "-n", "ifconfig") == 0;
+      } catch (Exception e) {
+         e.printStackTrace();
+         return false;
+      }
+   }
+
+   @Test
+   public void testCanSudo() throws Exception {
+      Assert.assertTrue(canSudo());
+   }
+
+   /**
+    * Logs {@code stream} line by line, at warn level when {@code error} is set and at trace level otherwise.
+    * The reader is closed even when reading fails.
+    */
+   private static void readStream(InputStream stream, boolean error) throws IOException {
+      try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
+         String inputLine;
+         while ((inputLine = reader.readLine()) != null) {
+            if (error) {
+               logger.warn(inputLine);
+            } else {
+               logger.trace(inputLine);
+            }
+         }
+      }
+   }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/05ce7c6e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtilResource.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtilResource.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtilResource.java
new file mode 100644
index 0000000..0f2abd9
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/network/NetUtilResource.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.util.network;
+
+import org.junit.rules.ExternalResource;
+/**
+ * JUnit {@link ExternalResource} whose only job is to call {@link NetUtil#cleanup()} when the resource is
+ * released.
+ */
+public class NetUtilResource extends ExternalResource {
+ /**
+ * Runs the base {@code after()} and then {@link NetUtil#cleanup()}.
+ */
+ @Override
+ protected void after() {
+ super.after();
+ NetUtil.cleanup();
+ }
+}