You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/07/30 07:30:41 UTC

[GitHub] [hive] pkumarsinha commented on a change in pull request #1329: HIVE-23897 : Create a common Retry Interface for replication

pkumarsinha commented on a change in pull request #1329:
URL: https://github.com/apache/hive/pull/1329#discussion_r462125753



##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -617,6 +617,22 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
             "Name of the source cluster for the replication."),
     REPL_TARGET_CLUSTER_NAME("hive.repl.target.cluster.name", null,
             "Name of the target cluster for the replication."),
+    REPL_RETRY_INTIAL_DELAY("hive.repl.retry.initial.delay", "60s",
+      new TimeValidator(TimeUnit.SECONDS),
+      "Initial Delay before retry starts."),
+    REPL_RETRY_BACKOFF_COEFFICIENT("hive.repl.retry.backoff.coefficient", 1.2f,
+      "The backoff coefficient for exponential retry delay between retries. " +
+        "Previous Delay * Backoff Coefficient will determine the next retry interval"),
+    REPL_RETRY_JITTER("hive.repl.retry.jitter", "30s", new TimeValidator(TimeUnit.SECONDS),
+      "A random jitter to be applied to avoid all retries happening at the same time."),
+    REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES("hive.repl.retry.max.delay.between.retries", "60m",
+      new TimeValidator(TimeUnit.MINUTES),
+      "Maximum allowed retry delay in seconds after including exponential backoff. " +

Review comment:
       The default is specified in minutes, but the description says seconds. Should we change this?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import org.apache.atlas.AtlasServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+/**
+ * Implement retry logic for service calls.
+ */
+public class RetryingClientTimeBased {
+  private static long MINIMUM_DELAY_IN_SEC = 1;
+  private static final Logger LOG = LoggerFactory.getLogger(RetryingClientTimeBased.class);
+  private static final String ERROR_MESSAGE_NO_ENTITIES = "no entities to create/update";
+  private static final String ERROR_MESSAGE_IN_PROGRESS = "import or export is in progress";
+  private static final String ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP = "empty ZIP file";
+  protected long totalDurationInSeconds;
+  protected long initialDelayInSeconds;
+  protected long maxRetryDelayInSeconds;
+  protected double backOff;
+  protected int maxJitterInSeconds;
+
+  protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws Exception {
+    long startTime = System.currentTimeMillis();
+    long delay = this.initialDelayInSeconds;
+    while (elapsedTimeInSeconds(startTime) + delay > this.totalDurationInSeconds) {
+      try {
+        LOG.debug("Retrying method: {}", func.getClass().getName(), null);
+        return func.call();
+      } catch (Exception e) {
+        if (processImportExportLockException(e, delay)) {
+          //retry case. compute next sleep time
+          delay = getNextDelay(delay);
+          continue;
+        }
+        if (processInvalidParameterException(e)) {
+          return null;
+        }
+        LOG.error(func.getClass().getName(), e);
+        throw new Exception(e);
+      }
+    }
+    return defaultReturnValue;
+  }
+
+  private long getNextDelay(long currentDelay) {
+    if (currentDelay <= 0) { // in case initial delay was set to 0.
+      currentDelay = MINIMUM_DELAY_IN_SEC;
+    }
+

Review comment:
       nit: consider formatting this file.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.util;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class to implement any retry logic in case of exceptions.
+ */
+public class Retryable {
+  private static long MINIMUM_DELAY_IN_SEC = 1;
+
+  private long totalDurationInSeconds;
+  private List<Class<? extends Exception>> retryOn;
+  private List<Class<? extends Exception>> failOn;
+  private long initialDelayInSeconds;
+  private long maxRetryDelayInSeconds;
+  private double backOff;
+  private int maxJitterInSeconds;
+
+  private Retryable() {
+    this.retryOn = new ArrayList<>();
+    this.failOn = new ArrayList<>();
+    this.initialDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY), TimeUnit.SECONDS);

Review comment:
       Please format this class.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.util;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class to implement any retry logic in case of exceptions.
+ */
+public class Retryable {
+  private static long MINIMUM_DELAY_IN_SEC = 1;
+
+  private long totalDurationInSeconds;
+  private List<Class<? extends Exception>> retryOn;
+  private List<Class<? extends Exception>> failOn;
+  private long initialDelayInSeconds;
+  private long maxRetryDelayInSeconds;
+  private double backOff;
+  private int maxJitterInSeconds;
+
+  private Retryable() {
+    this.retryOn = new ArrayList<>();
+    this.failOn = new ArrayList<>();
+    this.initialDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY), TimeUnit.SECONDS);
+    this.maxRetryDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES), TimeUnit.SECONDS);
+    this.backOff = HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT.defaultFloatVal;
+    this.maxJitterInSeconds = (int) HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_JITTER.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_JITTER), TimeUnit.SECONDS);
+    this.totalDurationInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION), TimeUnit.SECONDS);;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public <T> T executeCallable(Callable<T> callable) throws Exception {
+    long startTime = System.currentTimeMillis();
+    long delay = this.initialDelayInSeconds;
+    Exception currentCapturedException = null;
+    while(true) {
+      try {
+        if (UserGroupInformation.isSecurityEnabled()) {
+          SecurityUtils.reloginExpiringKeytabUser();
+          return UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction<T>) () -> callable.call());
+        } else {
+          return callable.call();
+        }
+      } catch (Exception e) {
+        if (this.failOn.stream().noneMatch(k -> e.getClass().equals(k))
+          && this.retryOn.stream().anyMatch(k -> e.getClass().isAssignableFrom(k))) {
+          if (elapsedTimeInSeconds(startTime) + delay > this.totalDurationInSeconds) {
+            // case where waiting would go beyond max duration. So throw exception and return
+            throw e;
+          }
+          sleep(delay);
+          //retry case. compute next sleep time
+          delay = getNextDelay(delay, currentCapturedException, e);
+          // reset current captured exception.
+          currentCapturedException = e;
+        } else {
+          // Exception cannot be retried on. Throw exception and return
+          throw e;
+        }
+      }
+    }
+  }
+
+  private void sleep(long seconds) {
+    try {
+      Thread.sleep(seconds * 1000);
+    } catch (InterruptedException e) {
+      // no-op.. just proceed
+    }
+  }
+
+  private long getNextDelay(long currentDelay, final Exception previousException, final Exception currentException) {
+    if (previousException != null && !previousException.getClass().equals(currentException.getClass())) {
+      // New exception encountered. Returning initial delay for next retry.
+      return this.initialDelayInSeconds;
+    }
+
+    if (currentDelay <= 0) { // in case initial delay was set to 0.
+      currentDelay = MINIMUM_DELAY_IN_SEC;
+    }
+
+    currentDelay *= this.backOff;
+    if (this.maxJitterInSeconds > 0) {
+      currentDelay += new Random().nextInt(this.maxJitterInSeconds);
+    }
+
+    if (currentDelay > this.maxRetryDelayInSeconds) {
+      currentDelay = this.maxRetryDelayInSeconds;
+    }
+
+    return  currentDelay;
+  }
+
+  private long elapsedTimeInSeconds(long fromTimeMillis) {
+    return (System.currentTimeMillis() - fromTimeMillis)/ 1000;
+  }
+
+  public static class Builder {
+    private final Retryable runnable = new Retryable();
+    public Builder() {
+    }
+
+    public Builder withHiveConf(HiveConf conf) {
+      runnable.totalDurationInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS);
+      runnable.initialDelayInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS);
+      runnable.maxRetryDelayInSeconds = conf.getTimeVar(HiveConf.ConfVars
+        .REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, TimeUnit.SECONDS);
+      runnable.backOff = conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT);
+      runnable.maxJitterInSeconds = (int) conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_JITTER, TimeUnit.SECONDS);
+      return this;
+    }
+
+    public Retryable build() {
+      return runnable;
+    }
+
+    public Builder withTotalDuration(long maxDuration) {
+      runnable.totalDurationInSeconds = maxDuration;
+      return this;
+    }
+
+    // making this thread safe as it appends to list

