You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/02/17 10:27:32 UTC

hadoop git commit: HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned to SERVICE_NOT_RESPONDING (Contributed by Ming Ma)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 500e6a0f4 -> cf4b7f506


HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned to SERVICE_NOT_RESPONDING (Contributed by Ming Ma)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cf4b7f50
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cf4b7f50
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cf4b7f50

Branch: refs/heads/trunk
Commit: cf4b7f506dd338ecf2ed4c643b6a6a334e070fca
Parents: 500e6a0
Author: Vinayakumar B <vi...@apache.org>
Authored: Tue Feb 17 14:55:56 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Tue Feb 17 14:55:56 2015 +0530

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 +
 .../org/apache/hadoop/ha/HealthMonitor.java     | 35 ++++++----
 .../org/apache/hadoop/ha/DummyHAService.java    | 73 +++++++++++++++++---
 .../org/apache/hadoop/ha/TestHealthMonitor.java |  4 +-
 4 files changed, 94 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf4b7f50/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 522ec47..51305bb 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -934,6 +934,9 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11467. KerberosAuthenticator can connect to a non-secure cluster.
     (yzhangal via rkanter)
 
+    HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned
+    to SERVICE_NOT_RESPONDING (Ming Ma via vinayakumarb)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf4b7f50/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
index 0d14444..8c87629 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
@@ -30,6 +30,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.*;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.HealthCheckFailedException;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.util.Daemon;
 
@@ -201,18 +202,20 @@ public class HealthMonitor {
         status = proxy.getServiceStatus();
         proxy.monitorHealth();
         healthy = true;
-      } catch (HealthCheckFailedException e) {
-        LOG.warn("Service health check failed for " + targetToMonitor
-            + ": " + e.getMessage());
-        enterState(State.SERVICE_UNHEALTHY);
       } catch (Throwable t) {
-        LOG.warn("Transport-level exception trying to monitor health of " +
-            targetToMonitor + ": " + t.getLocalizedMessage());
-        RPC.stopProxy(proxy);
-        proxy = null;
-        enterState(State.SERVICE_NOT_RESPONDING);
-        Thread.sleep(sleepAfterDisconnectMillis);
-        return;
+        if (isHealthCheckFailedException(t)) {
+          LOG.warn("Service health check failed for " + targetToMonitor
+              + ": " + t.getMessage());
+          enterState(State.SERVICE_UNHEALTHY);
+        } else {
+          LOG.warn("Transport-level exception trying to monitor health of " +
+              targetToMonitor + ": " + t.getCause() + " " + t.getLocalizedMessage());
+          RPC.stopProxy(proxy);
+          proxy = null;
+          enterState(State.SERVICE_NOT_RESPONDING);
+          Thread.sleep(sleepAfterDisconnectMillis);
+          return;
+        }
       }
       
       if (status != null) {
@@ -225,7 +228,15 @@ public class HealthMonitor {
       Thread.sleep(checkIntervalMillis);
     }
   }
