You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/03/26 03:57:04 UTC

svn commit: r1305199 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ha/ src/test/java/org/apache/hadoop/ha/

Author: todd
Date: Mon Mar 26 01:57:03 2012
New Revision: 1305199

URL: http://svn.apache.org/viewvc?rev=1305199&view=rev
Log:
HADOOP-8204. TestHealthMonitor fails occasionally. Contributed by Todd Lipcon.

Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1305199&r1=1305198&r2=1305199&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Mon Mar 26 01:57:03 2012
@@ -279,6 +279,8 @@ Release 0.23.3 - UNRELEASED 
     HADOOP-8159. NetworkTopology: getLeaf should check for invalid topologies.
     (Colin Patrick McCabe via eli)
 
+    HADOOP-8204. TestHealthMonitor fails occasionally (todd)
+
   BREAKDOWN OF HADOOP-7454 SUBTASKS
 
     HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh)

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java?rev=1305199&r1=1305198&r2=1305199&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java Mon Mar 26 01:57:03 2012
@@ -18,13 +18,10 @@
 package org.apache.hadoop.ha;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
-import javax.net.SocketFactory;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -32,9 +29,7 @@ import static org.apache.hadoop.fs.Commo
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.HealthCheckFailedException;
-import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Daemon;
 
 import com.google.common.base.Preconditions;
