Posted to commits@knox.apache.org by km...@apache.org on 2014/09/03 01:45:57 UTC

git commit: WebHdfs HA loses original error after failover/retry limit is reached

Repository: knox
Updated Branches:
  refs/heads/master 02d9b39d7 -> f0af8bd6c


WebHdfs HA loses original error after failover/retry limit is reached
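
Before this change, once the failover or retry limit was exhausted the dispatch
only logged the condition and returned, so the client received an empty response
and the original error was lost. The dispatch now carries the last inbound
response and the triggering exception through failoverRequest/retryRequest: when
the limit is reached it writes that response back to the client if one exists,
and otherwise rethrows the error wrapped in an IOException. The failover and
retry counters also now start at 0 instead of 1, so the configured maximum
number of attempts is honored exactly.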


Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/f0af8bd6
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/f0af8bd6
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/f0af8bd6

Branch: refs/heads/master
Commit: f0af8bd6ca966319a234c83a85272555545442cb
Parents: 02d9b39
Author: Kevin Minder <ke...@hortonworks.com>
Authored: Tue Sep 2 19:45:55 2014 -0400
Committer: Kevin Minder <ke...@hortonworks.com>
Committed: Tue Sep 2 19:45:55 2014 -0400

----------------------------------------------------------------------
 .../dispatch/WebHdfsHaHttpClientDispatch.java   |  26 ++--
 .../WebHdfsHaHttpClientDispatchTest.java        |   6 +-
 .../hadoop/gateway/WebHdfsHaFuncTest.java       | 130 +++++++++++++++++++
 3 files changed, 153 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/knox/blob/f0af8bd6/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java
index a7ab1cb..e97e8c9 100644
--- a/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java
+++ b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java
@@ -73,20 +73,20 @@ public class WebHdfsHaHttpClientDispatch extends HttpClientDispatch {
    }
 
    @Override
-   protected void executeRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) {
+   protected void executeRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws IOException {
       HttpResponse inboundResponse = null;
       try {
          inboundResponse = executeOutboundRequest(outboundRequest);
          writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
       } catch (StandbyException e) {
          LOG.errorReceivedFromStandbyNode(e);
-         failoverRequest(outboundRequest, inboundRequest, outboundResponse);
+         failoverRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e);
       } catch (SafeModeException e) {
          LOG.errorReceivedFromSafeModeNode(e);
-         retryRequest(outboundRequest, inboundRequest, outboundResponse);
+         retryRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e);
       } catch (IOException e) {
          LOG.errorConnectingToServer(outboundRequest.getURI().toString(), e);
-         failoverRequest(outboundRequest, inboundRequest, outboundResponse);
+         failoverRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e);
       }
    }
 
@@ -111,11 +111,11 @@ public class WebHdfsHaHttpClientDispatch extends HttpClientDispatch {
       super.writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
    }
 
-   private void failoverRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) {
+   private void failoverRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse, Exception exception) throws IOException {
       LOG.failingOverRequest(outboundRequest.getURI().toString());
       AtomicInteger counter = (AtomicInteger) inboundRequest.getAttribute(FAILOVER_COUNTER_ATTRIBUTE);
       if (counter == null) {
-         counter = new AtomicInteger(1);
+         counter = new AtomicInteger(0);
       }
       inboundRequest.setAttribute(FAILOVER_COUNTER_ATTRIBUTE, counter);
       if (counter.incrementAndGet() <= maxFailoverAttempts) {
@@ -134,14 +134,19 @@ public class WebHdfsHaHttpClientDispatch extends HttpClientDispatch {
          executeRequest(outboundRequest, inboundRequest, outboundResponse);
       } else {
          LOG.maxFailoverAttemptsReached(maxFailoverAttempts, resourceRole);
+         if (inboundResponse != null) {
+            writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
+         } else {
+            throw new IOException(exception);
+         }
       }
    }
 
-   private void retryRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) {
+   private void retryRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse, Exception exception) throws IOException {
       LOG.retryingRequest(outboundRequest.getURI().toString());
       AtomicInteger counter = (AtomicInteger) inboundRequest.getAttribute(RETRY_COUNTER_ATTRIBUTE);
       if (counter == null) {
-         counter = new AtomicInteger(1);
+         counter = new AtomicInteger(0);
       }
       inboundRequest.setAttribute(RETRY_COUNTER_ATTRIBUTE, counter);
       if (counter.incrementAndGet() <= maxRetryAttempts) {
@@ -155,6 +160,11 @@ public class WebHdfsHaHttpClientDispatch extends HttpClientDispatch {
          executeRequest(outboundRequest, inboundRequest, outboundResponse);
       } else {
          LOG.maxRetryAttemptsReached(maxRetryAttempts, resourceRole, outboundRequest.getURI().toString());
+         if (inboundResponse != null) {
+            writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
+         } else {
+            throw new IOException(exception);
+         }
       }
    }
 }
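
For readers skimming the patch, here is a minimal, self-contained sketch of the
pattern the dispatch now follows; the names are illustrative stand-ins, not Knox
APIs. It shows both halves of the fix: the counter starts at 0 so
incrementAndGet() yields 1..MAX for the allowed attempts (starting at 1, as
before, silently cost one attempt), and when the limit is hit the last failure
is rethrown instead of swallowed.

   import java.io.IOException;
   import java.util.concurrent.atomic.AtomicInteger;

   public class BoundedFailoverSketch {

      static final int MAX_FAILOVER_ATTEMPTS = 3;

      // Stand-in for an outbound request that always fails to connect.
      static void attempt() throws IOException {
         throw new IOException("connection refused");
      }

      static void execute(AtomicInteger counter) throws IOException {
         try {
            attempt();
         } catch (IOException e) {
            if (counter.incrementAndGet() <= MAX_FAILOVER_ATTEMPTS) {
               execute(counter); // fail over and try the next URL
            } else {
               // Limit reached: surface the last error to the caller
               // instead of returning an empty response.
               throw new IOException(e);
            }
         }
      }

      public static void main(String[] args) {
         try {
            execute(new AtomicInteger(0));
         } catch (IOException e) {
            System.out.println("surfaced after failover: " + e.getCause().getMessage());
         }
      }
   }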

http://git-wip-us.apache.org/repos/asf/knox/blob/f0af8bd6/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java
----------------------------------------------------------------------
diff --git a/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java b/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java
index a9aa5a6..e64f7c7 100644
--- a/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java
+++ b/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java
@@ -90,7 +90,11 @@ public class WebHdfsHaHttpClientDispatchTest {
       WebHdfsHaHttpClientDispatch dispatch = new WebHdfsHaHttpClientDispatch();
       dispatch.init(filterConfig);
       long startTime = System.currentTimeMillis();
-      dispatch.executeRequest(outboundRequest, inboundRequest, outboundResponse);
+      try {
+         dispatch.executeRequest(outboundRequest, inboundRequest, outboundResponse);
+      } catch (IOException e) {
+         //this is expected after the failover limit is reached
+      }
       long elapsedTime = System.currentTimeMillis() - startTime;
       Assert.assertEquals(uri2.toString(), provider.getActiveURL(serviceName));
       //test to make sure the sleep took place
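
The empty catch above keeps the timing assertion simple; if the thrown
exception itself should be asserted, a slightly stricter variant (a sketch
using the same JUnit Assert the test already imports) could be:

   try {
      dispatch.executeRequest(outboundRequest, inboundRequest, outboundResponse);
      Assert.fail("expected an IOException once the failover limit was reached");
   } catch (IOException e) {
      // expected: the original connection error is preserved as the cause
   }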

http://git-wip-us.apache.org/repos/asf/knox/blob/f0af8bd6/gateway-test/src/test/java/org/apache/hadoop/gateway/WebHdfsHaFuncTest.java
----------------------------------------------------------------------
diff --git a/gateway-test/src/test/java/org/apache/hadoop/gateway/WebHdfsHaFuncTest.java b/gateway-test/src/test/java/org/apache/hadoop/gateway/WebHdfsHaFuncTest.java
index 3d0a55d..332fa80 100644
--- a/gateway-test/src/test/java/org/apache/hadoop/gateway/WebHdfsHaFuncTest.java
+++ b/gateway-test/src/test/java/org/apache/hadoop/gateway/WebHdfsHaFuncTest.java
@@ -210,6 +210,26 @@ public class WebHdfsHaFuncTest {
    }
 
    @Test
+   public void testFailoverLimit() throws Exception {
+      String username = "hdfs";
+      String password = "hdfs-password";
+      //Shutdown both master and standby so no server responds, the failover limit is hit and the original error surfaces as a 500
+      masterServer.stop();
+      standbyServer.stop();
+      given()
+            .auth().preemptive().basic(username, password)
+            .header("X-XSRF-Header", "jksdhfkhdsf")
+            .queryParam("op", "LISTSTATUS")
+            .expect()
+//            .log().ifError()
+            .statusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR)
+            .when().get(driver.getUrl("WEBHDFS") + "/v1/");
+      standbyServer.start();
+      masterServer.start();
+   }
+
+
+   @Test
    public void testServerInStandby() throws IOException {
       String username = "hdfs";
       String password = "hdfs-password";
@@ -247,6 +267,59 @@ public class WebHdfsHaFuncTest {
    }
 
    @Test
+   public void testServerInStandbyFailoverLimit() throws IOException {
+      String username = "hdfs";
+      String password = "hdfs-password";
+      //have both master and standby respond as being in standby so the failover limit is exhausted
+      masterServer.expect()
+            .method("GET")
+            .pathInfo("/webhdfs/v1/")
+            .queryParam("op", "LISTSTATUS")
+            .queryParam("user.name", username)
+            .respond()
+            .status(HttpStatus.SC_FORBIDDEN)
+            .content(driver.getResourceBytes("webhdfs-liststatus-standby.json"))
+            .contentType("application/json");
+      standbyServer.expect()
+            .method("GET")
+            .pathInfo("/webhdfs/v1/")
+            .queryParam("op", "LISTSTATUS")
+            .queryParam("user.name", username)
+            .respond()
+            .status(HttpStatus.SC_FORBIDDEN)
+            .content(driver.getResourceBytes("webhdfs-liststatus-standby.json"))
+            .contentType("application/json");
+      masterServer.expect()
+            .method("GET")
+            .pathInfo("/webhdfs/v1/")
+            .queryParam("op", "LISTSTATUS")
+            .queryParam("user.name", username)
+            .respond()
+            .status(HttpStatus.SC_FORBIDDEN)
+            .content(driver.getResourceBytes("webhdfs-liststatus-standby.json"))
+            .contentType("application/json");
+      standbyServer.expect()
+            .method("GET")
+            .pathInfo("/webhdfs/v1/")
+            .queryParam("op", "LISTSTATUS")
+            .queryParam("user.name", username)
+            .respond()
+            .status(HttpStatus.SC_FORBIDDEN)
+            .content(driver.getResourceBytes("webhdfs-liststatus-standby.json"))
+            .contentType("application/json");
+      given()
+            .auth().preemptive().basic(username, password)
+            .header("X-XSRF-Header", "jksdhfkhdsf")
+            .queryParam("op", "LISTSTATUS")
+            .expect()
+//            .log().ifError()
+            .statusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR)
+            .when().get(driver.getUrl("WEBHDFS") + "/v1/");
+      masterServer.isEmpty();
+      standbyServer.isEmpty();
+   }
+
+   @Test
    public void testServerInSafeMode() throws IOException {
       String username = "hdfs";
       String password = "hdfs-password";
@@ -283,4 +356,61 @@ public class WebHdfsHaFuncTest {
             .when().post(driver.getUrl("WEBHDFS") + "/v1/user/hdfs/foo.txt");
       masterServer.isEmpty();
    }
+
+   @Test
+   public void testServerInSafeModeRetryLimit() throws IOException {
+      String username = "hdfs";
+      String password = "hdfs-password";
+      //master stays in safe mode on every attempt so the retry limit is exhausted
+      masterServer.expect()
+            .method("POST")
+            .pathInfo("/webhdfs/v1/user/hdfs/foo.txt")
+            .queryParam("op", "RENAME")
+            .queryParam("destination", "/user/hdfs/foo.txt")
+            .queryParam("user.name", username)
+            .respond()
+            .status(HttpStatus.SC_FORBIDDEN)
+            .content(driver.getResourceBytes("webhdfs-rename-safemode.json"))
+            .contentType("application/json");
+      masterServer.expect()
+            .method("POST")
+            .pathInfo("/webhdfs/v1/user/hdfs/foo.txt")
+            .queryParam("op", "RENAME")
+            .queryParam("destination", "/user/hdfs/foo.txt")
+            .queryParam("user.name", username)
+            .respond()
+            .status(HttpStatus.SC_FORBIDDEN)
+            .content(driver.getResourceBytes("webhdfs-rename-safemode.json"))
+            .contentType("application/json");
+      masterServer.expect()
+            .method("POST")
+            .pathInfo("/webhdfs/v1/user/hdfs/foo.txt")
+            .queryParam("op", "RENAME")
+            .queryParam("destination", "/user/hdfs/foo.txt")
+            .queryParam("user.name", username)
+            .respond()
+            .status(HttpStatus.SC_FORBIDDEN)
+            .content(driver.getResourceBytes("webhdfs-rename-safemode.json"))
+            .contentType("application/json");
+      masterServer.expect()
+            .method("POST")
+            .pathInfo("/webhdfs/v1/user/hdfs/foo.txt")
+            .queryParam("op", "RENAME")
+            .queryParam("destination", "/user/hdfs/foo.txt")
+            .queryParam("user.name", username)
+            .respond()
+            .status(HttpStatus.SC_FORBIDDEN)
+            .content(driver.getResourceBytes("webhdfs-rename-safemode.json"))
+            .contentType("application/json");
+      given()
+            .auth().preemptive().basic(username, password)
+            .header("X-XSRF-Header", "jksdhfkhdsf")
+            .queryParam("op", "RENAME")
+            .queryParam("destination", "/user/hdfs/foo.txt")
+            .expect()
+//            .log().ifError()
+            .statusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR)
+            .when().post(driver.getUrl("WEBHDFS") + "/v1/user/hdfs/foo.txt");
+      masterServer.isEmpty();
+   }
 }
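
The functional tests above can assert SC_INTERNAL_SERVER_ERROR, rather than
catch an exception, because once executeRequest throws, the error handling on
the servlet side turns the unhandled IOException into a 500 response. As an
illustration only (this hypothetical filter is not Knox's actual error
handling), that translation looks roughly like:

   import java.io.IOException;
   import javax.servlet.*;
   import javax.servlet.http.HttpServletResponse;

   // Illustrative: convert a dispatch failure into the HTTP 500 the tests see.
   public class DispatchErrorFilter implements Filter {

      @Override
      public void init(FilterConfig config) {
      }

      @Override
      public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
         try {
            chain.doFilter(request, response);
         } catch (IOException e) {
            // The cause carries the original standby, safe-mode or connection
            // error preserved by the dispatch.
            ((HttpServletResponse) response).sendError(
                  HttpServletResponse.SC_INTERNAL_SERVER_ERROR, e.getMessage());
         }
      }

      @Override
      public void destroy() {
      }
   }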