You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/02/17 10:27:32 UTC
hadoop git commit: HADOOP-11000. HAServiceProtocol's health state is
incorrectly transitioned to SERVICE_NOT_RESPONDING (Contributed by Ming Ma)
Repository: hadoop
Updated Branches:
refs/heads/trunk 500e6a0f4 -> cf4b7f506
HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned to SERVICE_NOT_RESPONDING (Contributed by Ming Ma)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cf4b7f50
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cf4b7f50
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cf4b7f50
Branch: refs/heads/trunk
Commit: cf4b7f506dd338ecf2ed4c643b6a6a334e070fca
Parents: 500e6a0
Author: Vinayakumar B <vi...@apache.org>
Authored: Tue Feb 17 14:55:56 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Tue Feb 17 14:55:56 2015 +0530
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../org/apache/hadoop/ha/HealthMonitor.java | 35 ++++++----
.../org/apache/hadoop/ha/DummyHAService.java | 73 +++++++++++++++++---
.../org/apache/hadoop/ha/TestHealthMonitor.java | 4 +-
4 files changed, 94 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf4b7f50/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 522ec47..51305bb 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -934,6 +934,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11467. KerberosAuthenticator can connect to a non-secure cluster.
(yzhangal via rkanter)
+ HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned
+ to SERVICE_NOT_RESPONDING (Ming Ma via vinayakumarb)
+
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf4b7f50/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
index 0d14444..8c87629 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
@@ -30,6 +30,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.*;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HealthCheckFailedException;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Daemon;
@@ -201,18 +202,20 @@ public class HealthMonitor {
status = proxy.getServiceStatus();
proxy.monitorHealth();
healthy = true;
- } catch (HealthCheckFailedException e) {
- LOG.warn("Service health check failed for " + targetToMonitor
- + ": " + e.getMessage());
- enterState(State.SERVICE_UNHEALTHY);
} catch (Throwable t) {
- LOG.warn("Transport-level exception trying to monitor health of " +
- targetToMonitor + ": " + t.getLocalizedMessage());
- RPC.stopProxy(proxy);
- proxy = null;
- enterState(State.SERVICE_NOT_RESPONDING);
- Thread.sleep(sleepAfterDisconnectMillis);
- return;
+ if (isHealthCheckFailedException(t)) {
+ LOG.warn("Service health check failed for " + targetToMonitor
+ + ": " + t.getMessage());
+ enterState(State.SERVICE_UNHEALTHY);
+ } else {
+ LOG.warn("Transport-level exception trying to monitor health of " +
+ targetToMonitor + ": " + t.getCause() + " " + t.getLocalizedMessage());
+ RPC.stopProxy(proxy);
+ proxy = null;
+ enterState(State.SERVICE_NOT_RESPONDING);
+ Thread.sleep(sleepAfterDisconnectMillis);
+ return;
+ }
}
if (status != null) {
@@ -225,7 +228,15 @@ public class HealthMonitor {
Thread.sleep(checkIntervalMillis);
}
}
-
+
+ private boolean isHealthCheckFailedException(Throwable t) {
+ return ((t instanceof HealthCheckFailedException) ||
+ (t instanceof RemoteException &&
+ ((RemoteException)t).unwrapRemoteException(
+ HealthCheckFailedException.class) instanceof
+ HealthCheckFailedException));
+ }
+
private synchronized void setLastServiceStatus(HAServiceStatus status) {
this.lastServiceState = status;
for (ServiceStateCallback cb : serviceStateCallbacks) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf4b7f50/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
index e9189e2..aef6c4d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
@@ -22,15 +22,25 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import com.google.protobuf.BlockingService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_DEFAULT;
+
/**
* Test-only implementation of {@link HAServiceTarget}, which returns
* a mock implementation.
@@ -50,22 +60,33 @@ class DummyHAService extends HAServiceTarget {
DummySharedResource sharedResource;
public int fenceCount = 0;
public int activeTransitionCount = 0;
+ boolean testWithProtoBufRPC = false;
static ArrayList<DummyHAService> instances = Lists.newArrayList();
int index;
DummyHAService(HAServiceState state, InetSocketAddress address) {
+ this(state, address, false);
+ }
+
+ DummyHAService(HAServiceState state, InetSocketAddress address,
+ boolean testWithProtoBufRPC) {
this.state = state;
- this.proxy = makeMock();
+ this.testWithProtoBufRPC = testWithProtoBufRPC;
+ if (testWithProtoBufRPC) {
+ this.address = startAndGetRPCServerAddress(address);
+ } else {
+ this.address = address;
+ }
+ Configuration conf = new Configuration();
+ this.proxy = makeMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT);
try {
- Configuration conf = new Configuration();
- conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName());
+ conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName());
this.fencer = Mockito.spy(
NodeFencer.create(conf, DUMMY_FENCE_KEY));
} catch (BadFencingConfigurationException e) {
throw new RuntimeException(e);
}
- this.address = address;
synchronized (instances) {
instances.add(this);
this.index = instances.size();
@@ -75,9 +96,42 @@ class DummyHAService extends HAServiceTarget {
public void setSharedResource(DummySharedResource rsrc) {
this.sharedResource = rsrc;
}
-
- private HAServiceProtocol makeMock() {
- return Mockito.spy(new MockHAProtocolImpl());
+
+ private InetSocketAddress startAndGetRPCServerAddress(InetSocketAddress serverAddress) {
+ Configuration conf = new Configuration();
+
+ try {
+ RPC.setProtocolEngine(conf,
+ HAServiceProtocolPB.class, ProtobufRpcEngine.class);
+ HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
+ new HAServiceProtocolServerSideTranslatorPB(new MockHAProtocolImpl());
+ BlockingService haPbService = HAServiceProtocolService
+ .newReflectiveBlockingService(haServiceProtocolXlator);
+
+ Server server = new RPC.Builder(conf)
+ .setProtocol(HAServiceProtocolPB.class)
+ .setInstance(haPbService)
+ .setBindAddress(serverAddress.getHostName())
+ .setPort(serverAddress.getPort()).build();
+ server.start();
+ return NetUtils.getConnectAddress(server);
+ } catch (IOException e) {
+ return null;
+ }
+ }
+
+ private HAServiceProtocol makeMock(Configuration conf, int timeoutMs) {
+ HAServiceProtocol service;
+ if (!testWithProtoBufRPC) {
+ service = new MockHAProtocolImpl();
+ } else {
+ try {
+ service = super.getProxy(conf, timeoutMs);
+ } catch (IOException e) {
+ return null;
+ }
+ }
+ return Mockito.spy(service);
}
@Override
@@ -93,6 +147,9 @@ class DummyHAService extends HAServiceTarget {
@Override
public HAServiceProtocol getProxy(Configuration conf, int timeout)
throws IOException {
+ if (testWithProtoBufRPC) {
+ proxy = makeMock(conf, timeout);
+ }
return proxy;
}
@@ -168,7 +225,7 @@ class DummyHAService extends HAServiceTarget {
public HAServiceStatus getServiceStatus() throws IOException {
checkUnreachable();
HAServiceStatus ret = new HAServiceStatus(state);
- if (state == HAServiceState.STANDBY) {
+ if (state == HAServiceState.STANDBY || state == HAServiceState.ACTIVE) {
ret.setReadyToBecomeActive();
}
return ret;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf4b7f50/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
index db534de..b58793f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ha;
import static org.junit.Assert.*;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -54,7 +55,8 @@ public class TestHealthMonitor {
conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
- svc = new DummyHAService(HAServiceState.ACTIVE, null);
+ svc = new DummyHAService(HAServiceState.ACTIVE,
+ new InetSocketAddress("0.0.0.0", 0), true);
hm = new HealthMonitor(conf, svc) {
@Override
protected HAServiceProtocol createProxy() throws IOException {