You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/27 13:58:38 UTC

flink git commit: [FLINK-3347] [akka] Add QuarantineMonitor which shuts a quarantined actor system and JVM down

Repository: flink
Updated Branches:
  refs/heads/release-1.1 ea41b9c56 -> 16e7c7867


[FLINK-3347] [akka] Add QuarantineMonitor which shuts a quarantined actor system and JVM down

The QuarantineMonitor subscribes to the actor system's event bus and listens to
AssociationErrorEvents. These events are generated when the actor system has
quarantined another actor system or has been quarantined by another actor
system. In either quarantine case, the actor system is shut down, stopping
all actors, and then the JVM is terminated.

Add configuration switch to enable the quarantine monitor for TaskManagers

By default the QuarantineMonitor is disabled for TaskManagers in order not to change
the behaviour of 1.1.

Fix QuarantineMonitorTest: Replace Flink's futures by Scala's futures

This closes #2697.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/16e7c786
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/16e7c786
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/16e7c786

Branch: refs/heads/release-1.1
Commit: 16e7c78675fcba137acd83539dabe65b84443a29
Parents: ea41b9c
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Oct 27 00:24:12 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 27 15:58:09 2016 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |   1 +
 .../flink/configuration/ConfigConstants.java    |  10 +
 .../runtime/akka/DefaultQuarantineHandler.java  |  76 +++++
 .../flink/runtime/akka/QuarantineHandler.java   |  46 +++
 .../flink/runtime/akka/QuarantineMonitor.java   | 100 ++++++
 .../flink/runtime/taskmanager/TaskManager.scala |  15 +-
 .../runtime/akka/QuarantineMonitorTest.java     | 329 +++++++++++++++++++
 7 files changed, 576 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/16e7c786/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 1ce6d56..7741ca7 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -191,6 +191,7 @@ The following parameters configure Flink's JobManager and TaskManagers.
 - `blob.fetch.num-concurrent`: The number concurrent BLOB fetches (such as JAR file downloads) that the JobManager serves (DEFAULT: **50**).
 - `blob.fetch.backlog`: The maximum number of queued BLOB fetches (such as JAR file downloads) that the JobManager allows (DEFAULT: **1000**).
 - `task.cancellation-interval`: Time interval between two successive task cancellation attempts in milliseconds (DEFAULT: **30000**).
+- `taskmanager.exit-on-fatal-akka-error`: Whether the TaskManager shall be terminated in case of a fatal Akka error (quarantining event). (DEFAULT: **false**)
 
 
 ### Distributed Coordination (via Akka)

http://git-wip-us.apache.org/repos/asf/flink/blob/16e7c786/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 5deed4e..9902350 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -329,6 +329,16 @@ public final class ConfigConstants {
 	 */
 	public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env.";
 
+	/**
+	 * Whether the quarantine monitor for task managers shall be started (DEFAULT: false).
+	 * When enabled, the TaskManager shuts down its actor system and then terminates the JVM
+	 * as soon as the monitor detects that the actor system has quarantined another actor
+	 * system or has been quarantined by another actor system.
+	 *
+	 * @deprecated Only introduced in 1.1.4 to not change the default behaviour
+	 */
+	@Deprecated
+	public static final String ENABLE_QUARANTINE_MONITOR = "taskmanager.exit-on-fatal-akka-error";
+
 	
 	// ------------------------ YARN Configuration ------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/16e7c786/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
