You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/03/21 00:29:19 UTC

svn commit: r1303208 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/ha/ src/main/java/org/apache/hadoop/ha/protocolPB/ src/test/java/org/apache/hadoop/ha/

Author: todd
Date: Tue Mar 20 23:29:18 2012
New Revision: 1303208

URL: http://svn.apache.org/viewvc?rev=1303208&view=rev
Log:
HADOOP-7788. Add simple HealthMonitor class to watch an HAService. Contributed by Todd Lipcon.

Added:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1303208&r1=1303207&r2=1303208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Tue Mar 20 23:29:18 2012
@@ -107,6 +107,8 @@ Trunk (unreleased changes)
     HADOOP-8141. Add method to SecurityUtil to init krb5 cipher suites.
     (todd)
 
+    HADOOP-7788. Add simple HealthMonitor class to watch an HAService (todd)
+
   OPTIMIZATIONS
 
     HADOOP-7761. Improve the performance of raw comparisons. (todd)

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java?rev=1303208&r1=1303207&r2=1303208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java Tue Mar 20 23:29:18 2012
@@ -121,5 +121,29 @@ public class CommonConfigurationKeys ext
       "hadoop.security.token.service.use_ip";
   public static final boolean HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT =
       true;
+  
+  /**
+   * HA health monitor and failover controller.
+   */
+ 
+  /** How often to retry connecting to the service. */
+  public static final String HA_HM_CONNECT_RETRY_INTERVAL_KEY =
+    "ha.health-monitor.connect-retry-interval.ms";
+  public static final long HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT = 1000;
+ 
+  /** How often to check the service. */
+  public static final String HA_HM_CHECK_INTERVAL_KEY =
+    "ha.health-monitor.check-interval.ms";  
+  public static final long HA_HM_CHECK_INTERVAL_DEFAULT = 1000;
+ 
+  /** How long to sleep after an unexpected RPC error. */
+  public static final String HA_HM_SLEEP_AFTER_DISCONNECT_KEY =
+    "ha.health-monitor.sleep-after-disconnect.ms";
+  public static final long HA_HM_SLEEP_AFTER_DISCONNECT_DEFAULT = 1000;
+ 
+  /** Timeout for the actual monitorHealth() calls. */
+  public static final String HA_HM_RPC_TIMEOUT_KEY =
+    "ha.health-monitor.rpc-timeout.ms";
+  public static final int HA_HM_RPC_TIMEOUT_DEFAULT = 45000;
 }
 

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java?rev=1303208&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java Tue Mar 20 23:29:18 2012
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.*;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HealthCheckFailedException;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.Daemon;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is a daemon which runs in a loop, periodically heartbeating
+ * with an HA service. It is responsible for keeping track of that service's
+ * health and exposing callbacks to the failover controller when the health
+ * status changes.
+ * 
+ * Classes which need callbacks should implement the {@link Callback}
+ * interface.
+ */
+class HealthMonitor {
+  private static final Log LOG = LogFactory.getLog(
+      HealthMonitor.class);
+
+  private Daemon daemon;
+  private long connectRetryInterval;
+  private long checkIntervalMillis;
+  private long sleepAfterDisconnectMillis;
+
+  private int rpcTimeout;
+
+  private volatile boolean shouldRun = true;
+
+  /** The connected proxy */
+  private HAServiceProtocol proxy;
+
+  /** The address running the HA Service */
+  private final InetSocketAddress addrToMonitor;
+
+  private final Configuration conf;
+  
+  private State state = State.INITIALIZING;
+
+  /**
+   * Listeners for state changes
+   */
+  private List<Callback> callbacks = Collections.synchronizedList(
+      new LinkedList<Callback>());
+
+  private HAServiceState lastServiceState = HAServiceState.INITIALIZING;
+  
+  enum State {
+    /**
+     * The health monitor is still starting up.
+     */
+    INITIALIZING,
+
+    /**
+     * The service is not responding to health check RPCs.
+     */
+    SERVICE_NOT_RESPONDING,
+
+    /**
+     * The service is connected and healthy.
+     */
+    SERVICE_HEALTHY,
+    
+    /**
+     * The service is running but unhealthy.
+     */
+    SERVICE_UNHEALTHY,
+    
+    /**
+     * The health monitor itself failed unrecoverably and can
+     * no longer provide accurate information.
+     */
+    HEALTH_MONITOR_FAILED;
+  }
+
+
+  HealthMonitor(Configuration conf, InetSocketAddress addrToMonitor) {
+    this.conf = conf;
+    this.addrToMonitor = addrToMonitor;
+    
+    this.sleepAfterDisconnectMillis = conf.getLong(
+        HA_HM_SLEEP_AFTER_DISCONNECT_KEY,
+        HA_HM_SLEEP_AFTER_DISCONNECT_DEFAULT);
+    this.checkIntervalMillis = conf.getLong(
+        HA_HM_CHECK_INTERVAL_KEY,
+        HA_HM_CHECK_INTERVAL_DEFAULT);
+    this.connectRetryInterval = conf.getLong(
+        HA_HM_CONNECT_RETRY_INTERVAL_KEY,
+        HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT);
+    this.rpcTimeout = conf.getInt(
+        HA_HM_RPC_TIMEOUT_KEY,
+        HA_HM_RPC_TIMEOUT_DEFAULT);
+    
+    this.daemon = new MonitorDaemon();
+  }
+  
+  public void addCallback(Callback cb) {
+    this.callbacks.add(cb);
+  }
+  
+  public void removeCallback(Callback cb) {
+    callbacks.remove(cb);
+  }
+  
+  public void shutdown() {
+    LOG.info("Stopping HealthMonitor thread");
+    shouldRun = false;
+    daemon.interrupt();
+  }
+
+  /**
+   * @return the current proxy object to the underlying service.
+   * Note that this may return null in the case that the service
+   * is not responding. Also note that, even if the last indicated
+   * state is healthy, the service may have gone down in the meantime.
+   */
+  public synchronized HAServiceProtocol getProxy() {
+    return proxy;
+  }
+  
+  private void loopUntilConnected() throws InterruptedException {
+    tryConnect();
+    while (proxy == null) {
+      Thread.sleep(connectRetryInterval);
+      tryConnect();
+    }
+    assert proxy != null;
+  }
+
+  private void tryConnect() {
+    Preconditions.checkState(proxy == null);
+    
+    try {
+      synchronized (this) {
+        proxy = createProxy();
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not connect to local service at " + addrToMonitor +
+          ": " + e.getMessage());
+      proxy = null;
+      enterState(State.SERVICE_NOT_RESPONDING);
+    }
+  }
+  
+  /**
+   * Connect to the service to be monitored. Stubbed out for easier testing.
+   */
+  protected HAServiceProtocol createProxy() throws IOException {
+    SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(conf);
+    return new HAServiceProtocolClientSideTranslatorPB(
+        addrToMonitor,
+        conf, socketFactory, rpcTimeout);
+  }
+
+  private void doHealthChecks() throws InterruptedException {
+    while (shouldRun) {
+      HAServiceState state = null;
+      boolean healthy = false;
+      try {
+        state = proxy.getServiceState();
+        proxy.monitorHealth();
+        healthy = true;
+      } catch (HealthCheckFailedException e) {
+        LOG.warn("Service health check failed: " + e.getMessage());
+        enterState(State.SERVICE_UNHEALTHY);
+      } catch (Throwable t) {
+        LOG.warn("Transport-level exception trying to monitor health of " +
+            addrToMonitor + ": " + t.getLocalizedMessage());
+        RPC.stopProxy(proxy);
+        proxy = null;
+        enterState(State.SERVICE_NOT_RESPONDING);
+        Thread.sleep(sleepAfterDisconnectMillis);
+        return;
+      }
+      
+      if (state != null) {
+        setLastServiceState(state);
+      }
+      if (healthy) {
+        enterState(State.SERVICE_HEALTHY);
+      }
+
+      Thread.sleep(checkIntervalMillis);
+    }
+  }
+  
+  private synchronized void setLastServiceState(HAServiceState serviceState) {
+    this.lastServiceState = serviceState;
+  }
+
+  private synchronized void enterState(State newState) {
+    if (newState != state) {
+      LOG.info("Entering state " + newState);
+      state = newState;
+      synchronized (callbacks) {
+        for (Callback cb : callbacks) {
+          cb.enteredState(newState);
+        }
+      }
+    }
+  }
+
+  synchronized State getHealthState() {
+    return state;
+  }
+  
+  synchronized HAServiceState getLastServiceState() {
+    return lastServiceState;
+  }
+  
+  boolean isAlive() {
+    return daemon.isAlive();
+  }
+
+  void join() throws InterruptedException {
+    daemon.join();
+  }
+
+  void start() {
+    daemon.start();
+  }
+
+  private class MonitorDaemon extends Daemon {
+    private MonitorDaemon() {
+      super();
+      setName("Health Monitor for " + addrToMonitor);
+      setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+          LOG.fatal("Health monitor failed", e);
+          enterState(HealthMonitor.State.HEALTH_MONITOR_FAILED);
+        }
+      });
+    }
+    
+    @Override
+    public void run() {
+      while (shouldRun) {
+        try { 
+          loopUntilConnected();
+          doHealthChecks();
+        } catch (InterruptedException ie) {
+          Preconditions.checkState(!shouldRun,
+              "Interrupted but still supposed to run");
+        }
+      }
+    }
+  }
+  
+  /**
+   * Callback interface for state change events.
+   * 
+   * This interface is called from a single thread which also performs
+   * the health monitoring. If the callback processing takes a long time,
+   * no further health checks will be made during this period, nor will
+   * other registered callbacks be called.
+   * 
+   * If the callback itself throws an unchecked exception, no other
+   * callbacks following it will be called, and the health monitor
+   * will terminate, entering HEALTH_MONITOR_FAILED state.
+   */
+  static interface Callback {
+    void enteredState(State newState);
+  }
+
+  /**
+   * Simple main() for testing.
+   */
+  public static void main(String[] args) throws InterruptedException {
+    if (args.length != 1) {
+      System.err.println("Usage: " + HealthMonitor.class.getName() +
+          " <addr to monitor>");
+      System.exit(1);
+    }
+    Configuration conf = new Configuration();
+    
+    String target = args[0];
+    InetSocketAddress addr = NetUtils.createSocketAddr(target);
+    
+    HealthMonitor hm = new HealthMonitor(conf, addr);
+    hm.start();
+    hm.join();
+  }
+
+}

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java?rev=1303208&r1=1303207&r2=1303208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java Tue Mar 20 23:29:18 2012
@@ -21,6 +21,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import javax.net.SocketFactory;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -34,7 +36,9 @@ import org.apache.hadoop.ha.proto.HAServ
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
@@ -47,7 +51,7 @@ import com.google.protobuf.ServiceExcept
 @InterfaceAudience.Private
 @InterfaceStability.Stable
 public class HAServiceProtocolClientSideTranslatorPB implements