@@ -64,8 +59,8 @@ class HealthMonitor {
   /** The connected proxy */
   private HAServiceProtocol proxy;
 
-  /** The address running the HA Service */
-  private final InetSocketAddress addrToMonitor;
+  /** The HA service to monitor */
+  private final HAServiceTarget targetToMonitor;
 
   private final Configuration conf;
   
@@ -109,9 +104,9 @@ class HealthMonitor {
   }
 
 
-  HealthMonitor(Configuration conf, InetSocketAddress addrToMonitor) {
+  HealthMonitor(Configuration conf, HAServiceTarget target) {
+    this.targetToMonitor = target;
     this.conf = conf;
-    this.addrToMonitor = addrToMonitor;
     
     this.sleepAfterDisconnectMillis = conf.getLong(
         HA_HM_SLEEP_AFTER_DISCONNECT_KEY,
@@ -170,7 +165,7 @@ class HealthMonitor {
         proxy = createProxy();
       }
     } catch (IOException e) {
-      LOG.warn("Could not connect to local service at " + addrToMonitor +
+      LOG.warn("Could not connect to local service at " + targetToMonitor +
           ": " + e.getMessage());
       proxy = null;
       enterState(State.SERVICE_NOT_RESPONDING);
@@ -181,10 +176,7 @@ class HealthMonitor {
    * Connect to the service to be monitored. Stubbed out for easier testing.
    */
   protected HAServiceProtocol createProxy() throws IOException {
-    SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(conf);
-    return new HAServiceProtocolClientSideTranslatorPB(
-        addrToMonitor,
-        conf, socketFactory, rpcTimeout);
+    return targetToMonitor.getProxy(conf, rpcTimeout);
   }
 
   private void doHealthChecks() throws InterruptedException {
@@ -200,7 +192,7 @@ class HealthMonitor {
         enterState(State.SERVICE_UNHEALTHY);
       } catch (Throwable t) {
         LOG.warn("Transport-level exception trying to monitor health of " +
-            addrToMonitor + ": " + t.getLocalizedMessage());
+            targetToMonitor + ": " + t.getLocalizedMessage());
         RPC.stopProxy(proxy);
         proxy = null;
         enterState(State.SERVICE_NOT_RESPONDING);
@@ -258,7 +250,7 @@ class HealthMonitor {
   private class MonitorDaemon extends Daemon {
     private MonitorDaemon() {
       super();
-      setName("Health Monitor for " + addrToMonitor);
+      setName("Health Monitor for " + targetToMonitor);
       setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
         @Override
         public void uncaughtException(Thread t, Throwable e) {
@@ -297,24 +289,4 @@ class HealthMonitor {
   static interface Callback {
     void enteredState(State newState);
   }
-
-  /**
-   * Simple main() for testing.
-   */
-  public static void main(String[] args) throws InterruptedException {
-    if (args.length != 1) {
-      System.err.println("Usage: " + HealthMonitor.class.getName() +
-          " <addr to monitor>");
-      System.exit(1);
-    }
-    Configuration conf = new Configuration();
-    
-    String target = args[0];
-    InetSocketAddress addr = NetUtils.createSocketAddr(target);
-    
-    HealthMonitor hm = new HealthMonitor(conf, addr);
-    hm.start();
-    hm.join();
-  }
-
 }

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java?rev=1305199&r1=1305198&r2=1305199&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java Mon Mar 26 01:57:03 2012
@@ -30,10 +30,12 @@ import org.mockito.Mockito;
  * a mock implementation.
  */
 class DummyHAService extends HAServiceTarget {
-  HAServiceState state;
+  volatile HAServiceState state;
   HAServiceProtocol proxy;
   NodeFencer fencer;
   InetSocketAddress address;
+  boolean isHealthy = true;
+  boolean actUnreachable = false;
 
   DummyHAService(HAServiceState state, InetSocketAddress address) {
     this.state = state;
@@ -47,28 +49,41 @@ class DummyHAService extends HAServiceTa
       @Override
       public void monitorHealth() throws HealthCheckFailedException,
           AccessControlException, IOException {
+        checkUnreachable();
+        if (!isHealthy) {
+          throw new HealthCheckFailedException("not healthy");
+        }
       }
 
       @Override
       public void transitionToActive() throws ServiceFailedException,
           AccessControlException, IOException {
+        checkUnreachable();
         state = HAServiceState.ACTIVE;
       }
 
       @Override
       public void transitionToStandby() throws ServiceFailedException,
           AccessControlException, IOException {
+        checkUnreachable();
         state = HAServiceState.STANDBY;
       }
 
       @Override
       public HAServiceStatus getServiceStatus() throws IOException {
+        checkUnreachable();
         HAServiceStatus ret = new HAServiceStatus(state);
         if (state == HAServiceState.STANDBY) {
           ret.setReadyToBecomeActive();
         }
         return ret;
       }
+
+      private void checkUnreachable() throws IOException {
+        if (actUnreachable) {
+          throw new IOException("Connection refused (fake)");
+        }
+      }
     });
   }
 

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java?rev=1305199&r1=1305198&r2=1305199&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java Mon Mar 26 01:57:03 2012
@@ -20,41 +20,30 @@ package org.apache.hadoop.ha;
 import static org.junit.Assert.*;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
-import org.apache.hadoop.ha.HealthCheckFailedException;
 import org.apache.hadoop.ha.HealthMonitor.Callback;
 import org.apache.hadoop.ha.HealthMonitor.State;
 
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 public class TestHealthMonitor {
   private static final Log LOG = LogFactory.getLog(
       TestHealthMonitor.class);
   
-  /* bogus address to pass to constructor - never used */
-  private static final InetSocketAddress BOGUS_ADDR =
-    new InetSocketAddress(1);
-
-  private HAServiceProtocol mockProxy;
-
   /** How many times has createProxy been called */
-  private volatile CountDownLatch createProxyLatch;
-
-  /** Should throw an IOE when trying to connect */
-  private volatile boolean shouldThrowOnCreateProxy = false;
+  private AtomicInteger createProxyCount = new AtomicInteger(0);
+  private volatile boolean throwOOMEOnCreate = false;
 
   private HealthMonitor hm;
+
+  private DummyHAService svc;
   
   @Before
   public void setupHM() throws InterruptedException, IOException {
@@ -63,30 +52,21 @@ public class TestHealthMonitor {
     conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
     conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
     conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
-    mockProxy = Mockito.mock(HAServiceProtocol.class);
-    Mockito.doReturn(new HAServiceStatus(HAServiceState.ACTIVE))
-      .when(mockProxy).getServiceStatus();
     
-    hm = new HealthMonitor(conf, BOGUS_ADDR) {
+    svc = new DummyHAService(HAServiceState.ACTIVE, null);
+    hm = new HealthMonitor(conf, svc) {
       @Override
       protected HAServiceProtocol createProxy() throws IOException {
-        createProxyLatch.countDown();
-        if (shouldThrowOnCreateProxy) {
-          throw new IOException("can't connect");
+        createProxyCount.incrementAndGet();
+        if (throwOOMEOnCreate) {
+          throw new OutOfMemoryError("oome");
         }
-        return mockProxy;
+        return super.createProxy();
       }
     };
-
-    createProxyLatch = new CountDownLatch(1);
-
     LOG.info("Starting health monitor");
     hm.start();
     
-    LOG.info("Waiting for proxy to be created");
-    assertTrue(createProxyLatch.await(2000, TimeUnit.MILLISECONDS));
-    createProxyLatch = null;
-    
     LOG.info("Waiting for HEALTHY signal");    
     waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
   }
@@ -94,27 +74,26 @@ public class TestHealthMonitor {
   @Test(timeout=15000)
   public void testMonitor() throws Exception {
     LOG.info("Mocking bad health check, waiting for UNHEALTHY");
-    Mockito.doThrow(new HealthCheckFailedException("Fake health check failure"))
-      .when(mockProxy).monitorHealth();
+    svc.isHealthy = false;
     waitForState(hm, HealthMonitor.State.SERVICE_UNHEALTHY);
     
     LOG.info("Returning to healthy state, waiting for HEALTHY");
-    Mockito.doNothing().when(mockProxy).monitorHealth();
+    svc.isHealthy = true;
     waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
 
     LOG.info("Returning an IOException, as if node went down");
     // should expect many rapid retries
-    createProxyLatch = new CountDownLatch(3);
-    shouldThrowOnCreateProxy = true;
-    Mockito.doThrow(new IOException("Connection lost (fake)"))
-      .when(mockProxy).monitorHealth();
+    int countBefore = createProxyCount.get();
+    svc.actUnreachable = true;
     waitForState(hm, HealthMonitor.State.SERVICE_NOT_RESPONDING);
-    assertTrue("Monitor should retry if createProxy throws an IOE",
-        createProxyLatch.await(1000, TimeUnit.MILLISECONDS));
+
+    // Should retry several times
+    while (createProxyCount.get() < countBefore + 3) {
+      Thread.sleep(10);
+    }
     
     LOG.info("Returning to healthy state, waiting for HEALTHY");
-    shouldThrowOnCreateProxy = false;
-    Mockito.doNothing().when(mockProxy).monitorHealth();
+    svc.actUnreachable = false;
     waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
     
     hm.shutdown();
@@ -129,8 +108,8 @@ public class TestHealthMonitor {
   @Test(timeout=15000)
   public void testHealthMonitorDies() throws Exception {
     LOG.info("Mocking RTE in health monitor, waiting for FAILED");
-    Mockito.doThrow(new OutOfMemoryError())
-      .when(mockProxy).monitorHealth();
+    throwOOMEOnCreate = true;
+    svc.actUnreachable = true;
     waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
     hm.shutdown();
     hm.join();
@@ -151,8 +130,7 @@ public class TestHealthMonitor {
       }
     });
     LOG.info("Mocking bad health check, waiting for UNHEALTHY");
-    Mockito.doThrow(new HealthCheckFailedException("Fake health check failure"))
-      .when(mockProxy).monitorHealth();
+    svc.isHealthy = false;
     waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
   }