new file mode 100644
index 0000000..378cb25
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Default quarantine handler which logs the quarantining event, then shuts down the given
+ * actor system, waits (bounded by a timeout) for its termination and finally terminates the
+ * JVM with the given exit code.
+ */
+public class DefaultQuarantineHandler implements QuarantineHandler {
+
+	/** Maximum time to wait for the actor system's termination before the JVM is exited. */
+	private final FiniteDuration timeout;
+
+	/** Exit code passed to {@link System#exit(int)}. */
+	private final int exitCode;
+
+	/** Logger used to report the quarantining event and shutdown problems. */
+	private final Logger log;
+
+	/**
+	 * Creates a new default quarantine handler.
+	 *
+	 * @param timeout maximum time to wait for the actor system's termination
+	 * @param exitCode exit code with which the JVM is terminated
+	 * @param log logger used to report the quarantining event
+	 */
+	public DefaultQuarantineHandler(Time timeout, int exitCode, Logger log) {
+		Preconditions.checkNotNull(timeout);
+		this.timeout = new FiniteDuration(timeout.getSize(), timeout.getUnit());
+		this.exitCode = exitCode;
+		this.log = Preconditions.checkNotNull(log);
+	}
+
+	@Override
+	public void wasQuarantinedBy(String remoteSystem, ActorSystem actorSystem) {
+		Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
+		log.error("The actor system {} has been quarantined by {}. Shutting the actor system " +
+			"down to be able to reestablish a connection!", actorSystemAddress, remoteSystem);
+
+		shutdownActorSystem(actorSystem);
+	}
+
+	@Override
+	public void hasQuarantined(String remoteSystem, ActorSystem actorSystem) {
+		Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
+		log.error("The actor system {} has quarantined the remote actor system {}. Shutting " +
+			"the actor system down to be able to reestablish a connection!", actorSystemAddress, remoteSystem);
+
+		shutdownActorSystem(actorSystem);
+	}
+
+	/**
+	 * Shuts the given actor system down, waits at most {@link #timeout} for its termination and
+	 * then exits the JVM with {@link #exitCode}.
+	 *
+	 * @param actorSystem actor system to shut down
+	 */
+	private void shutdownActorSystem(ActorSystem actorSystem) {
+		// shut the actor system down
+		actorSystem.shutdown();
+
+		try {
+			// give it some time to complete the shutdown
+			// NOTE(review): when this is called from an actor running in this very actor system
+			// (as the QuarantineMonitor does), termination presumably cannot complete while this
+			// call blocks, so the wait may always run into the timeout
+			actorSystem.awaitTermination(timeout);
+		} catch (Exception e) {
+			// awaitTermination reports a timeout/interrupt via exceptions which Java does not see
+			// as declared; the JVM is terminated regardless, so only record why the wait ended
+			log.error("Exception thrown when terminating the actor system.", e);
+		} finally {
+			// now let's crash the JVM
+			System.exit(exitCode);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/16e7c786/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
new file mode 100644
index 0000000..21623e8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.ActorSystem;
+
+/**
+ * Callback interface used by the {@link QuarantineMonitor} to report that the monitored actor
+ * system has either been quarantined by a remote actor system or has itself quarantined a
+ * remote actor system.
+ */
+public interface QuarantineHandler {
+
+	/**
+	 * Invoked after a remote actor system has quarantined the given actor system.
+	 *
+	 * @param remoteSystem address of the remote actor system which quarantined
+	 *                     {@code actorSystem} ("Unknown" if the address could not be determined)
+	 * @param actorSystem the local actor system which got quarantined
+	 */
+	void wasQuarantinedBy(String remoteSystem, ActorSystem actorSystem);
+
+	/**
+	 * Invoked after the given actor system has quarantined a remote actor system.
+	 *
+	 * @param remoteSystem address of the remote actor system which got quarantined
+	 *                     ("Unknown" if the address could not be determined)
+	 * @param actorSystem the local actor system which quarantined the remote one
+	 */
+	void hasQuarantined(String remoteSystem, ActorSystem actorSystem);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/16e7c786/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
new file mode 100644
index 0000000..de82f29
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.UntypedActor;
+import akka.remote.AssociationErrorEvent;
+import akka.remote.transport.Transport;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * The quarantine monitor subscribes to the event bus of the actor system in which it was started.
+ * It listens to {@link AssociationErrorEvent} which contain information if we got quarantined
+ * or quarantine another remote actor system. If the actor detects that the actor system has been
+ * quarantined or quarantined another system, then the {@link QuarantineHandler} is called.
+ *
+ * IMPORTANT: The implementation if highly specific for Akka 2.3.7. With different version the
+ * quarantine state might be detected differently.
+ */
+public class QuarantineMonitor extends UntypedActor {
+
+	private static final Pattern pattern = Pattern.compile("^Invalid address:\\s+(.*)$");
+
+	private static final String QUARANTINE_MSG = "The remote system has a UID that has been quarantined. Association aborted.";
+	private static final String QUARANTINED_MSG = "The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.";
+
+	private final QuarantineHandler handler;
+	private final Logger log;
+
+	public QuarantineMonitor(QuarantineHandler handler, Logger log) {
+		this.handler = Preconditions.checkNotNull(handler);
+		this.log = Preconditions.checkNotNull(log);
+	}
+
+	@Override
+	public void preStart() {
+		getContext().system().eventStream().subscribe(getSelf(), AssociationErrorEvent.class);
+	}
+
+	@Override
+	public void onReceive(Object message) throws Exception {
+		if (message instanceof AssociationErrorEvent) {
+			AssociationErrorEvent associationErrorEvent = (AssociationErrorEvent) message;
+
+			// IMPORTANT: The check for the quarantining event is highly specific to Akka 2.3.7
+			// and can change with a different Akka version.
+			// It assumes the following:
+			// AssociationErrorEvent(InvalidAssociation(InvalidAssociationException(QUARANTINE(D)_MSG))
+			if (associationErrorEvent.getCause() != null) {
+				Throwable invalidAssociation = associationErrorEvent.getCause();
+				Matcher matcher = pattern.matcher(invalidAssociation.getMessage());
+
+				final String remoteSystem;
+
+				if (matcher.find()) {
+					remoteSystem = matcher.group(1);
+				} else {
+					remoteSystem = "Unknown";
+				}
+
+				if (invalidAssociation.getCause() instanceof Transport.InvalidAssociationException) {
+					Transport.InvalidAssociationException invalidAssociationException = (Transport.InvalidAssociationException) invalidAssociation.getCause();
+
+					// don't hate the player, hate the game! That's the only way to find out if we
+					// got quarantined or quarantined another actor system in Akka 2.3.7
+					if (QUARANTINE_MSG.equals(invalidAssociationException.getMessage())) {
+						handler.hasQuarantined(remoteSystem, getContext().system());
+					} else if (QUARANTINED_MSG.equals(invalidAssociationException.getMessage())) {
+						handler.wasQuarantinedBy(remoteSystem, getContext().system());
+					} else {
+						log.debug("The invalid association exception's message could not be matched.", associationErrorEvent);
+					}
+				} else {
+					log.debug("The association error event's root cause is not of type {}.", Transport.InvalidAssociationException.class.getSimpleName(), associationErrorEvent);
+				}
+			} else {
+				log.debug("Received association error event which did not contain a cause.", associationErrorEvent);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/16e7c786/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index dc2780b..5ae52be 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -36,6 +36,7 @@ import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
 import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
 import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType}
@@ -43,7 +44,7 @@ import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.akka.{AkkaUtils, DefaultQuarantineHandler, QuarantineMonitor}
 import org.apache.flink.runtime.blob.{BlobCache, BlobClient, BlobService}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
@@ -1734,6 +1735,18 @@ object TaskManager {
         Props(classOf[ProcessReaper], taskManager, LOG.logger, RUNTIME_FAILURE_RETURN_CODE),
         "TaskManager_Process_Reaper")
 
+      if (configuration.getBoolean(ConfigConstants.ENABLE_QUARANTINE_MONITOR, false)) {
+        // on a quarantining event the handler shuts the actor system down and exits the JVM
+        val shutdownTimeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis)
+        val quarantineHandler = new DefaultQuarantineHandler(
+          shutdownTimeout,
+          RUNTIME_FAILURE_RETURN_CODE,
+          LOG.logger)
+
+        LOG.debug("Starting TaskManager quarantine monitor")
+        val quarantineMonitorProps = Props(classOf[QuarantineMonitor], quarantineHandler, LOG.logger)
+        taskManagerSystem.actorOf(quarantineMonitorProps)
+      }
+
       // if desired, start the logging daemon that periodically logs the
       // memory usage information
       if (LOG.isInfoEnabled && configuration.getBoolean(

http://git-wip-us.apache.org/repos/asf/flink/blob/16e7c786/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
new file mode 100644
index 0000000..27d0f05
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.Cancellable;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.dispatch.OnComplete;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for the {@link QuarantineMonitor}. Two actor systems are used, where actorSystem1 is
+ * expected to quarantine actorSystem2.
+ */
+public class QuarantineMonitorTest extends TestLogger {
+
+	private static final Logger LOG = LoggerFactory.getLogger(QuarantineMonitorTest.class);
+
+	private static final FiniteDuration zeroDelay = new FiniteDuration(0L, TimeUnit.SECONDS);
+
+	// we need two actor systems because we're quarantining one of them
+	private static ActorSystem actorSystem1;
+	private ActorSystem actorSystem2;
+
+	@BeforeClass
+	public static void setup() {
+		// extremely aggressive remote watch failure detector settings for actorSystem1; the tests
+		// rely on actorSystem1 quarantining actorSystem2
+		Properties properties = new Properties();
+		properties.setProperty("akka.remote.watch-failure-detector.threshold", "0.00001");
+		properties.setProperty("akka.remote.watch-failure-detector.heartbeat-interval", "1 ms");
+		properties.setProperty("akka.remote.watch-failure-detector.acceptable-heartbeat-pause", "1 ms");
+		Config deathWatch = ConfigFactory.parseProperties(properties);
+		Config defaultConfig = AkkaUtils.getDefaultAkkaConfig();
+
+		actorSystem1 = AkkaUtils.createActorSystem(deathWatch.withFallback(defaultConfig));
+	}
+
+	@AfterClass
+	public static void tearDown() {
+		if (actorSystem1 != null) {
+			actorSystem1.shutdown();
+			actorSystem1.awaitTermination();
+		}
+	}
+
+	@Before
+	public void setupTest() {
+		actorSystem2 = AkkaUtils.createDefaultActorSystem();
+	}
+
+	@After
+	public void tearDownTest() {
+		if (actorSystem2 != null) {
+			actorSystem2.shutdown();
+			actorSystem2.awaitTermination();
+		}
+	}
+
+	/**
+	 * Tests that the quarantine monitor detects if an actor system has been quarantined by another
+	 * actor system.
+	 */
+	@Test(timeout = 5000L)
+	public void testWatcheeQuarantined() throws Exception {
+		TestingQuarantineHandler quarantineHandler = new TestingQuarantineHandler();
+
+		ActorRef watchee = null;
+		ActorRef watcher = null;
+		ActorRef monitor = null;
+
+		FiniteDuration timeout = new FiniteDuration(5, TimeUnit.SECONDS);
+		FiniteDuration interval = new FiniteDuration(200, TimeUnit.MILLISECONDS);
+
+		try {
+			// start the quarantine monitor in the watchee actor system
+			monitor = actorSystem2.actorOf(getQuarantineMonitorProps(quarantineHandler), "quarantineMonitor");
+
+			watchee = actorSystem2.actorOf(getWatcheeProps(timeout, interval, quarantineHandler), "watchee");
+			watcher = actorSystem1.actorOf(getWatcherProps(timeout, interval, quarantineHandler), "watcher");
+
+			final Address actorSystem1Address = AkkaUtils.getAddress(actorSystem1);
+			final String watcheeAddress = AkkaUtils.getAkkaURL(actorSystem2, watchee);
+			final String watcherAddress = AkkaUtils.getAkkaURL(actorSystem1, watcher);
+
+			// ping the watcher continuously
+			watchee.tell(new Ping(watcherAddress), ActorRef.noSender());
+			// start watching the watchee
+			watcher.tell(new Watch(watcheeAddress), ActorRef.noSender());
+
+			Future<String> quarantineFuture = quarantineHandler.getWasQuarantinedByFuture();
+
+			final String quarantineSystem = Await.result(quarantineFuture, timeout);
+
+			Assert.assertEquals(actorSystem1Address.toString(), quarantineSystem);
+		} finally {
+			TestingUtils.stopActor(watchee);
+			TestingUtils.stopActor(watcher);
+			TestingUtils.stopActor(monitor);
+		}
+	}
+
+	/**
+	 * Tests that the quarantine monitor detects if an actor system quarantines another actor
+	 * system.
+	 */
+	@Test(timeout = 5000L)
+	public void testWatcherQuarantining() throws Exception {
+		TestingQuarantineHandler quarantineHandler = new TestingQuarantineHandler();
+
+		ActorRef watchee = null;
+		ActorRef watcher = null;
+		ActorRef monitor = null;
+
+		FiniteDuration timeout = new FiniteDuration(5, TimeUnit.SECONDS);
+		FiniteDuration interval = new FiniteDuration(200, TimeUnit.MILLISECONDS);
+
+		try {
+			// start the quarantine monitor in the watcher actor system
+			monitor = actorSystem1.actorOf(getQuarantineMonitorProps(quarantineHandler), "quarantineMonitor");
+
+			watchee = actorSystem2.actorOf(getWatcheeProps(timeout, interval, quarantineHandler), "watchee");
+			watcher = actorSystem1.actorOf(getWatcherProps(timeout, interval, quarantineHandler), "watcher");
+
+			// the quarantined system is actorSystem2
+			final Address actorSystem2Address = AkkaUtils.getAddress(actorSystem2);
+			final String watcheeAddress = AkkaUtils.getAkkaURL(actorSystem2, watchee);
+			final String watcherAddress = AkkaUtils.getAkkaURL(actorSystem1, watcher);
+
+			// ping the watcher continuously
+			watchee.tell(new Ping(watcherAddress), ActorRef.noSender());
+			// start watching the watchee
+			watcher.tell(new Watch(watcheeAddress), ActorRef.noSender());
+
+			Future<String> quarantineFuture = quarantineHandler.getHasQuarantinedFuture();
+
+			final String quarantineSystem = Await.result(quarantineFuture, timeout);
+
+			Assert.assertEquals(actorSystem2Address.toString(), quarantineSystem);
+		} finally {
+			TestingUtils.stopActor(watchee);
+			TestingUtils.stopActor(watcher);
+			TestingUtils.stopActor(monitor);
+		}
+	}
+
+	/**
+	 * Quarantine handler which completes a future per callback. Only the first notification (or
+	 * error) completes a future; the monitor may report the same situation several times.
+	 */
+	private static class TestingQuarantineHandler implements QuarantineHandler, ErrorHandler {
+
+		private final Promise<String> wasQuarantinedByFuture;
+		private final Promise<String> hasQuarantinedFuture;
+
+		public TestingQuarantineHandler() {
+			this.wasQuarantinedByFuture = new scala.concurrent.impl.Promise.DefaultPromise<>();
+			this.hasQuarantinedFuture = new scala.concurrent.impl.Promise.DefaultPromise<>();
+		}
+
+		@Override
+		public void wasQuarantinedBy(String remoteSystem, ActorSystem actorSystem) {
+			// trySuccess: success() would throw on repeated events and crash the monitor actor
+			wasQuarantinedByFuture.trySuccess(remoteSystem);
+		}
+
+		@Override
+		public void hasQuarantined(String remoteSystem, ActorSystem actorSystem) {
+			hasQuarantinedFuture.trySuccess(remoteSystem);
+		}
+
+		public Future<String> getWasQuarantinedByFuture() {
+			return wasQuarantinedByFuture.future();
+		}
+
+		public Future<String> getHasQuarantinedFuture() {
+			return hasQuarantinedFuture.future();
+		}
+
+		@Override
+		public void handleError(Throwable failure) {
+			// tryFailure: one of the promises may already have been completed
+			wasQuarantinedByFuture.tryFailure(failure);
+			hasQuarantinedFuture.tryFailure(failure);
+		}
+	}
+
+	private interface ErrorHandler {
+		void handleError(Throwable failure);
+	}
+
+	/**
+	 * Resolves the target of a {@link Watch} message, watches it and then constantly sends
+	 * messages to it until this actor is stopped.
+	 */
+	static class Watcher extends UntypedActor {
+
+		private final FiniteDuration timeout;
+		private final FiniteDuration interval;
+		private final ErrorHandler errorHandler;
+
+		/** Periodic messages to the watchee; cancelled when this actor stops. */
+		private Cancellable pingTask;
+
+		Watcher(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler) {
+			this.timeout = Preconditions.checkNotNull(timeout);
+			this.interval = Preconditions.checkNotNull(interval);
+			this.errorHandler = Preconditions.checkNotNull(errorHandler);
+		}
+
+		@Override
+		public void onReceive(Object message) throws Exception {
+			if (message instanceof Watch) {
+				Watch watch = (Watch) message;
+				final ActorRef self = getSelf();
+
+				getContext().actorSelection(watch.getTarget()).resolveOne(timeout).onComplete(new OnComplete<ActorRef>() {
+					@Override
+					public void onComplete(Throwable failure, ActorRef success) throws Throwable {
+						if (success != null) {
+							// hand the result back to the actor: its context must not be used
+							// from a future callback
+							self.tell(new ResolvedTarget(success), ActorRef.noSender());
+						} else {
+							errorHandler.handleError(failure);
+						}
+					}
+				}, getContext().dispatcher());
+			} else if (message instanceof ResolvedTarget) {
+				ActorRef watchee = ((ResolvedTarget) message).getTarget();
+
+				getContext().watch(watchee);
+				// constantly ping the watchee
+				pingTask = getContext().system().scheduler().schedule(
+					zeroDelay,
+					interval,
+					watchee,
+					"Watcher message",
+					getContext().dispatcher(),
+					getSelf());
+			}
+		}
+
+		@Override
+		public void postStop() {
+			// stop pinging, otherwise the scheduler keeps sending messages after this test
+			if (pingTask != null) {
+				pingTask.cancel();
+			}
+		}
+	}
+
+	/**
+	 * Resolves the target of a {@link Ping} message and then constantly sends messages to it
+	 * until this actor is stopped.
+	 */
+	static class Watchee extends UntypedActor {
+
+		private final FiniteDuration timeout;
+		private final FiniteDuration interval;
+		private final ErrorHandler errorHandler;
+
+		/** Periodic messages to the ping target; cancelled when this actor stops. */
+		private Cancellable pingTask;
+
+		Watchee(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler) {
+			this.timeout = Preconditions.checkNotNull(timeout);
+			this.interval = Preconditions.checkNotNull(interval);
+			this.errorHandler = Preconditions.checkNotNull(errorHandler);
+		}
+
+		@Override
+		public void onReceive(Object message) throws Exception {
+			if (message instanceof Ping) {
+				final Ping ping = (Ping) message;
+				final ActorRef self = getSelf();
+
+				getContext().actorSelection(ping.getTarget()).resolveOne(timeout).onComplete(new OnComplete<ActorRef>() {
+					@Override
+					public void onComplete(Throwable failure, ActorRef success) throws Throwable {
+						if (success != null) {
+							// hand the result back to the actor: its context must not be used
+							// from a future callback
+							self.tell(new ResolvedTarget(success), ActorRef.noSender());
+						} else {
+							errorHandler.handleError(failure);
+						}
+					}
+				}, getContext().dispatcher());
+			} else if (message instanceof ResolvedTarget) {
+				ActorRef target = ((ResolvedTarget) message).getTarget();
+
+				// constantly ping the target
+				pingTask = getContext().system().scheduler().schedule(
+					zeroDelay,
+					interval,
+					target,
+					"Watchee message",
+					getContext().dispatcher(),
+					getSelf());
+			}
+		}
+
+		@Override
+		public void postStop() {
+			// stop pinging, otherwise the scheduler keeps sending messages after this test
+			if (pingTask != null) {
+				pingTask.cancel();
+			}
+		}
+	}
+
+	static class Watch {
+		private final String target;
+
+		Watch(String target) {
+			this.target = target;
+		}
+
+		public String getTarget() {
+			return target;
+		}
+	}
+
+	static class Ping {
+		private final String target;
+
+		Ping(String target) {
+			this.target = target;
+		}
+
+		public String getTarget() {
+			return target;
+		}
+	}
+
+	/** Message an actor sends to itself once the target it was asked about has been resolved. */
+	static class ResolvedTarget {
+		private final ActorRef target;
+
+		ResolvedTarget(ActorRef target) {
+			this.target = target;
+		}
+
+		public ActorRef getTarget() {
+			return target;
+		}
+	}
+
+	static Props getWatcheeProps(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler) {
+		return Props.create(Watchee.class, timeout, interval, errorHandler);
+	}
+
+	static Props getWatcherProps(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler) {
+		return Props.create(Watcher.class, timeout, interval, errorHandler);
+	}
+
+	static Props getQuarantineMonitorProps(QuarantineHandler handler) {
+		return Props.create(QuarantineMonitor.class, handler, LOG);
+	}
+
+}