You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/11/23 16:14:32 UTC
[2/3] activemq-artemis git commit: ARTEMIS-473/ARTEMIS-863 Detect
network failures
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/402f25be/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index c112892..2f5f3fa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -93,6 +93,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl;
+import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
@@ -111,6 +112,7 @@ import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MemoryManager;
+import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.PostQueueCreationCallback;
import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback;
@@ -240,6 +242,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
*/
private volatile ExecutorFactory ioExecutorFactory;
+ private final NetworkHealthCheck networkHealthCheck = new NetworkHealthCheck(ActiveMQDefaultConfiguration.getDefaultNetworkCheckNic(), ActiveMQDefaultConfiguration.getDefaultNetworkCheckPeriod(), ActiveMQDefaultConfiguration.getDefaultNetworkCheckTimeout());
+
private final HierarchicalRepository<Set<Role>> securityRepository;
private volatile ResourceManager resourceManager;
@@ -325,6 +329,28 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private final ConcurrentMap<String, AtomicInteger> connectedClientIds = new ConcurrentHashMap();
+ private final ActiveMQComponent networkCheckMonitor = new ActiveMQComponent() {
+ @Override
+ public void start() throws Exception {
+ internalStart();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ internalStop();
+ }
+
+ @Override
+ public String toString() {
+ return ActiveMQServerImpl.this.toString();
+ }
+
+ @Override
+ public boolean isStarted() {
+ return ActiveMQServerImpl.this.isStarted();
+ }
+ };
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -405,6 +431,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return reloadManager;
}
+ @Override
+ public NetworkHealthCheck getNetworkHealthCheck() {
+ return networkHealthCheck;
+ }
+
// life-cycle methods
// ----------------------------------------------------------------
@@ -430,6 +461,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public final synchronized void start() throws Exception {
+ SERVER_STATE originalState = state;
+ try {
+ internalStart();
+ } finally {
+ if (originalState == SERVER_STATE.STOPPED) {
+ networkHealthCheck.setTimeUnit(TimeUnit.MILLISECONDS).setPeriod(configuration.getNetworkCheckPeriod()).
+ setNetworkTimeout(configuration.getNetworkCheckTimeout()).
+ parseAddressList(configuration.getNetworkCheckList()).
+ parseURIList(configuration.getNetworkCheckURLList()).
+ setNICName(configuration.getNetworkCheckNIC()).
+ setIpv4Command(configuration.getNetworkCheckPingCommand()).
+ setIpv6Command(configuration.getNetworkCheckPing6Command());
+
+ networkHealthCheck.addComponent(networkCheckMonitor);
+ }
+ }
+ }
+
+ private void internalStart() throws Exception {
if (state != SERVER_STATE.STOPPED) {
logger.debug("Server already started!");
return;
@@ -442,7 +492,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
state = SERVER_STATE.STARTING;
if (haPolicy == null) {
- haPolicy = ConfigurationUtils.getHAPolicy(configuration.getHAPolicyConfiguration());
+ haPolicy = ConfigurationUtils.getHAPolicy(configuration.getHAPolicyConfiguration(), this);
}
activationLatch.setCount(1);
@@ -493,6 +543,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
+ public ReplicationEndpoint getReplicationEndpoint() {
+ if (activation instanceof SharedNothingBackupActivation) {
+ return ((SharedNothingBackupActivation) activation).getReplicationEndpoint();
+ }
+ return null;
+ }
+
+ @Override
public void unlockActivation() {
activationLock.release();
}
@@ -611,6 +669,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public final void stop() throws Exception {
+ try {
+ internalStop();
+ } finally {
+ networkHealthCheck.stop();
+ }
+ }
+
+ private void internalStop() throws Exception {
stop(false);
}
@@ -774,7 +840,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
fileStoreMonitor = null;
}
- activation.sendLiveIsStopping();
+ if (failoverOnServerShutdown) {
+ activation.sendLiveIsStopping();
+ }
stopComponent(connectorsService);
@@ -838,7 +906,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (storageManager != null)
try {
- storageManager.stop(criticalIOError);
+ storageManager.stop(criticalIOError, failoverOnServerShutdown);
} catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(t, storageManager.getClass().getName());
}
@@ -1847,7 +1915,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
this.executorFactory = new OrderedExecutorFactory(threadPool);
-
if (serviceRegistry.getIOExecutorService() != null) {
this.ioExecutorFactory = new OrderedExecutorFactory(serviceRegistry.getIOExecutorService());
} else {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/402f25be/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
index 8351c4d..cb8c971 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LiveNodeLocator;
+import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
@@ -71,6 +72,7 @@ public final class SharedNothingBackupActivation extends Activation {
ClusterControl clusterControl;
private boolean closed;
private volatile boolean backupUpToDate = true;
+ private final NetworkHealthCheck networkHealthCheck;
private final ReusableLatch backupSyncLatch = new ReusableLatch(0);
@@ -78,13 +80,15 @@ public final class SharedNothingBackupActivation extends Activation {
boolean attemptFailBack,
Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO,
- ReplicaPolicy replicaPolicy) {
+ ReplicaPolicy replicaPolicy,
+ NetworkHealthCheck networkHealthCheck) {
this.activeMQServer = activeMQServer;
this.attemptFailBack = attemptFailBack;
this.activationParams = activationParams;
this.shutdownOnCriticalIO = shutdownOnCriticalIO;
this.replicaPolicy = replicaPolicy;
backupSyncLatch.setCount(1);
+ this.networkHealthCheck = networkHealthCheck;
}
public void init() throws Exception {
@@ -117,7 +121,7 @@ public final class SharedNothingBackupActivation extends Activation {
synchronized (this) {
if (closed)
return;
- backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool());
+ backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck);
activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);
}
@@ -269,9 +273,10 @@ public final class SharedNothingBackupActivation extends Activation {
public void run() {
try {
if (logger.isTraceEnabled()) {
- logger.trace("Calling activeMQServer.stop()");
+ logger.trace("Calling activeMQServer.stop() and start() to restart the server");
}
activeMQServer.stop();
+ activeMQServer.start();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRestartingBackupServer(e, activeMQServer);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/402f25be/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index e650540..46d92e3 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -871,6 +871,66 @@
</xsd:sequence>
</xsd:complexType>
</xsd:element>
+
+ <xsd:element name="network-check-list" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ A comma separated list of IPs to be used to validate if the broker should be kept up
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+ <xsd:element name="network-check-URL-list" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ A comma separated list of URLs to be used to validate if the broker should be kept up
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+ <xsd:element name="network-check-period" type="xsd:long" default="10000" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ How often, in milliseconds, to check whether the network is still up
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+ <xsd:element name="network-check-timeout" type="xsd:long" default="1000" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ The timeout, in milliseconds, to be used on the ping.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+ <xsd:element name="network-check-NIC" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ The network interface card name to be used to validate the address.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+
+ <xsd:element name="network-check-ping-command" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ The ping command used to ping IPV4 addresses.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+
+ <xsd:element name="network-check-ping6-command" type="xsd:string" default="" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ The ping command used to ping IPV6 addresses.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+
</xsd:all>
</xsd:complexType>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/402f25be/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index e8abcd5..a21cf3a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -106,6 +106,13 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(98765, conf.getConnectionTtlCheckInterval());
Assert.assertEquals(1234567, conf.getConfigurationFileRefreshPeriod());
+ Assert.assertEquals("127.0.0.1", conf.getNetworkCheckList());
+ Assert.assertEquals("some-nick", conf.getNetworkCheckNIC());
+ Assert.assertEquals(123, conf.getNetworkCheckPeriod());
+ Assert.assertEquals(321, conf.getNetworkCheckTimeout());
+ Assert.assertEquals("ping-four", conf.getNetworkCheckPingCommand());
+ Assert.assertEquals("ping-six", conf.getNetworkCheckPing6Command());
+
Assert.assertEquals("largemessagesdir", conf.getLargeMessagesDirectory());
Assert.assertEquals(95, conf.getMemoryWarningThreshold());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/402f25be/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 93c5c9d..9b06fbe 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -229,7 +229,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
- public void stop(boolean ioCriticalError) throws Exception {
+ public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/402f25be/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index f1b1774..6030f81 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -236,6 +236,13 @@
<memory-warning-threshold>95</memory-warning-threshold>
<memory-measure-interval>54321</memory-measure-interval>
<large-messages-directory>largemessagesdir</large-messages-directory>
+ <network-check-list>127.0.0.1</network-check-list>
+ <network-check-NIC>some-nick</network-check-NIC>
+ <network-check-period>123</network-check-period>
+ <network-check-timeout>321</network-check-timeout>
+ <network-check-URL-list>www.apache.org</network-check-URL-list>
+ <network-check-ping-command>ping-four</network-check-ping-command>
+ <network-check-ping6-command>ping-six</network-check-ping6-command>
<security-settings>
<security-setting match="a1">
<permission type="createNonDurableQueue" roles="a1.1"/>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/402f25be/docs/user-manual/en/SUMMARY.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md
index 451bd4a..bf3ed3c 100644
--- a/docs/user-manual/en/SUMMARY.md
+++ b/docs/user-manual/en/SUMMARY.md
@@ -20,6 +20,7 @@
* [Configuration Reload](config-reload.md)
* [Detecting Dead Connections](connection-ttl.md)
* [Detecting Slow Consumers](slow-consumers.md)
+* [Avoiding Network Isolation](network-isolation.md)
* [Resource Manager Configuration](transaction-config.md)
* [Flow Control](flow-control.md)
* [Guarantees of sends and commits](send-guarantees.md)
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/402f25be/docs/user-manual/en/configuration-index.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md
index 895fe77..cffee06 100644
--- a/docs/user-manual/en/configuration-index.md
+++ b/docs/user-manual/en/configuration-index.md
@@ -112,6 +112,11 @@ system-property-prefix | Prefix for replacing configuration settings using Bean
[transaction-timeout](transaction-config.md "Resource Manager Configuration") | how long (in ms) before a transaction can be removed from the resource manager after create time. Default=300000
[transaction-timeout-scan-period](transaction-config.md "Resource Manager Configuration") | how often (in ms) to scan for timeout transactions. Default=1000
[wild-card-routing-enabled](wildcard-routing.md "Routing Messages With Wild Cards") | true means that the server supports wild card routing. Default=true
+[network-check-NIC](network-isolation.md) | The network interface card to be used on InetAddress.isReachable
+[network-check-URL-list](network-isolation.md) | The list of http URIs to be used to validate the network
+[network-check-list](network-isolation.md) | The comma separated list of IPs to be checked through ping or InetAddress.isReachable
+[network-check-ping-command](network-isolation.md) | The command used to ping IPV4 addresses
+[network-check-ping6-command](network-isolation.md) | The command used to ping IPV6 addresses
#address-setting type
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/402f25be/docs/user-manual/en/network-isolation.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/network-isolation.md b/docs/user-manual/en/network-isolation.md
new file mode 100644
index 0000000..acb26ef
--- /dev/null
+++ b/docs/user-manual/en/network-isolation.md
@@ -0,0 +1,106 @@
+# Network Isolation
+
+If the server becomes isolated, say because of a network failure, it will be cut off from its peers on a network of brokers. If you are using replication, the backup may think the live server failed and you may end up with two live nodes, which is called split brain.
+
+# Pinging the network
+
+You may configure one or more addresses in the broker.xml that are part of your network topology; they will be pinged throughout the life cycle of the server.
+
+If none of the configured addresses can be reached, the server will stop itself until the network is back.
+
+If you execute the create command passing a `--ping` argument, you will create a default xml that is ready to be used with network checks:
+
+
+```
+./artemis create /myDir/myServer --ping 10.0.0.1
+```
+
+
+This XML part will be added to your broker.xml:
+
+```xml
+<!--
+You can specify the NIC you want to use to verify if the network is reachable
+ <network-check-NIC>theNickName</network-check-NIC>
+-->
+
+<!--
+Use this to use an HTTP server to validate the network
+ <network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+<network-check-period>10000</network-check-period>
+<network-check-timeout>1000</network-check-timeout>
+
+<!-- this is a comma separated list, no spaces, just DNS or IPs
+ it should accept IPV6
+
+ Warning: Make sure you understand your network topology as this is meant to check if your network is up.
+ Using IPs that could eventually disappear or be partially visible may defeat the purpose.
+ You can use a list of multiple IPs, any successful ping will make the server OK to continue running -->
+<network-check-list>10.0.0.1</network-check-list>
+
+<!-- use this to customize the ping used for ipv4 addresses -->
+<network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command>
+
+<!-- use this to customize the ping used for ipv6 addresses -->
+<network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command>
+
+```
+
+
+Once you lose connectivity towards 10.0.0.1 in the given example,
+you will see this output at the server:
+
+
+```
+09:49:24,562 WARN [org.apache.activemq.artemis.core.server.NetworkHealthCheck] Ping Address /10.0.0.1 wasn't reacheable
+09:49:36,577 INFO [org.apache.activemq.artemis.core.server.NetworkHealthCheck] Network is unhealthy, stopping service ActiveMQServerImpl::serverUUID=04fd5dd8-b18c-11e6-9efe-6a0001921ad0
+09:49:36,625 INFO [org.apache.activemq.artemis.core.server] AMQ221002: Apache ActiveMQ Artemis Message Broker version 1.6.0 [04fd5dd8-b18c-11e6-9efe-6a0001921ad0] stopped, uptime 14.787 seconds
+09:50:00,653 WARN [org.apache.activemq.artemis.core.server.NetworkHealthCheck] ping: sendto: No route to host
+09:50:10,656 WARN [org.apache.activemq.artemis.core.server.NetworkHealthCheck] Host is down: java.net.ConnectException: Host is down
+ at java.net.Inet6AddressImpl.isReachable0(Native Method) [rt.jar:1.8.0_73]
+ at java.net.Inet6AddressImpl.isReachable(Inet6AddressImpl.java:77) [rt.jar:1.8.0_73]
+ at java.net.InetAddress.isReachable(InetAddress.java:502) [rt.jar:1.8.0_73]
+ at org.apache.activemq.artemis.core.server.NetworkHealthCheck.check(NetworkHealthCheck.java:295) [artemis-commons-1.6.0-SNAPSHOT.jar:1.6.0-SNAPSHOT]
+ at org.apache.activemq.artemis.core.server.NetworkHealthCheck.check(NetworkHealthCheck.java:276) [artemis-commons-1.6.0-SNAPSHOT.jar:1.6.0-SNAPSHOT]
+ at org.apache.activemq.artemis.core.server.NetworkHealthCheck.run(NetworkHealthCheck.java:244) [artemis-commons-1.6.0-SNAPSHOT.jar:1.6.0-SNAPSHOT]
+ at org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent$2.run(ActiveMQScheduledComponent.java:189) [artemis-commons-1.6.0-SNAPSHOT.jar:1.6.0-SNAPSHOT]
+ at org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent$3.run(ActiveMQScheduledComponent.java:199) [artemis-commons-1.6.0-SNAPSHOT.jar:1.6.0-SNAPSHOT]
+ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [rt.jar:1.8.0_73]
+ at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [rt.jar:1.8.0_73]
+ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [rt.jar:1.8.0_73]
+ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [rt.jar:1.8.0_73]
+ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_73]
+ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_73]
+ at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_73]
+
+```
+
+Once you re-establish your network connections towards the configured check list, the server starts again:
+
+```
+09:53:23,461 INFO [org.apache.activemq.artemis.core.server.NetworkHealthCheck] Network is healthy, starting service ActiveMQServerImpl::
+09:53:23,462 INFO [org.apache.activemq.artemis.core.server] AMQ221000: live Message Broker is starting with configuration Broker Configuration (clustered=false,journalDirectory=./data/journal,bindingsDirectory=./data/bindings,largeMessagesDirectory=./data/large-messages,pagingDirectory=./data/paging)
+09:53:23,462 INFO [org.apache.activemq.artemis.core.server] AMQ221013: Using NIO Journal
+09:53:23,462 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-server]. Adding protocol support for: CORE
+09:53:23,463 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP
+09:53:23,463 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-hornetq-protocol]. Adding protocol support for: HORNETQ
+09:53:23,463 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-mqtt-protocol]. Adding protocol support for: MQTT
+09:53:23,464 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-openwire-protocol]. Adding protocol support for: OPENWIRE
+09:53:23,464 INFO [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-stomp-protocol]. Adding protocol support for: STOMP
+09:53:23,541 INFO [org.apache.activemq.artemis.core.server] AMQ221003: Deploying queue jms.queue.DLQ
+09:53:23,541 INFO [org.apache.activemq.artemis.core.server] AMQ221003: Deploying queue jms.queue.ExpiryQueue
+09:53:23,549 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started Acceptor at 0.0.0.0:61616 for protocols [CORE,MQTT,AMQP,STOMP,HORNETQ,OPENWIRE]
+09:53:23,550 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started Acceptor at 0.0.0.0:5445 for protocols [HORNETQ,STOMP]
+09:53:23,554 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started Acceptor at 0.0.0.0:5672 for protocols [AMQP]
+09:53:23,555 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started Acceptor at 0.0.0.0:1883 for protocols [MQTT]
+09:53:23,556 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started Acceptor at 0.0.0.0:61613 for protocols [STOMP]
+09:53:23,556 INFO [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
+09:53:23,556 INFO [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 1.6.0 [0.0.0.0, nodeID=04fd5dd8-b18c-11e6-9efe-6a0001921ad0]
+```
+
+# Warning
+
+> Make sure you understand your network topology as this is meant to validate if your network is up.
+> Using IPs that could eventually disappear or be partially visible may defeat the purpose.
+> You can use a list of multiple IPs; any successful ping will make the server OK to continue running.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/402f25be/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
index fc0cad3..b08ceb1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
@@ -278,13 +278,18 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
}
protected void crash(final ClientSession... sessions) throws Exception {
- liveServer.crash(sessions);
+ this.crash(true, sessions);
}
protected void crash(final boolean waitFailure, final ClientSession... sessions) throws Exception {
- liveServer.crash(waitFailure, sessions);
+ this.crash(true, waitFailure, sessions);
}
+ protected void crash(final boolean failover,
+ final boolean waitFailure,
+ final ClientSession... sessions) throws Exception {
+ liveServer.crash(failover, waitFailure, sessions);
+ }
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/402f25be/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkIsolationReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkIsolationReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkIsolationReplicationTest.java
new file mode 100644
index 0000000..9b42279
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkIsolationReplicationTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NetworkIsolationReplicationTest extends FailoverTestBase {
+
+ @Override
+ protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
+ return TransportConfigurationUtils.getNettyAcceptor(live, 1);
+ }
+
+ @Override
+ protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
+ return TransportConfigurationUtils.getNettyConnector(live, 1);
+ }
+
+ protected ClientSession createSession(ClientSessionFactory sf1,
+ boolean xa,
+ boolean autoCommitSends,
+ boolean autoCommitAcks) throws Exception {
+ return addClientSession(sf1.createSession(xa, autoCommitSends, autoCommitAcks));
+ }
+
+ @Test
+ public void testDoNotActivateOnIsolation() throws Exception {
+ ServerLocator locator = getServerLocator();
+
+ backupServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("203.0.113.1"));
+
+ ClientSessionFactory sf = addSessionFactory(locator.createSessionFactory());
+
+ ClientSession session = createSession(sf, false, true, true);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+ Assert.assertFalse(backupServer.getServer().getNetworkHealthCheck().check());
+
+ crash(false, true, session);
+
+ for (int i = 0; i < 1000 && !backupServer.isStarted(); i++) {
+ Thread.sleep(10);
+ }
+
+ Assert.assertTrue(backupServer.isStarted());
+ Assert.assertFalse(backupServer.isActive());
+
+ liveServer.start();
+
+ for (int i = 0; i < 1000 && backupServer.getServer().getReplicationEndpoint() != null && !backupServer.getServer().getReplicationEndpoint().isStarted(); i++) {
+ Thread.sleep(10);
+ }
+
+ backupServer.getServer().getNetworkHealthCheck().clearAddresses();
+
+ // This will make sure the backup got synchronized after the network was activated again
+ Assert.assertTrue(backupServer.getServer().getReplicationEndpoint().isStarted());
+ }
+
+ @Test
+ public void testLiveIsolated() throws Exception {
+ backupServer.stop();
+
+ liveServer.stop();
+ liveServer.getServer().getConfiguration().setNetworkCheckList("203.0.113.1").
+ setNetworkCheckPeriod(100).setNetworkCheckTimeout(100);
+
+ liveServer.start();
+
+ Assert.assertEquals(100L, liveServer.getServer().getNetworkHealthCheck().getPeriod());
+
+ liveServer.getServer().getNetworkHealthCheck().setTimeUnit(TimeUnit.MILLISECONDS);
+
+ Assert.assertFalse(liveServer.getServer().getNetworkHealthCheck().check());
+
+ long timeout = System.currentTimeMillis() + 30000;
+ while (liveServer.isStarted() && System.currentTimeMillis() < timeout) {
+ Thread.sleep(100);
+ }
+
+ Assert.assertFalse(liveServer.isStarted());
+
+ liveServer.getServer().getNetworkHealthCheck().addAddress(InetAddress.getByName("127.0.0.1"));
+
+ timeout = System.currentTimeMillis() + 30000;
+ while (!liveServer.isStarted() && System.currentTimeMillis() < timeout) {
+ Thread.sleep(100);
+ }
+
+ Assert.assertTrue(liveServer.isStarted());
+ }
+
+ @Override
+ protected void createConfigs() throws Exception {
+ createReplicatedConfigs();
+ }
+
+ @Override
+ protected void crash(boolean failover, boolean waitFailure, ClientSession... sessions) throws Exception {
+ if (sessions.length > 0) {
+ for (ClientSession session : sessions) {
+ waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer());
+ }
+ } else {
+ waitForRemoteBackup(null, 5, true, backupServer.getServer());
+ }
+ super.crash(failover, waitFailure, sessions);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/402f25be/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/SameProcessActiveMQServer.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/SameProcessActiveMQServer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/SameProcessActiveMQServer.java
index ededc18..35e17aa 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/SameProcessActiveMQServer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/SameProcessActiveMQServer.java
@@ -76,6 +76,11 @@ public class SameProcessActiveMQServer implements TestableServer {
@Override
public CountDownLatch crash(boolean waitFailure, ClientSession... sessions) throws Exception {
+ return crash(true, waitFailure, sessions); // delegates to the three-argument overload with failover=true, matching the server.stop(true) call this patch replaces
+ }
+
+ @Override
+ public CountDownLatch crash(boolean failover, boolean waitFailure, ClientSession... sessions) throws Exception {
CountDownLatch latch = new CountDownLatch(sessions.length);
CountDownSessionFailureListener[] listeners = new CountDownSessionFailureListener[sessions.length];
for (int i = 0; i < sessions.length; i++) {
@@ -87,7 +92,7 @@ public class SameProcessActiveMQServer implements TestableServer {
clusterManager.flushExecutor();
clusterManager.clear();
Assert.assertTrue("server should be running!", server.isStarted());
- server.stop(true);
+ server.stop(failover);
if (waitFailure) {
// Wait to be informed of failure
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/402f25be/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/TestableServer.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/TestableServer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/TestableServer.java
index 3c5b52a..24888fb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/TestableServer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/TestableServer.java
@@ -36,6 +36,8 @@ public interface TestableServer extends ActiveMQComponent {
CountDownLatch crash(boolean waitFailure, ClientSession... sessions) throws Exception;
+ CountDownLatch crash(boolean failover, boolean waitFailure, ClientSession... sessions) throws Exception; // SameProcessActiveMQServer forwards failover to server.stop(failover)
+
boolean isActive();
void addInterceptor(Interceptor interceptor);