You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2020/06/10 14:19:56 UTC

[hive] branch master updated: HIVE-23659: Add Retry for Ranger Replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)

This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 84ee70e  HIVE-23659: Add Retry for Ranger Replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
84ee70e is described below

commit 84ee70e8ae6e9cd41e36feab16cbcc5afad50a0b
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Wed Jun 10 19:49:44 2020 +0530

    HIVE-23659: Add Retry for Ranger Replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
 .../ql/exec/repl/ranger/RangerRestClientImpl.java  | 171 ++++++++++++---------
 .../apache/hadoop/hive/metastore/utils/Retry.java  |  21 ++-
 .../hadoop/hive/metastore/utils/RetryTest.java     |  69 ++++++++-
 3 files changed, 187 insertions(+), 74 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
index 1b17632..13d3836 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
@@ -72,7 +72,6 @@ public class RangerRestClientImpl implements RangerRestClient {
                                                      String dbName,
                                                      String rangerHiveServiceName)throws SemanticException {
     LOG.info("Ranger endpoint for cluster " + sourceRangerEndpoint);
-    ClientResponse clientResp;
     String uri;
     if (StringUtils.isEmpty(rangerHiveServiceName)) {
       throw new SemanticException("Ranger Service Name cannot be empty");
@@ -85,33 +84,43 @@ public class RangerRestClientImpl implements RangerRestClient {
     }
     String url = sourceRangerEndpoint + (uri.startsWith("/") ? uri : ("/" + uri));
     LOG.debug("Url to export policies from source Ranger: {}", url);
-    RangerExportPolicyList rangerExportPolicyList = new RangerExportPolicyList();
-    WebResource.Builder builder = getRangerResourceBuilder(url);
-    clientResp = builder.get(ClientResponse.class);
-
-    String response = null;
-    if (clientResp != null) {
-      if (clientResp.getStatus() == HttpServletResponse.SC_OK) {
-        Gson gson = new GsonBuilder().create();
-        response = clientResp.getEntity(String.class);
-        LOG.debug("Response received for ranger export {} ", response);
-        if (StringUtils.isNotEmpty(response)) {
-          rangerExportPolicyList = gson.fromJson(response, RangerExportPolicyList.class);
-          return rangerExportPolicyList;
+
+    Retry<RangerExportPolicyList> retriable = new Retry<RangerExportPolicyList>(Exception.class) {
+      @Override
+      public RangerExportPolicyList execute() throws Exception {
+        WebResource.Builder builder = getRangerResourceBuilder(url);
+        RangerExportPolicyList rangerExportPolicyList = new RangerExportPolicyList();
+        ClientResponse clientResp = builder.get(ClientResponse.class);
+        String response = null;
+        if (clientResp != null) {
+          if (clientResp.getStatus() == HttpServletResponse.SC_OK) {
+            Gson gson = new GsonBuilder().create();
+            response = clientResp.getEntity(String.class);
+            LOG.debug("Response received for ranger export {} ", response);
+            if (StringUtils.isNotEmpty(response)) {
+              rangerExportPolicyList = gson.fromJson(response, RangerExportPolicyList.class);
+              return rangerExportPolicyList;
+            }
+          } else if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
+            LOG.debug("Ranger policy export request returned empty list");
+            return rangerExportPolicyList;
+          } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) {
+            throw new SemanticException("Authentication Failure while communicating to Ranger admin");
+          } else if (clientResp.getStatus() == HttpServletResponse.SC_FORBIDDEN) {
+            throw new SemanticException("Authorization Failure while communicating to Ranger admin");
+          }
         }
-      } else if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
-        LOG.debug("Ranger policy export request returned empty list");
-        return rangerExportPolicyList;
-      } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) {
-        throw new SemanticException("Authentication Failure while communicating to Ranger admin");
-      } else if (clientResp.getStatus() == HttpServletResponse.SC_FORBIDDEN) {
-        throw new SemanticException("Authorization Failure while communicating to Ranger admin");
+        if (StringUtils.isEmpty(response)) {
+          LOG.debug("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs.");
+        }
+        return null;
       }
+    };
+    try {
+      return retriable.runWithDelay();
+    } catch (Exception e) {
+      throw new SemanticException(e);
     }
-    if (StringUtils.isEmpty(response)) {
-      LOG.debug("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs.");
-    }
-    return rangerExportPolicyList;
   }
 
   public List<RangerPolicy> removeMultiResourcePolicies(List<RangerPolicy> rangerPolicies) {
@@ -170,50 +179,60 @@ public class RangerRestClientImpl implements RangerRestClient {
         + (uri.startsWith("/") ? uri : ("/" + uri));
 
     LOG.debug("URL to import policies on target Ranger: {}", url);
-    ClientResponse clientResp = null;
-
-    StreamDataBodyPart filePartPolicies = new StreamDataBodyPart("file",
-        new ByteArrayInputStream(jsonRangerExportPolicyList.getBytes(StandardCharsets.UTF_8)),
-        rangerPoliciesJsonFileName);
-    StreamDataBodyPart filePartServiceMap = new StreamDataBodyPart("servicesMapJson",
-        new ByteArrayInputStream(jsonServiceMap.getBytes(StandardCharsets.UTF_8)), serviceMapJsonFileName);
-
-    FormDataMultiPart formDataMultiPart = new FormDataMultiPart();
-    MultiPart multipartEntity = null;
-    try {
-      multipartEntity = formDataMultiPart.bodyPart(filePartPolicies).bodyPart(filePartServiceMap);
-      WebResource.Builder builder = getRangerResourceBuilder(url);
-      clientResp = builder.accept(MediaType.APPLICATION_JSON).type(MediaType.MULTIPART_FORM_DATA)
-          .post(ClientResponse.class, multipartEntity);
-      if (clientResp != null) {
-        if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
-          LOG.debug("Ranger policy import finished successfully");
-
-        } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) {
-          throw new Exception("Authentication Failure while communicating to Ranger admin");
-        } else {
-          throw new Exception("Ranger policy import failed, Please refer target Ranger admin logs.");
-        }
-      }
-    } finally {
-      try {
-        if (filePartPolicies != null) {
-          filePartPolicies.cleanup();
-        }
-        if (filePartServiceMap != null) {
-          filePartServiceMap.cleanup();
-        }
-        if (formDataMultiPart != null) {
-          formDataMultiPart.close();
-        }
-        if (multipartEntity != null) {
-          multipartEntity.close();
+    Retry<RangerExportPolicyList> retriable = new Retry<RangerExportPolicyList>(Exception.class) {
+      @Override
+      public RangerExportPolicyList execute() throws Exception {
+        ClientResponse clientResp = null;
+
+        StreamDataBodyPart filePartPolicies = new StreamDataBodyPart("file",
+            new ByteArrayInputStream(jsonRangerExportPolicyList.getBytes(StandardCharsets.UTF_8)),
+            rangerPoliciesJsonFileName);
+        StreamDataBodyPart filePartServiceMap = new StreamDataBodyPart("servicesMapJson",
+            new ByteArrayInputStream(jsonServiceMap.getBytes(StandardCharsets.UTF_8)), serviceMapJsonFileName);
+
+        FormDataMultiPart formDataMultiPart = new FormDataMultiPart();
+        MultiPart multipartEntity = null;
+        try {
+          multipartEntity = formDataMultiPart.bodyPart(filePartPolicies).bodyPart(filePartServiceMap);
+          WebResource.Builder builder = getRangerResourceBuilder(url);
+          clientResp = builder.accept(MediaType.APPLICATION_JSON).type(MediaType.MULTIPART_FORM_DATA)
+            .post(ClientResponse.class, multipartEntity);
+          if (clientResp != null) {
+            if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
+              LOG.debug("Ranger policy import finished successfully");
+
+            } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) {
+              throw new Exception("Authentication Failure while communicating to Ranger admin");
+            } else {
+              throw new Exception("Ranger policy import failed, Please refer target Ranger admin logs.");
+            }
+          }
+        } finally {
+          try {
+            if (filePartPolicies != null) {
+              filePartPolicies.cleanup();
+            }
+            if (filePartServiceMap != null) {
+              filePartServiceMap.cleanup();
+            }
+            if (formDataMultiPart != null) {
+              formDataMultiPart.close();
+            }
+            if (multipartEntity != null) {
+              multipartEntity.close();
+            }
+          } catch (IOException e) {
+            LOG.error("Exception occurred while closing resources: {}", e);
+          }
         }
-      } catch (IOException e) {
-        LOG.error("Exception occurred while closing resources: {}", e);
+        return rangerExportPolicyList;
       }
+    };
+    try {
+      return retriable.runWithDelay();
+    } catch (Exception e) {
+      throw new SemanticException(e);
     }
-    return rangerExportPolicyList;
   }
 
   private synchronized Client getRangerClient() {
@@ -342,11 +361,21 @@ public class RangerRestClientImpl implements RangerRestClient {
   }
 
   @Override
-  public boolean checkConnection(String url) {
-    WebResource.Builder builder;
-    builder = getRangerResourceBuilder(url);
-    ClientResponse clientResp = builder.get(ClientResponse.class);
-    return (clientResp.getStatus() < HttpServletResponse.SC_UNAUTHORIZED);
+  public boolean checkConnection(String url) throws SemanticException {
+    // Issue the probe GET through the delayed-retry wrapper.
+    Retry<Boolean> connectionProbe = new Retry<Boolean>(Exception.class) {
+      @Override
+      public Boolean execute() throws Exception {
+        ClientResponse probeResp = getRangerResourceBuilder(url).get(ClientResponse.class);
+        // Any status code below SC_UNAUTHORIZED is reported as a working connection.
+        return probeResp.getStatus() < HttpServletResponse.SC_UNAUTHORIZED;
+      }
+    };
+    try {
+      return connectionProbe.runWithDelay();
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
   }
 
   @Override
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java
index bdb269a..032eaf4 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java
@@ -23,7 +23,8 @@ package org.apache.hadoop.hive.metastore.utils;
  */
 public abstract class Retry<T> {
 
-  public static final int MAX_RETRIES = 3;
+  public static final int MAX_RETRIES = 4;
+  public static final int DELAY = 30 * 1000;
   private int tries = 0;
   private Class retryExceptionType;
 
@@ -49,4 +50,22 @@ public abstract class Retry<T> {
       }
     }
   }
+
+  /**
+   * Runs execute(), retrying failures that match the retry exception type (subclasses
+   * included) after sleeping DELAY * tries ms; rethrows on the MAX_RETRIES-th such failure.
+   */
+  public T runWithDelay() throws Exception {
+    try {
+      return execute();
+    } catch (Exception e) {
+      // isInstance (not Class.equals) so subclasses of the retry type are retried as well.
+      boolean retriable = retryExceptionType != null && retryExceptionType.isInstance(e);
+      if (!retriable || ++tries == MAX_RETRIES) {
+        throw e; // non-matching type, or the retry budget is exhausted
+      }
+      Thread.sleep((long) DELAY * tries);
+      return runWithDelay();
+    }
+  }
 }
diff --git a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java
index 67bd658..8cff68d 100644
--- a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java
+++ b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java
@@ -28,15 +28,21 @@ public class RetryTest {
   @Test
   public void testRetrySuccess() {
     Retry<Void> retriable = new Retry<Void>(NullPointerException.class) {
+      private int count = 0;
       @Override
       public Void execute() {
-        throw new NullPointerException();
+        if (count < 1) {
+          count++;
+          throw new NullPointerException();
+        } else {
+          return null;
+        }
       }
     };
     try {
       retriable.run();
     } catch (Exception e) {
-      Assert.assertEquals(NullPointerException.class, e.getClass());
+      Assert.fail();
     }
   }
 
@@ -50,8 +56,67 @@ public class RetryTest {
     };
     try {
       retriable.run();
+      Assert.fail();
     } catch (Exception e) {
       Assert.assertEquals(RuntimeException.class, e.getClass());
     }
   }
+
+  @Test
+  public void testRetryFailureWithDelay() {
+    Retry<Void> retriable = new Retry<Void>(NullPointerException.class) {
+      @Override
+      public Void execute() {
+        throw new RuntimeException(); // not the NullPointerException retry type given above
+      }
+    };
+    try {
+      retriable.runWithDelay();
+      Assert.fail(); // reaching this line means no exception was thrown
+    } catch (Exception e) {
+      Assert.assertEquals(RuntimeException.class, e.getClass());
+    }
+  }
+
+  @Test
+  public void testRetrySuccessWithDelay() {
+    Retry<Void> retriable = new Retry<Void>(NullPointerException.class) {
+      private long startTime = System.currentTimeMillis(); // taken when this Retry is created
+      @Override
+      public Void execute() {
+        executeWithDelay(startTime); // throws NullPointerException until 40 s after startTime
+        return null;
+      }
+    };
+    try {
+      retriable.runWithDelay();
+    } catch (Exception e) {
+      Assert.fail(); // any exception escaping runWithDelay() fails the test
+    }
+  }
+
+  private void executeWithDelay(long startTime) {
+    long currentTime = System.currentTimeMillis();
+    if (currentTime - startTime < 40 * 1000) { // fewer than 40 s (in ms) since startTime
+      throw new NullPointerException();
+    }
+  }
+
+  @Test
+  public void testRetryFailureWithDelayMoreThanTimeout() {
+    Retry<Void> retriable = new Retry<Void>(NullPointerException.class) {
+      @Override
+      public Void execute() {
+        throw new NullPointerException(); // always the retry type, so every attempt fails
+      }
+    };
+    long startTime = System.currentTimeMillis();
+    try {
+      retriable.runWithDelay();
+      Assert.fail();
+    } catch (Exception e) {
+      Assert.assertEquals(NullPointerException.class, e.getClass());
+      Assert.assertTrue(System.currentTimeMillis() - startTime >= 180 * 1000); // 30 + 60 + 90 s slept
+    }
+  }
 }