Review comment:
       Do we really need this to be synchronized? Building is a single-threaded operation, isn't it?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.util;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class to implement any retry logic in case of exceptions.
+ */
+public class Retryable {
+  private static long MINIMUM_DELAY_IN_SEC = 1;

Review comment:
       Should this value, or a separate default (a new constant), be a little higher?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.util;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class to implement any retry logic in case of exceptions.
+ */
+public class Retryable {
+  private static long MINIMUM_DELAY_IN_SEC = 1;
+
+  private long totalDurationInSeconds;
+  private List<Class<? extends Exception>> retryOn;
+  private List<Class<? extends Exception>> failOn;
+  private long initialDelayInSeconds;
+  private long maxRetryDelayInSeconds;
+  private double backOff;
+  private int maxJitterInSeconds;
+
+  private Retryable() {
+    this.retryOn = new ArrayList<>();
+    this.failOn = new ArrayList<>();
+    this.initialDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY), TimeUnit.SECONDS);
+    this.maxRetryDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES), TimeUnit.SECONDS);
+    this.backOff = HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT.defaultFloatVal;
+    this.maxJitterInSeconds = (int) HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_JITTER.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_JITTER), TimeUnit.SECONDS);
+    this.totalDurationInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION), TimeUnit.SECONDS);;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public <T> T executeCallable(Callable<T> callable) throws Exception {
+    long startTime = System.currentTimeMillis();
+    long delay = this.initialDelayInSeconds;
+    Exception currentCapturedException = null;
+    while(true) {
+      try {
+        if (UserGroupInformation.isSecurityEnabled()) {
+          SecurityUtils.reloginExpiringKeytabUser();
+          return UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction<T>) () -> callable.call());
+        } else {
+          return callable.call();
+        }
+      } catch (Exception e) {
+        if (this.failOn.stream().noneMatch(k -> e.getClass().equals(k))
+          && this.retryOn.stream().anyMatch(k -> e.getClass().isAssignableFrom(k))) {
+          if (elapsedTimeInSeconds(startTime) + delay > this.totalDurationInSeconds) {
+            // case where waiting would go beyond max duration. So throw exception and return
+            throw e;
+          }
+          sleep(delay);
+          //retry case. compute next sleep time
+          delay = getNextDelay(delay, currentCapturedException, e);
+          // reset current captured exception.
+          currentCapturedException = e;
+        } else {
+          // Exception cannot be retried on. Throw exception and return
+          throw e;
+        }
+      }
+    }
+  }
+
+  private void sleep(long seconds) {
+    try {
+      Thread.sleep(seconds * 1000);
+    } catch (InterruptedException e) {
+      // no-op.. just proceed
+    }
+  }
+
+  private long getNextDelay(long currentDelay, final Exception previousException, final Exception currentException) {
+    if (previousException != null && !previousException.getClass().equals(currentException.getClass())) {

Review comment:
       Consider changing the .equals comparison here as well.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.util;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class to implement any retry logic in case of exceptions.
+ */
+public class Retryable {
+  private static long MINIMUM_DELAY_IN_SEC = 1;
+
+  private long totalDurationInSeconds;
+  private List<Class<? extends Exception>> retryOn;
+  private List<Class<? extends Exception>> failOn;
+  private long initialDelayInSeconds;
+  private long maxRetryDelayInSeconds;
+  private double backOff;
+  private int maxJitterInSeconds;
+
+  private Retryable() {
+    this.retryOn = new ArrayList<>();
+    this.failOn = new ArrayList<>();
+    this.initialDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY), TimeUnit.SECONDS);
+    this.maxRetryDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES), TimeUnit.SECONDS);
+    this.backOff = HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT.defaultFloatVal;
+    this.maxJitterInSeconds = (int) HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_JITTER.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_JITTER), TimeUnit.SECONDS);
+    this.totalDurationInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION), TimeUnit.SECONDS);;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public <T> T executeCallable(Callable<T> callable) throws Exception {
+    long startTime = System.currentTimeMillis();
+    long delay = this.initialDelayInSeconds;
+    Exception currentCapturedException = null;
+    while(true) {
+      try {
+        if (UserGroupInformation.isSecurityEnabled()) {
+          SecurityUtils.reloginExpiringKeytabUser();
+          return UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction<T>) () -> callable.call());
+        } else {
+          return callable.call();
+        }
+      } catch (Exception e) {
+        if (this.failOn.stream().noneMatch(k -> e.getClass().equals(k))
+          && this.retryOn.stream().anyMatch(k -> e.getClass().isAssignableFrom(k))) {
+          if (elapsedTimeInSeconds(startTime) + delay > this.totalDurationInSeconds) {
+            // case where waiting would go beyond max duration. So throw exception and return
+            throw e;
+          }
+          sleep(delay);
+          //retry case. compute next sleep time
+          delay = getNextDelay(delay, currentCapturedException, e);

Review comment:
       nit: the arguments (currentCapturedException, e) are passed as the parameters (previousException, currentException) of getNextDelay(). Passing a variable named "current..." as "previous..." is confusing; consider renaming currentCapturedException.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.util;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class to implement any retry logic in case of exceptions.
+ */
+public class Retryable {
+  private static long MINIMUM_DELAY_IN_SEC = 1;

Review comment:
       nit: mark it as final; that matches the constant naming convention it already uses.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.util;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class to implement any retry logic in case of exceptions.
+ */
+public class Retryable {
+  private static long MINIMUM_DELAY_IN_SEC = 1;
+
+  private long totalDurationInSeconds;
+  private List<Class<? extends Exception>> retryOn;
+  private List<Class<? extends Exception>> failOn;
+  private long initialDelayInSeconds;
+  private long maxRetryDelayInSeconds;
+  private double backOff;
+  private int maxJitterInSeconds;
+
+  private Retryable() {
+    this.retryOn = new ArrayList<>();
+    this.failOn = new ArrayList<>();
+    this.initialDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY), TimeUnit.SECONDS);
+    this.maxRetryDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES), TimeUnit.SECONDS);
+    this.backOff = HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT.defaultFloatVal;
+    this.maxJitterInSeconds = (int) HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_JITTER.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_JITTER), TimeUnit.SECONDS);
+    this.totalDurationInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION), TimeUnit.SECONDS);;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public <T> T executeCallable(Callable<T> callable) throws Exception {
+    long startTime = System.currentTimeMillis();
+    long delay = this.initialDelayInSeconds;
+    Exception currentCapturedException = null;
+    while(true) {
+      try {
+        if (UserGroupInformation.isSecurityEnabled()) {
+          SecurityUtils.reloginExpiringKeytabUser();
+          return UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction<T>) () -> callable.call());
+        } else {
+          return callable.call();
+        }
+      } catch (Exception e) {
+        if (this.failOn.stream().noneMatch(k -> e.getClass().equals(k))

Review comment:
       e.getClass().equals(k): Class does not override equals(), so this is an identity check that matches only when the thrown exception's class is exactly k. The current usage passes Exception.class. However, the interface accepts any exception class, so it becomes error-prone: a caller who registers a superclass will silently miss its subclasses. Consider a subtype-aware check such as k.isAssignableFrom(e.getClass()).

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.util;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class to implement any retry logic in case of exceptions.
+ */
+public class Retryable {
+  private static long MINIMUM_DELAY_IN_SEC = 1;
+
+  private long totalDurationInSeconds;
+  private List<Class<? extends Exception>> retryOn;
+  private List<Class<? extends Exception>> failOn;
+  private long initialDelayInSeconds;
+  private long maxRetryDelayInSeconds;
+  private double backOff;
+  private int maxJitterInSeconds;
+
+  private Retryable() {
+    this.retryOn = new ArrayList<>();
+    this.failOn = new ArrayList<>();
+    this.initialDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY), TimeUnit.SECONDS);
+    this.maxRetryDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES), TimeUnit.SECONDS);
+    this.backOff = HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT.defaultFloatVal;
+    this.maxJitterInSeconds = (int) HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_JITTER.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_JITTER), TimeUnit.SECONDS);
+    this.totalDurationInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION.defaultStrVal,
+      HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION), TimeUnit.SECONDS);;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public <T> T executeCallable(Callable<T> callable) throws Exception {
+    long startTime = System.currentTimeMillis();
+    long delay = this.initialDelayInSeconds;
+    Exception currentCapturedException = null;
+    while(true) {
+      try {
+        if (UserGroupInformation.isSecurityEnabled()) {
+          SecurityUtils.reloginExpiringKeytabUser();
+          return UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction<T>) () -> callable.call());
+        } else {
+          return callable.call();
+        }
+      } catch (Exception e) {
+        if (this.failOn.stream().noneMatch(k -> e.getClass().equals(k))
+          && this.retryOn.stream().anyMatch(k -> e.getClass().isAssignableFrom(k))) {
+          if (elapsedTimeInSeconds(startTime) + delay > this.totalDurationInSeconds) {
+            // case where waiting would go beyond max duration. So throw exception and return
+            throw e;
+          }
+          sleep(delay);
+          //retry case. compute next sleep time
+          delay = getNextDelay(delay, currentCapturedException, e);
+          // reset current captured exception.
+          currentCapturedException = e;
+        } else {
+          // Exception cannot be retried on. Throw exception and return
+          throw e;
+        }
+      }
+    }
+  }
+
+  private void sleep(long seconds) {
+    try {
+      Thread.sleep(seconds * 1000);
+    } catch (InterruptedException e) {
+      // no-op.. just proceed
+    }
+  }
+
+  private long getNextDelay(long currentDelay, final Exception previousException, final Exception currentException) {
+    if (previousException != null && !previousException.getClass().equals(currentException.getClass())) {
+      // New exception encountered. Returning initial delay for next retry.
+      return this.initialDelayInSeconds;
+    }
+
+    if (currentDelay <= 0) { // in case initial delay was set to 0.
+      currentDelay = MINIMUM_DELAY_IN_SEC;
+    }
+
+    currentDelay *= this.backOff;
+    if (this.maxJitterInSeconds > 0) {
+      currentDelay += new Random().nextInt(this.maxJitterInSeconds);
+    }
+
+    if (currentDelay > this.maxRetryDelayInSeconds) {
+      currentDelay = this.maxRetryDelayInSeconds;
+    }
+
+    return  currentDelay;
+  }
+
+  private long elapsedTimeInSeconds(long fromTimeMillis) {
+    return (System.currentTimeMillis() - fromTimeMillis)/ 1000;
+  }
+
+  public static class Builder {
+    private final Retryable runnable = new Retryable();
+    public Builder() {
+    }
+
+    public Builder withHiveConf(HiveConf conf) {
+      runnable.totalDurationInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS);
+      runnable.initialDelayInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS);
+      runnable.maxRetryDelayInSeconds = conf.getTimeVar(HiveConf.ConfVars
+        .REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, TimeUnit.SECONDS);
+      runnable.backOff = conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT);
+      runnable.maxJitterInSeconds = (int) conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_JITTER, TimeUnit.SECONDS);
+      return this;
+    }
+
+    public Retryable build() {
+      return runnable;
+    }
+
+    public Builder withTotalDuration(long maxDuration) {
+      runnable.totalDurationInSeconds = maxDuration;
+      return this;
+    }
+
+    // making this thread safe as it appends to list
+    public synchronized Builder withRetryOnException(final Class<? extends Exception> exceptionClass) {
+      if (exceptionClass != null &&
+        runnable.retryOn.stream().noneMatch(k -> exceptionClass.equals(k))) {
+        runnable.retryOn.add(exceptionClass);
+      }
+      return this;
+    }
+
+    public synchronized Builder withRetryOnExceptionList(final List<Class<? extends Exception>> exceptionClassList) {
+      for (final Class<? extends Exception> exceptionClass : exceptionClassList) {
+        if (exceptionClass != null &&
+          runnable.retryOn.stream().noneMatch(k -> exceptionClass.equals(k))) {
+          runnable.retryOn.add(exceptionClass);
+        }
+      }
+      return this;
+    }
+
+    public synchronized Builder withFailOnException(final Class<? extends Exception> exceptionClass) {
+      if (exceptionClass != null &&
+        runnable.failOn.stream().noneMatch(k -> exceptionClass.equals(k))) {
+        runnable.failOn.add(exceptionClass);
+      }
+      return this;
+    }
+
+    public synchronized Builder withDontRetryOnExceptionList(final List<Class<?

Review comment:
       Why not use a name consistent with `withFailOnException`, such as `withFailOnExceptionList`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org