-    HAServiceProtocol, Closeable {
+    HAServiceProtocol, Closeable, ProtocolTranslator {
   /** RpcController is not used and hence is set to null */
   private final static RpcController NULL_CONTROLLER = null;
   private final static MonitorHealthRequestProto MONITOR_HEALTH_REQ = 
@@ -71,6 +75,16 @@ public class HAServiceProtocolClientSide
         RPC.getProtocolVersion(HAServiceProtocolPB.class), addr, conf);
   }
   
+  public HAServiceProtocolClientSideTranslatorPB(
+      InetSocketAddress addr, Configuration conf,
+      SocketFactory socketFactory, int timeout) throws IOException {
+    RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
+        ProtobufRpcEngine.class);
+    rpcProxy = RPC.getProxy(HAServiceProtocolPB.class,
+        RPC.getProtocolVersion(HAServiceProtocolPB.class), addr,
+        UserGroupInformation.getCurrentUser(), conf, socketFactory, timeout);
+  }
+
   @Override
   public void monitorHealth() throws IOException {
     try {
@@ -132,4 +146,9 @@ public class HAServiceProtocolClientSide
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public Object getUnderlyingProxyObject() {
+    return rpcProxy;
+  }
 }

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java?rev=1303208&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java Tue Mar 20 23:29:18 2012
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HealthCheckFailedException;
+import org.apache.hadoop.ha.HealthMonitor.Callback;
+import org.apache.hadoop.ha.HealthMonitor.State;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestHealthMonitor {
+  private static final Log LOG = LogFactory.getLog(
+      TestHealthMonitor.class);
+  
+  /* bogus address to pass to constructor - never used */
+  private static final InetSocketAddress BOGUS_ADDR =
+    new InetSocketAddress(1);
+
+  private HAServiceProtocol mockProxy;
+
+  /** Latch that is counted down each time createProxy is called */
+  private volatile CountDownLatch createProxyLatch;
+
+  /** Should throw an IOE when trying to connect */
+  private volatile boolean shouldThrowOnCreateProxy = false;
+
+  private HealthMonitor hm;
+  
+  @Before
+  public void setupHM() throws InterruptedException, IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+    conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
+    conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
+    conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
+    mockProxy = Mockito.mock(HAServiceProtocol.class);
+    Mockito.doReturn(HAServiceState.ACTIVE)
+      .when(mockProxy).getServiceState();
+    
+    hm = new HealthMonitor(conf, BOGUS_ADDR) {
+      @Override
+      protected HAServiceProtocol createProxy() throws IOException {
+        createProxyLatch.countDown();
+        if (shouldThrowOnCreateProxy) {
+          throw new IOException("can't connect");
+        }
+        return mockProxy;
+      }
+    };
+
+    createProxyLatch = new CountDownLatch(1);
+
+    LOG.info("Starting health monitor");
+    hm.start();
+    
+    LOG.info("Waiting for proxy to be created");
+    assertTrue(createProxyLatch.await(2000, TimeUnit.MILLISECONDS));
+    createProxyLatch = null;
+    
+    LOG.info("Waiting for HEALTHY signal");    
+    waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
+  }
+  
+  @Test(timeout=15000)
+  public void testMonitor() throws Exception {
+    LOG.info("Mocking bad health check, waiting for UNHEALTHY");
+    Mockito.doThrow(new HealthCheckFailedException("Fake health check failure"))
+      .when(mockProxy).monitorHealth();
+    waitForState(hm, HealthMonitor.State.SERVICE_UNHEALTHY);
+    
+    LOG.info("Returning to healthy state, waiting for HEALTHY");
+    Mockito.doNothing().when(mockProxy).monitorHealth();
+    waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
+
+    LOG.info("Returning an IOException, as if node went down");
+    // should expect many rapid retries
+    createProxyLatch = new CountDownLatch(3);
+    shouldThrowOnCreateProxy = true;
+    Mockito.doThrow(new IOException("Connection lost (fake)"))
+      .when(mockProxy).monitorHealth();
+    waitForState(hm, HealthMonitor.State.SERVICE_NOT_RESPONDING);
+    assertTrue("Monitor should retry if createProxy throws an IOE",
+        createProxyLatch.await(1000, TimeUnit.MILLISECONDS));
+    
+    LOG.info("Returning to healthy state, waiting for HEALTHY");
+    shouldThrowOnCreateProxy = false;
+    Mockito.doNothing().when(mockProxy).monitorHealth();
+    waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
+    
+    hm.shutdown();
+    hm.join();
+    assertFalse(hm.isAlive());
+  }
+
+  /**
+   * Test that the proper state is propagated when the health monitor
+   * sees an uncaught exception in its thread.
+   */
+  @Test(timeout=15000)
+  public void testHealthMonitorDies() throws Exception {
+    LOG.info("Mocking RTE in health monitor, waiting for FAILED");
+    Mockito.doThrow(new OutOfMemoryError())
+      .when(mockProxy).monitorHealth();
+    waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
+    hm.shutdown();
+    hm.join();
+    assertFalse(hm.isAlive());
+  }
+  
+  /**
+   * Test that, if the callback throws an RTE, this will terminate the
+   * health monitor and thus change its state to FAILED
+   * @throws Exception
+   */
+  @Test(timeout=15000)
+  public void testCallbackThrowsRTE() throws Exception {
+    hm.addCallback(new Callback() {
+      @Override
+      public void enteredState(State newState) {
+        throw new RuntimeException("Injected RTE");
+      }
+    });
+    LOG.info("Mocking bad health check, waiting for UNHEALTHY");
+    Mockito.doThrow(new HealthCheckFailedException("Fake health check failure"))
+      .when(mockProxy).monitorHealth();
+    waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
+  }
+
+  private void waitForState(HealthMonitor hm, State state)
+      throws InterruptedException {
+    long st = System.currentTimeMillis();
+    while (System.currentTimeMillis() - st < 2000) {
+      if (hm.getHealthState() == state) {
+        return;
+      }
+      Thread.sleep(50);
+    }
+    assertEquals(state, hm.getHealthState());
+  }
+}