You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/03/21 00:27:43 UTC
svn commit: r1303207 - in
/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common: ./
src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/ha/
src/main/java/org/apache/hadoop/ha/protocolPB/
src/test/java/org/apache/hadoop/ha/
Author: todd
Date: Tue Mar 20 23:27:42 2012
New Revision: 1303207
URL: http://svn.apache.org/viewvc?rev=1303207&view=rev
Log:
HADOOP-7788. Add simple HealthMonitor class to watch an HAService. Contributed by Todd Lipcon.
Added:
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
Modified:
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1303207&r1=1303206&r2=1303207&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt Tue Mar 20 23:27:42 2012
@@ -96,6 +96,8 @@ Release 0.23.3 - UNRELEASED
HADOOP-8183. Stop using "mapred.used.genericoptions.parser" (harsh)
+ HADOOP-7788. Add simple HealthMonitor class to watch an HAService (todd)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java?rev=1303207&r1=1303206&r2=1303207&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java Tue Mar 20 23:27:42 2012
@@ -121,5 +121,29 @@ public class CommonConfigurationKeys ext
"hadoop.security.token.service.use_ip";
public static final boolean HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT =
true;
+
+ /**
+ * HA health monitor and failover controller.
+ */
+
+ /** How often to retry connecting to the service. */
+ public static final String HA_HM_CONNECT_RETRY_INTERVAL_KEY =
+ "ha.health-monitor.connect-retry-interval.ms";
+ public static final long HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT = 1000;
+
+ /** How often to check the service. */
+ public static final String HA_HM_CHECK_INTERVAL_KEY =
+ "ha.health-monitor.check-interval.ms";
+ public static final long HA_HM_CHECK_INTERVAL_DEFAULT = 1000;
+
+ /** How long to sleep after an unexpected RPC error. */
+ public static final String HA_HM_SLEEP_AFTER_DISCONNECT_KEY =
+ "ha.health-monitor.sleep-after-disconnect.ms";
+ public static final long HA_HM_SLEEP_AFTER_DISCONNECT_DEFAULT = 1000;
+
+ /** Timeout for the actual monitorHealth() calls. */
+ public static final String HA_HM_RPC_TIMEOUT_KEY =
+ "ha.health-monitor.rpc-timeout.ms";
+ public static final int HA_HM_RPC_TIMEOUT_DEFAULT = 45000;
}
Added: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java?rev=1303207&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java Tue Mar 20 23:27:42 2012
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.*;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HealthCheckFailedException;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.Daemon;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is a daemon which runs in a loop, periodically heartbeating
+ * with an HA service. It is responsible for keeping track of that service's
+ * health and exposing callbacks to the failover controller when the health
+ * status changes.
+ *
+ * Classes which need callbacks should implement the {@link Callback}
+ * interface.
+ */
+class HealthMonitor {
+ private static final Log LOG = LogFactory.getLog(
+ HealthMonitor.class);
+
+ private Daemon daemon;
+ private long connectRetryInterval;
+ private long checkIntervalMillis;
+ private long sleepAfterDisconnectMillis;
+
+ private int rpcTimeout;
+
+ private volatile boolean shouldRun = true;
+
+ /** The connected proxy */
+ private HAServiceProtocol proxy;
+
+ /** The address running the HA Service */
+ private final InetSocketAddress addrToMonitor;
+
+ private final Configuration conf;
+
+ private State state = State.INITIALIZING;
+
+ /**
+ * Listeners for state changes
+ */
+ private List<Callback> callbacks = Collections.synchronizedList(
+ new LinkedList<Callback>());
+
+ private HAServiceState lastServiceState = HAServiceState.INITIALIZING;
+
+ enum State {
+ /**
+ * The health monitor is still starting up.
+ */
+ INITIALIZING,
+
+ /**
+ * The service is not responding to health check RPCs.
+ */
+ SERVICE_NOT_RESPONDING,
+
+ /**
+ * The service is connected and healthy.
+ */
+ SERVICE_HEALTHY,
+
+ /**
+ * The service is running but unhealthy.
+ */
+ SERVICE_UNHEALTHY,
+
+ /**
+ * The health monitor itself failed unrecoverably and can
+ * no longer provide accurate information.
+ */
+ HEALTH_MONITOR_FAILED;
+ }
+
+
+ HealthMonitor(Configuration conf, InetSocketAddress addrToMonitor) {
+ this.conf = conf;
+ this.addrToMonitor = addrToMonitor;
+
+ this.sleepAfterDisconnectMillis = conf.getLong(
+ HA_HM_SLEEP_AFTER_DISCONNECT_KEY,
+ HA_HM_SLEEP_AFTER_DISCONNECT_DEFAULT);
+ this.checkIntervalMillis = conf.getLong(
+ HA_HM_CHECK_INTERVAL_KEY,
+ HA_HM_CHECK_INTERVAL_DEFAULT);
+ this.connectRetryInterval = conf.getLong(
+ HA_HM_CONNECT_RETRY_INTERVAL_KEY,
+ HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT);
+ this.rpcTimeout = conf.getInt(
+ HA_HM_RPC_TIMEOUT_KEY,
+ HA_HM_RPC_TIMEOUT_DEFAULT);
+
+ this.daemon = new MonitorDaemon();
+ }
+
+ public void addCallback(Callback cb) {
+ this.callbacks.add(cb);
+ }
+
+ public void removeCallback(Callback cb) {
+ callbacks.remove(cb);
+ }
+
+ public void shutdown() {
+ LOG.info("Stopping HealthMonitor thread");
+ shouldRun = false;
+ daemon.interrupt();
+ }
+
+ /**
+ * @return the current proxy object to the underlying service.
+ * Note that this may return null in the case that the service
+ * is not responding. Also note that, even if the last indicated
+ * state is healthy, the service may have gone down in the meantime.
+ */
+ public synchronized HAServiceProtocol getProxy() {
+ return proxy;
+ }
+
+ private void loopUntilConnected() throws InterruptedException {
+ tryConnect();
+ while (proxy == null) {
+ Thread.sleep(connectRetryInterval);
+ tryConnect();
+ }
+ assert proxy != null;
+ }
+
+ private void tryConnect() {
+ Preconditions.checkState(proxy == null);
+
+ try {
+ synchronized (this) {
+ proxy = createProxy();
+ }
+ } catch (IOException e) {
+ LOG.warn("Could not connect to local service at " + addrToMonitor +
+ ": " + e.getMessage());
+ proxy = null;
+ enterState(State.SERVICE_NOT_RESPONDING);
+ }
+ }
+
+ /**
+ * Connect to the service to be monitored. Kept as a separate overridable method so tests can substitute their own proxy.
+ */
+ protected HAServiceProtocol createProxy() throws IOException {
+ SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(conf);
+ return new HAServiceProtocolClientSideTranslatorPB(
+ addrToMonitor,
+ conf, socketFactory, rpcTimeout);
+ }
+
+ private void doHealthChecks() throws InterruptedException {
+ while (shouldRun) {
+ HAServiceState state = null;
+ boolean healthy = false;
+ try {
+ state = proxy.getServiceState();
+ proxy.monitorHealth();
+ healthy = true;
+ } catch (HealthCheckFailedException e) {
+ LOG.warn("Service health check failed: " + e.getMessage());
+ enterState(State.SERVICE_UNHEALTHY);
+ } catch (Throwable t) {
+ LOG.warn("Transport-level exception trying to monitor health of " +
+ addrToMonitor + ": " + t.getLocalizedMessage());
+ RPC.stopProxy(proxy);
+ proxy = null;
+ enterState(State.SERVICE_NOT_RESPONDING);
+ Thread.sleep(sleepAfterDisconnectMillis);
+ return;
+ }
+
+ if (state != null) {
+ setLastServiceState(state);
+ }
+ if (healthy) {
+ enterState(State.SERVICE_HEALTHY);
+ }
+
+ Thread.sleep(checkIntervalMillis);
+ }
+ }
+
+ private synchronized void setLastServiceState(HAServiceState serviceState) {
+ this.lastServiceState = serviceState;
+ }
+
+ private synchronized void enterState(State newState) {
+ if (newState != state) {
+ LOG.info("Entering state " + newState);
+ state = newState;
+ synchronized (callbacks) {
+ for (Callback cb : callbacks) {
+ cb.enteredState(newState);
+ }
+ }
+ }
+ }
+
+ synchronized State getHealthState() {
+ return state;
+ }
+
+ synchronized HAServiceState getLastServiceState() {
+ return lastServiceState;
+ }
+
+ boolean isAlive() {
+ return daemon.isAlive();
+ }
+
+ void join() throws InterruptedException {
+ daemon.join();
+ }
+
+ void start() {
+ daemon.start();
+ }
+
+ private class MonitorDaemon extends Daemon {
+ private MonitorDaemon() {
+ super();
+ setName("Health Monitor for " + addrToMonitor);
+ setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.fatal("Health monitor failed", e);
+ enterState(HealthMonitor.State.HEALTH_MONITOR_FAILED);
+ }
+ });
+ }
+
+ @Override
+ public void run() {
+ while (shouldRun) {
+ try {
+ loopUntilConnected();
+ doHealthChecks();
+ } catch (InterruptedException ie) {
+ Preconditions.checkState(!shouldRun,
+ "Interrupted but still supposed to run");
+ }
+ }
+ }
+ }
+
+ /**
+ * Callback interface for state change events.
+ *
+ * This interface is called from a single thread which also performs
+ * the health monitoring. If the callback processing takes a long time,
+ * no further health checks will be made during this period, nor will
+ * other registered callbacks be called.
+ *
+ * If the callback itself throws an unchecked exception, no other
+ * callbacks following it will be called, and the health monitor
+ * will terminate, entering HEALTH_MONITOR_FAILED state.
+ */
+ static interface Callback {
+ void enteredState(State newState);
+ }
+
+ /**
+ * Simple main() for testing.
+ */
+ public static void main(String[] args) throws InterruptedException {
+ if (args.length != 1) {
+ System.err.println("Usage: " + HealthMonitor.class.getName() +
+ " <addr to monitor>");
+ System.exit(1);
+ }
+ Configuration conf = new Configuration();
+
+ String target = args[0];
+ InetSocketAddress addr = NetUtils.createSocketAddr(target);
+
+ HealthMonitor hm = new HealthMonitor(conf, addr);
+ hm.start();
+ hm.join();
+ }
+
+}
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java?rev=1303207&r1=1303206&r2=1303207&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java Tue Mar 20 23:27:42 2012
@@ -21,6 +21,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
+import javax.net.SocketFactory;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -34,7 +36,9 @@ import org.apache.hadoop.ha.proto.HAServ
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -47,7 +51,7 @@ import com.google.protobuf.ServiceExcept
@InterfaceAudience.Private
@InterfaceStability.Stable
public class HAServiceProtocolClientSideTranslatorPB implements
- HAServiceProtocol, Closeable {
+ HAServiceProtocol, Closeable, ProtocolTranslator {
/** RpcController is not used and hence is set to null */
private final static RpcController NULL_CONTROLLER = null;
private final static MonitorHealthRequestProto MONITOR_HEALTH_REQ =
@@ -71,6 +75,16 @@ public class HAServiceProtocolClientSide
RPC.getProtocolVersion(HAServiceProtocolPB.class), addr, conf);
}
+ public HAServiceProtocolClientSideTranslatorPB(
+ InetSocketAddress addr, Configuration conf,
+ SocketFactory socketFactory, int timeout) throws IOException {
+ RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
+ ProtobufRpcEngine.class);
+ rpcProxy = RPC.getProxy(HAServiceProtocolPB.class,
+ RPC.getProtocolVersion(HAServiceProtocolPB.class), addr,
+ UserGroupInformation.getCurrentUser(), conf, socketFactory, timeout);
+ }
+
@Override
public void monitorHealth() throws IOException {
try {
@@ -132,4 +146,9 @@ public class HAServiceProtocolClientSide
throw ProtobufHelper.getRemoteException(e);
}
}
+
+ @Override
+ public Object getUnderlyingProxyObject() {
+ return rpcProxy;
+ }
}
Added: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java?rev=1303207&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java Tue Mar 20 23:27:42 2012
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HealthCheckFailedException;
+import org.apache.hadoop.ha.HealthMonitor.Callback;
+import org.apache.hadoop.ha.HealthMonitor.State;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestHealthMonitor {
+ private static final Log LOG = LogFactory.getLog(
+ TestHealthMonitor.class);
+
+ /* bogus address to pass to constructor - never used */
+ private static final InetSocketAddress BOGUS_ADDR =
+ new InetSocketAddress(1);
+
+ private HAServiceProtocol mockProxy;
+
+ /** Latch that is counted down each time createProxy is called */
+ private volatile CountDownLatch createProxyLatch;
+
+ /** Whether createProxy should throw an IOException when trying to connect */
+ private volatile boolean shouldThrowOnCreateProxy = false;
+
+ private HealthMonitor hm;
+
+ @Before
+ public void setupHM() throws InterruptedException, IOException {
+ Configuration conf = new Configuration();
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+ conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
+ conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
+ conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
+ mockProxy = Mockito.mock(HAServiceProtocol.class);
+ Mockito.doReturn(HAServiceState.ACTIVE)
+ .when(mockProxy).getServiceState();
+
+ hm = new HealthMonitor(conf, BOGUS_ADDR) {
+ @Override
+ protected HAServiceProtocol createProxy() throws IOException {
+ createProxyLatch.countDown();
+ if (shouldThrowOnCreateProxy) {
+ throw new IOException("can't connect");
+ }
+ return mockProxy;
+ }
+ };
+
+ createProxyLatch = new CountDownLatch(1);
+
+ LOG.info("Starting health monitor");
+ hm.start();
+
+ LOG.info("Waiting for proxy to be created");
+ assertTrue(createProxyLatch.await(2000, TimeUnit.MILLISECONDS));
+ createProxyLatch = null;
+
+ LOG.info("Waiting for HEALTHY signal");
+ waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
+ }
+
+ @Test(timeout=15000)
+ public void testMonitor() throws Exception {
+ LOG.info("Mocking bad health check, waiting for UNHEALTHY");
+ Mockito.doThrow(new HealthCheckFailedException("Fake health check failure"))
+ .when(mockProxy).monitorHealth();
+ waitForState(hm, HealthMonitor.State.SERVICE_UNHEALTHY);
+
+ LOG.info("Returning to healthy state, waiting for HEALTHY");
+ Mockito.doNothing().when(mockProxy).monitorHealth();
+ waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
+
+ LOG.info("Returning an IOException, as if node went down");
+ // should expect many rapid retries
+ createProxyLatch = new CountDownLatch(3);
+ shouldThrowOnCreateProxy = true;
+ Mockito.doThrow(new IOException("Connection lost (fake)"))
+ .when(mockProxy).monitorHealth();
+ waitForState(hm, HealthMonitor.State.SERVICE_NOT_RESPONDING);
+ assertTrue("Monitor should retry if createProxy throws an IOE",
+ createProxyLatch.await(1000, TimeUnit.MILLISECONDS));
+
+ LOG.info("Returning to healthy state, waiting for HEALTHY");
+ shouldThrowOnCreateProxy = false;
+ Mockito.doNothing().when(mockProxy).monitorHealth();
+ waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
+
+ hm.shutdown();
+ hm.join();
+ assertFalse(hm.isAlive());
+ }
+
+ /**
+ * Test that the proper state is propagated when the health monitor
+ * sees an uncaught exception in its thread.
+ */
+ @Test(timeout=15000)
+ public void testHealthMonitorDies() throws Exception {
+ LOG.info("Mocking RTE in health monitor, waiting for FAILED");
+ Mockito.doThrow(new OutOfMemoryError())
+ .when(mockProxy).monitorHealth();
+ waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
+ hm.shutdown();
+ hm.join();
+ assertFalse(hm.isAlive());
+ }
+
+ /**
+ * Test that, if the callback throws an RTE, this will terminate the
+ * health monitor and thus change its state to FAILED
+ * @throws Exception
+ */
+ @Test(timeout=15000)
+ public void testCallbackThrowsRTE() throws Exception {
+ hm.addCallback(new Callback() {
+ @Override
+ public void enteredState(State newState) {
+ throw new RuntimeException("Injected RTE");
+ }
+ });
+ LOG.info("Mocking bad health check, waiting for UNHEALTHY");
+ Mockito.doThrow(new HealthCheckFailedException("Fake health check failure"))
+ .when(mockProxy).monitorHealth();
+ waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
+ }
+
+ private void waitForState(HealthMonitor hm, State state)
+ throws InterruptedException {
+ long st = System.currentTimeMillis();
+ while (System.currentTimeMillis() - st < 2000) {
+ if (hm.getHealthState() == state) {
+ return;
+ }
+ Thread.sleep(50);
+ }
+ assertEquals(state, hm.getHealthState());
+ }
+}