You are viewing a plain-text version of this content. The canonical link to it ("here") was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/03/26 03:57:04 UTC
svn commit: r1305199 - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common: ./
src/main/java/org/apache/hadoop/ha/ src/test/java/org/apache/hadoop/ha/
Author: todd
Date: Mon Mar 26 01:57:03 2012
New Revision: 1305199
URL: http://svn.apache.org/viewvc?rev=1305199&view=rev
Log:
HADOOP-8204. TestHealthMonitor fails occasionally. Contributed by Todd Lipcon.
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1305199&r1=1305198&r2=1305199&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Mon Mar 26 01:57:03 2012
@@ -279,6 +279,8 @@ Release 0.23.3 - UNRELEASED
HADOOP-8159. NetworkTopology: getLeaf should check for invalid topologies.
(Colin Patrick McCabe via eli)
+ HADOOP-8204. TestHealthMonitor fails occasionally (todd)
+
BREAKDOWN OF HADOOP-7454 SUBTASKS
HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh)
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java?rev=1305199&r1=1305198&r2=1305199&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java Mon Mar 26 01:57:03 2012
@@ -18,13 +18,10 @@
package org.apache.hadoop.ha;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import javax.net.SocketFactory;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -32,9 +29,7 @@ import static org.apache.hadoop.fs.Commo
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HealthCheckFailedException;
-import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Daemon;
import com.google.common.base.Preconditions;
@@ -64,8 +59,8 @@ class HealthMonitor {
/** The connected proxy */
private HAServiceProtocol proxy;
- /** The address running the HA Service */
- private final InetSocketAddress addrToMonitor;
+ /** The HA service to monitor */
+ private final HAServiceTarget targetToMonitor;
private final Configuration conf;
@@ -109,9 +104,9 @@ class HealthMonitor {
}
- HealthMonitor(Configuration conf, InetSocketAddress addrToMonitor) {
+ HealthMonitor(Configuration conf, HAServiceTarget target) {
+ this.targetToMonitor = target;
this.conf = conf;
- this.addrToMonitor = addrToMonitor;
this.sleepAfterDisconnectMillis = conf.getLong(
HA_HM_SLEEP_AFTER_DISCONNECT_KEY,
@@ -170,7 +165,7 @@ class HealthMonitor {
proxy = createProxy();
}
} catch (IOException e) {
- LOG.warn("Could not connect to local service at " + addrToMonitor +
+ LOG.warn("Could not connect to local service at " + targetToMonitor +
": " + e.getMessage());
proxy = null;
enterState(State.SERVICE_NOT_RESPONDING);
@@ -181,10 +176,7 @@ class HealthMonitor {
* Connect to the service to be monitored. Stubbed out for easier testing.
*/
protected HAServiceProtocol createProxy() throws IOException {
- SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(conf);
- return new HAServiceProtocolClientSideTranslatorPB(
- addrToMonitor,
- conf, socketFactory, rpcTimeout);
+ return targetToMonitor.getProxy(conf, rpcTimeout);
}
private void doHealthChecks() throws InterruptedException {
@@ -200,7 +192,7 @@ class HealthMonitor {
enterState(State.SERVICE_UNHEALTHY);
} catch (Throwable t) {
LOG.warn("Transport-level exception trying to monitor health of " +
- addrToMonitor + ": " + t.getLocalizedMessage());
+ targetToMonitor + ": " + t.getLocalizedMessage());
RPC.stopProxy(proxy);
proxy = null;
enterState(State.SERVICE_NOT_RESPONDING);
@@ -258,7 +250,7 @@ class HealthMonitor {
private class MonitorDaemon extends Daemon {
private MonitorDaemon() {
super();
- setName("Health Monitor for " + addrToMonitor);
+ setName("Health Monitor for " + targetToMonitor);
setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
@@ -297,24 +289,4 @@ class HealthMonitor {
static interface Callback {
void enteredState(State newState);
}
-
- /**
- * Simple main() for testing.
- */
- public static void main(String[] args) throws InterruptedException {
- if (args.length != 1) {
- System.err.println("Usage: " + HealthMonitor.class.getName() +
- " <addr to monitor>");
- System.exit(1);
- }
- Configuration conf = new Configuration();
-
- String target = args[0];
- InetSocketAddress addr = NetUtils.createSocketAddr(target);
-
- HealthMonitor hm = new HealthMonitor(conf, addr);
- hm.start();
- hm.join();
- }
-
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java?rev=1305199&r1=1305198&r2=1305199&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java Mon Mar 26 01:57:03 2012
@@ -30,10 +30,12 @@ import org.mockito.Mockito;
* a mock implementation.
*/
class DummyHAService extends HAServiceTarget {
- HAServiceState state;
+ volatile HAServiceState state;
HAServiceProtocol proxy;
NodeFencer fencer;
InetSocketAddress address;
+ boolean isHealthy = true;
+ boolean actUnreachable = false;
DummyHAService(HAServiceState state, InetSocketAddress address) {
this.state = state;
@@ -47,28 +49,41 @@ class DummyHAService extends HAServiceTa
@Override
public void monitorHealth() throws HealthCheckFailedException,
AccessControlException, IOException {
+ checkUnreachable();
+ if (!isHealthy) {
+ throw new HealthCheckFailedException("not healthy");
+ }
}
@Override
public void transitionToActive() throws ServiceFailedException,
AccessControlException, IOException {
+ checkUnreachable();
state = HAServiceState.ACTIVE;
}
@Override
public void transitionToStandby() throws ServiceFailedException,
AccessControlException, IOException {
+ checkUnreachable();
state = HAServiceState.STANDBY;
}
@Override
public HAServiceStatus getServiceStatus() throws IOException {
+ checkUnreachable();
HAServiceStatus ret = new HAServiceStatus(state);
if (state == HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
}
return ret;
}
+
+ private void checkUnreachable() throws IOException {
+ if (actUnreachable) {
+ throw new IOException("Connection refused (fake)");
+ }
+ }
});
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java?rev=1305199&r1=1305198&r2=1305199&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java Mon Mar 26 01:57:03 2012
@@ -20,41 +20,30 @@ package org.apache.hadoop.ha;
import static org.junit.Assert.*;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
-import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.HealthMonitor.Callback;
import org.apache.hadoop.ha.HealthMonitor.State;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
public class TestHealthMonitor {
private static final Log LOG = LogFactory.getLog(
TestHealthMonitor.class);
- /* bogus address to pass to constructor - never used */
- private static final InetSocketAddress BOGUS_ADDR =
- new InetSocketAddress(1);
-
- private HAServiceProtocol mockProxy;
-
/** How many times has createProxy been called */
- private volatile CountDownLatch createProxyLatch;
-
- /** Should throw an IOE when trying to connect */
- private volatile boolean shouldThrowOnCreateProxy = false;
+ private AtomicInteger createProxyCount = new AtomicInteger(0);
+ private volatile boolean throwOOMEOnCreate = false;
private HealthMonitor hm;
+
+ private DummyHAService svc;
@Before
public void setupHM() throws InterruptedException, IOException {
@@ -63,30 +52,21 @@ public class TestHealthMonitor {
conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
- mockProxy = Mockito.mock(HAServiceProtocol.class);
- Mockito.doReturn(new HAServiceStatus(HAServiceState.ACTIVE))
- .when(mockProxy).getServiceStatus();
- hm = new HealthMonitor(conf, BOGUS_ADDR) {
+ svc = new DummyHAService(HAServiceState.ACTIVE, null);
+ hm = new HealthMonitor(conf, svc) {
@Override
protected HAServiceProtocol createProxy() throws IOException {
- createProxyLatch.countDown();
- if (shouldThrowOnCreateProxy) {
- throw new IOException("can't connect");
+ createProxyCount.incrementAndGet();
+ if (throwOOMEOnCreate) {
+ throw new OutOfMemoryError("oome");
}
- return mockProxy;
+ return super.createProxy();
}
};
-
- createProxyLatch = new CountDownLatch(1);
-
LOG.info("Starting health monitor");
hm.start();
- LOG.info("Waiting for proxy to be created");
- assertTrue(createProxyLatch.await(2000, TimeUnit.MILLISECONDS));
- createProxyLatch = null;
-
LOG.info("Waiting for HEALTHY signal");
waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
}
@@ -94,27 +74,26 @@ public class TestHealthMonitor {
@Test(timeout=15000)
public void testMonitor() throws Exception {
LOG.info("Mocking bad health check, waiting for UNHEALTHY");
- Mockito.doThrow(new HealthCheckFailedException("Fake health check failure"))
- .when(mockProxy).monitorHealth();
+ svc.isHealthy = false;
waitForState(hm, HealthMonitor.State.SERVICE_UNHEALTHY);
LOG.info("Returning to healthy state, waiting for HEALTHY");
- Mockito.doNothing().when(mockProxy).monitorHealth();
+ svc.isHealthy = true;
waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
LOG.info("Returning an IOException, as if node went down");
// should expect many rapid retries
- createProxyLatch = new CountDownLatch(3);
- shouldThrowOnCreateProxy = true;
- Mockito.doThrow(new IOException("Connection lost (fake)"))
- .when(mockProxy).monitorHealth();
+ int countBefore = createProxyCount.get();
+ svc.actUnreachable = true;
waitForState(hm, HealthMonitor.State.SERVICE_NOT_RESPONDING);
- assertTrue("Monitor should retry if createProxy throws an IOE",
- createProxyLatch.await(1000, TimeUnit.MILLISECONDS));
+
+ // Should retry several times
+ while (createProxyCount.get() < countBefore + 3) {
+ Thread.sleep(10);
+ }
LOG.info("Returning to healthy state, waiting for HEALTHY");
- shouldThrowOnCreateProxy = false;
- Mockito.doNothing().when(mockProxy).monitorHealth();
+ svc.actUnreachable = false;
waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
hm.shutdown();
@@ -129,8 +108,8 @@ public class TestHealthMonitor {
@Test(timeout=15000)
public void testHealthMonitorDies() throws Exception {
LOG.info("Mocking RTE in health monitor, waiting for FAILED");
- Mockito.doThrow(new OutOfMemoryError())
- .when(mockProxy).monitorHealth();
+ throwOOMEOnCreate = true;
+ svc.actUnreachable = true;
waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
hm.shutdown();
hm.join();
@@ -151,8 +130,7 @@ public class TestHealthMonitor {
}
});
LOG.info("Mocking bad health check, waiting for UNHEALTHY");
- Mockito.doThrow(new HealthCheckFailedException("Fake health check failure"))
- .when(mockProxy).monitorHealth();
+ svc.isHealthy = false;
waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
}