-  
+
+  private boolean isHealthCheckFailedException(Throwable t) {
+    return ((t instanceof HealthCheckFailedException) ||
+        (t instanceof RemoteException &&
+        ((RemoteException)t).unwrapRemoteException(
+            HealthCheckFailedException.class) instanceof
+            HealthCheckFailedException));
+  }
+
   private synchronized void setLastServiceStatus(HAServiceStatus status) {
     this.lastServiceState = status;
     for (ServiceStateCallback cb : serviceStateCallbacks) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf4b7f50/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
index e9189e2..aef6c4d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
@@ -22,15 +22,25 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 
+import com.google.protobuf.BlockingService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.mockito.Mockito;
 
 import com.google.common.collect.Lists;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_DEFAULT;
+
 /**
  * Test-only implementation of {@link HAServiceTarget}, which returns
  * a mock implementation.
@@ -50,22 +60,33 @@ class DummyHAService extends HAServiceTarget {
   DummySharedResource sharedResource;
   public int fenceCount = 0;
   public int activeTransitionCount = 0;
+  boolean testWithProtoBufRPC = false;
   
   static ArrayList<DummyHAService> instances = Lists.newArrayList();
   int index;
 
   DummyHAService(HAServiceState state, InetSocketAddress address) {
+    this(state, address, false);
+  }
+
+  DummyHAService(HAServiceState state, InetSocketAddress address,
+      boolean testWithProtoBufRPC) {
     this.state = state;
-    this.proxy = makeMock();
+    this.testWithProtoBufRPC = testWithProtoBufRPC;
+    if (testWithProtoBufRPC) {
+      this.address = startAndGetRPCServerAddress(address);
+    } else {
+      this.address = address;
+    }
+    Configuration conf = new Configuration();
+    this.proxy = makeMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT);
     try {
-      Configuration conf = new Configuration();
-      conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName()); 
+      conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName());
       this.fencer = Mockito.spy(
           NodeFencer.create(conf, DUMMY_FENCE_KEY));
     } catch (BadFencingConfigurationException e) {
       throw new RuntimeException(e);
     }
-    this.address = address;
     synchronized (instances) {
       instances.add(this);
       this.index = instances.size();
@@ -75,9 +96,42 @@ class DummyHAService extends HAServiceTarget {
   public void setSharedResource(DummySharedResource rsrc) {
     this.sharedResource = rsrc;
   }
-  
-  private HAServiceProtocol makeMock() {
-    return Mockito.spy(new MockHAProtocolImpl());
+
+  private InetSocketAddress startAndGetRPCServerAddress(InetSocketAddress serverAddress) {
+    Configuration conf = new Configuration();
+
+    try {
+      RPC.setProtocolEngine(conf,
+          HAServiceProtocolPB.class, ProtobufRpcEngine.class);
+      HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
+          new HAServiceProtocolServerSideTranslatorPB(new MockHAProtocolImpl());
+      BlockingService haPbService = HAServiceProtocolService
+          .newReflectiveBlockingService(haServiceProtocolXlator);
+
+      Server server = new RPC.Builder(conf)
+          .setProtocol(HAServiceProtocolPB.class)
+          .setInstance(haPbService)
+          .setBindAddress(serverAddress.getHostName())
+          .setPort(serverAddress.getPort()).build();
+      server.start();
+      return NetUtils.getConnectAddress(server);
+    } catch (IOException e) {
+      return null;
+    }
+  }
+
+  private HAServiceProtocol makeMock(Configuration conf, int timeoutMs) {
+    HAServiceProtocol service;
+    if (!testWithProtoBufRPC) {
+      service = new MockHAProtocolImpl();
+    } else {
+      try {
+        service = super.getProxy(conf, timeoutMs);
+      } catch (IOException e) {
+        return null;
+      }
+    }
+    return Mockito.spy(service);
   }
 
   @Override
@@ -93,6 +147,9 @@ class DummyHAService extends HAServiceTarget {
   @Override
   public HAServiceProtocol getProxy(Configuration conf, int timeout)
       throws IOException {
+    if (testWithProtoBufRPC) {
+      proxy = makeMock(conf, timeout);
+    }
     return proxy;
   }
   
@@ -168,7 +225,7 @@ class DummyHAService extends HAServiceTarget {
     public HAServiceStatus getServiceStatus() throws IOException {
       checkUnreachable();
       HAServiceStatus ret = new HAServiceStatus(state);
-      if (state == HAServiceState.STANDBY) {
+      if (state == HAServiceState.STANDBY || state == HAServiceState.ACTIVE) {
         ret.setReadyToBecomeActive();
       }
       return ret;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf4b7f50/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
index db534de..b58793f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ha;
 import static org.junit.Assert.*;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -54,7 +55,8 @@ public class TestHealthMonitor {
     conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
     conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
     
-    svc = new DummyHAService(HAServiceState.ACTIVE, null);
+    svc = new DummyHAService(HAServiceState.ACTIVE,
+        new InetSocketAddress("0.0.0.0", 0), true);
     hm = new HealthMonitor(conf, svc) {
       @Override
       protected HAServiceProtocol createProxy() throws IOException {