You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by km...@apache.org on 2014/09/03 01:45:57 UTC
git commit: WebHdfs HA loses original error after failover/retry limit is reached
Repository: knox
Updated Branches:
refs/heads/master 02d9b39d7 -> f0af8bd6c
WebHdfs HA loses original error after failover/retry limit is reached
Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/f0af8bd6
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/f0af8bd6
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/f0af8bd6
Branch: refs/heads/master
Commit: f0af8bd6ca966319a234c83a85272555545442cb
Parents: 02d9b39
Author: Kevin Minder <ke...@hortonworks.com>
Authored: Tue Sep 2 19:45:55 2014 -0400
Committer: Kevin Minder <ke...@hortonworks.com>
Committed: Tue Sep 2 19:45:55 2014 -0400
----------------------------------------------------------------------
.../dispatch/WebHdfsHaHttpClientDispatch.java | 26 ++--
.../WebHdfsHaHttpClientDispatchTest.java | 6 +-
.../hadoop/gateway/WebHdfsHaFuncTest.java | 130 +++++++++++++++++++
3 files changed, 153 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/knox/blob/f0af8bd6/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java
index a7ab1cb..e97e8c9 100644
--- a/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java
+++ b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java
@@ -73,20 +73,20 @@ public class WebHdfsHaHttpClientDispatch extends HttpClientDispatch {
}
@Override
- protected void executeRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) {
+ protected void executeRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws IOException {
HttpResponse inboundResponse = null;
try {
inboundResponse = executeOutboundRequest(outboundRequest);
writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
} catch (StandbyException e) {
LOG.errorReceivedFromStandbyNode(e);
- failoverRequest(outboundRequest, inboundRequest, outboundResponse);
+ failoverRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e);
} catch (SafeModeException e) {
LOG.errorReceivedFromSafeModeNode(e);
- retryRequest(outboundRequest, inboundRequest, outboundResponse);
+ retryRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e);
} catch (IOException e) {
LOG.errorConnectingToServer(outboundRequest.getURI().toString(), e);
- failoverRequest(outboundRequest, inboundRequest, outboundResponse);
+ failoverRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e);
}
}
@@ -111,11 +111,11 @@ public class WebHdfsHaHttpClientDispatch extends HttpClientDispatch {
super.writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
}
- private void failoverRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) {
+ private void failoverRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse, Exception exception) throws IOException {
LOG.failingOverRequest(outboundRequest.getURI().toString());
AtomicInteger counter = (AtomicInteger) inboundRequest.getAttribute(FAILOVER_COUNTER_ATTRIBUTE);
if (counter == null) {
- counter = new AtomicInteger(1);
+ counter = new AtomicInteger(0);
}
inboundRequest.setAttribute(FAILOVER_COUNTER_ATTRIBUTE, counter);
if (counter.incrementAndGet() <= maxFailoverAttempts) {
@@ -134,14 +134,19 @@ public class WebHdfsHaHttpClientDispatch extends HttpClientDispatch {
executeRequest(outboundRequest, inboundRequest, outboundResponse);
} else {
LOG.maxFailoverAttemptsReached(maxFailoverAttempts, resourceRole);
+ if (inboundResponse != null) {
+ writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
+ } else {
+ throw new IOException(exception);
+ }
}
}
- private void retryRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) {
+ private void retryRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse, Exception exception) throws IOException {
LOG.retryingRequest(outboundRequest.getURI().toString());
AtomicInteger counter = (AtomicInteger) inboundRequest.getAttribute(RETRY_COUNTER_ATTRIBUTE);
if (counter == null) {
- counter = new AtomicInteger(1);
+ counter = new AtomicInteger(0);
}
inboundRequest.setAttribute(RETRY_COUNTER_ATTRIBUTE, counter);
if (counter.incrementAndGet() <= maxRetryAttempts) {
@@ -155,6 +160,11 @@ public class WebHdfsHaHttpClientDispatch extends HttpClientDispatch {
executeRequest(outboundRequest, inboundRequest, outboundResponse);
} else {
LOG.maxRetryAttemptsReached(maxRetryAttempts, resourceRole, outboundRequest.getURI().toString());
+ if (inboundResponse != null) {
+ writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
+ } else {
+ throw new IOException(exception);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/knox/blob/f0af8bd6/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java
----------------------------------------------------------------------
diff --git a/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java b/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java
index a9aa5a6..e64f7c7 100644
--- a/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java
+++ b/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java
@@ -90,7 +90,11 @@ public class WebHdfsHaHttpClientDispatchTest {
WebHdfsHaHttpClientDispatch dispatch = new WebHdfsHaHttpClientDispatch();
dispatch.init(filterConfig);
long startTime = System.currentTimeMillis();
- dispatch.executeRequest(outboundRequest, inboundRequest, outboundResponse);
+ try {
+ dispatch.executeRequest(outboundRequest, inboundRequest, outboundResponse);
+ } catch (IOException e) {
+ //this is expected after the failover limit is reached
+ }
long elapsedTime = System.currentTimeMillis() - startTime;
Assert.assertEquals(uri2.toString(), provider.getActiveURL(serviceName));
//test to make sure the sleep took place
http://git-wip-us.apache.org/repos/asf/knox/blob/f0af8bd6/gateway-test/src/test/java/org/apache/hadoop/gateway/WebHdfsHaFuncTest.java
----------------------------------------------------------------------
diff --git a/gateway-test/src/test/java/org/apache/hadoop/gateway/WebHdfsHaFuncTest.java b/gateway-test/src/test/java/org/apache/hadoop/gateway/WebHdfsHaFuncTest.java
index 3d0a55d..332fa80 100644
--- a/gateway-test/src/test/java/org/apache/hadoop/gateway/WebHdfsHaFuncTest.java
+++ b/gateway-test/src/test/java/org/apache/hadoop/gateway/WebHdfsHaFuncTest.java
@@ -210,6 +210,26 @@ public class WebHdfsHaFuncTest {
}
@Test
+ public void testFailoverLimit() throws Exception {
+ String username = "hdfs";
+ String password = "hdfs-password";
+ //Shutdown both master and standby and expect an error response once the failover limit is reached
+ masterServer.stop();
+ standbyServer.stop();
+ given()
+ .auth().preemptive().basic(username, password)
+ .header("X-XSRF-Header", "jksdhfkhdsf")
+ .queryParam("op", "LISTSTATUS")
+ .expect()
+// .log().ifError()
+ .statusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR)
+ .when().get(driver.getUrl("WEBHDFS") + "/v1/");
+ standbyServer.start();
+ masterServer.start();
+ }
+
+
+ @Test
public void testServerInStandby() throws IOException {
String username = "hdfs";
String password = "hdfs-password";
@@ -247,6 +267,59 @@ public class WebHdfsHaFuncTest {
}
@Test
+ public void testServerInStandbyFailoverLimit() throws IOException {
+ String username = "hdfs";
+ String password = "hdfs-password";
+ //make master the server that is in standby
+ masterServer.expect()
+ .method("GET")
+ .pathInfo("/webhdfs/v1/")
+ .queryParam("op", "LISTSTATUS")
+ .queryParam("user.name", username)
+ .respond()
+ .status(HttpStatus.SC_FORBIDDEN)
+ .content(driver.getResourceBytes("webhdfs-liststatus-standby.json"))
+ .contentType("application/json");
+ standbyServer.expect()
+ .method("GET")
+ .pathInfo("/webhdfs/v1/")
+ .queryParam("op", "LISTSTATUS")
+ .queryParam("user.name", username)
+ .respond()
+ .status(HttpStatus.SC_FORBIDDEN)
+ .content(driver.getResourceBytes("webhdfs-liststatus-standby.json"))
+ .contentType("application/json");
+ masterServer.expect()
+ .method("GET")
+ .pathInfo("/webhdfs/v1/")
+ .queryParam("op", "LISTSTATUS")
+ .queryParam("user.name", username)
+ .respond()
+ .status(HttpStatus.SC_FORBIDDEN)
+ .content(driver.getResourceBytes("webhdfs-liststatus-standby.json"))
+ .contentType("application/json");
+ standbyServer.expect()
+ .method("GET")
+ .pathInfo("/webhdfs/v1/")
+ .queryParam("op", "LISTSTATUS")
+ .queryParam("user.name", username)
+ .respond()
+ .status(HttpStatus.SC_FORBIDDEN)
+ .content(driver.getResourceBytes("webhdfs-liststatus-standby.json"))
+ .contentType("application/json");
+ given()
+ .auth().preemptive().basic(username, password)
+ .header("X-XSRF-Header", "jksdhfkhdsf")
+ .queryParam("op", "LISTSTATUS")
+ .expect()
+// .log().ifError()
+ .statusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR)
+ .when().get(driver.getUrl("WEBHDFS") + "/v1/");
+ masterServer.isEmpty();
+ standbyServer.isEmpty();
+ }
+
+ @Test
public void testServerInSafeMode() throws IOException {
String username = "hdfs";
String password = "hdfs-password";
@@ -283,4 +356,61 @@ public class WebHdfsHaFuncTest {
.when().post(driver.getUrl("WEBHDFS") + "/v1/user/hdfs/foo.txt");
masterServer.isEmpty();
}
+
+ @Test
+ public void testServerInSafeModeRetryLimit() throws IOException {
+ String username = "hdfs";
+ String password = "hdfs-password";
+ //master is in safe mode
+ masterServer.expect()
+ .method("POST")
+ .pathInfo("/webhdfs/v1/user/hdfs/foo.txt")
+ .queryParam("op", "RENAME")
+ .queryParam("destination", "/user/hdfs/foo.txt")
+ .queryParam("user.name", username)
+ .respond()
+ .status(HttpStatus.SC_FORBIDDEN)
+ .content(driver.getResourceBytes("webhdfs-rename-safemode.json"))
+ .contentType("application/json");
+ masterServer.expect()
+ .method("POST")
+ .pathInfo("/webhdfs/v1/user/hdfs/foo.txt")
+ .queryParam("op", "RENAME")
+ .queryParam("destination", "/user/hdfs/foo.txt")
+ .queryParam("user.name", username)
+ .respond()
+ .status(HttpStatus.SC_FORBIDDEN)
+ .content(driver.getResourceBytes("webhdfs-rename-safemode.json"))
+ .contentType("application/json");
+ masterServer.expect()
+ .method("POST")
+ .pathInfo("/webhdfs/v1/user/hdfs/foo.txt")
+ .queryParam("op", "RENAME")
+ .queryParam("destination", "/user/hdfs/foo.txt")
+ .queryParam("user.name", username)
+ .respond()
+ .status(HttpStatus.SC_FORBIDDEN)
+ .content(driver.getResourceBytes("webhdfs-rename-safemode.json"))
+ .contentType("application/json");
+ masterServer.expect()
+ .method("POST")
+ .pathInfo("/webhdfs/v1/user/hdfs/foo.txt")
+ .queryParam("op", "RENAME")
+ .queryParam("destination", "/user/hdfs/foo.txt")
+ .queryParam("user.name", username)
+ .respond()
+ .status(HttpStatus.SC_FORBIDDEN)
+ .content(driver.getResourceBytes("webhdfs-rename-safemode.json"))
+ .contentType("application/json");
+ given()
+ .auth().preemptive().basic(username, password)
+ .header("X-XSRF-Header", "jksdhfkhdsf")
+ .queryParam("op", "RENAME")
+ .queryParam("destination", "/user/hdfs/foo.txt")
+ .expect()
+// .log().ifError()
+ .statusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR)
+ .when().post(driver.getUrl("WEBHDFS") + "/v1/user/hdfs/foo.txt");
+ masterServer.isEmpty();
+ }
}