You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by el...@apache.org on 2019/02/05 15:13:58 UTC

[hadoop] branch trunk updated: HDDS-776. Make OM initialization resilient to dns failures. Contributed by Doroszlai, Attila.

This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9f2da01  HDDS-776. Make OM initialization resilient to dns failures. Contributed by Doroszlai, Attila.
9f2da01 is described below

commit 9f2da015916939af7d987338d170fb3e9589aaa4
Author: Márton Elek <el...@apache.org>
AuthorDate: Tue Feb 5 16:11:16 2019 +0100

    HDDS-776. Make OM initialization resilient to dns failures. Contributed by Doroszlai, Attila.
---
 hadoop-hdds/common/pom.xml                         |  4 ++
 .../org/apache/hadoop/utils/RetriableTask.java     | 78 ++++++++++++++++++++++
 .../org/apache/hadoop/utils/TestRetriableTask.java | 76 +++++++++++++++++++++
 hadoop-hdds/pom.xml                                |  8 ++-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   | 22 +++++-
 5 files changed, 185 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml
index 93bb63f..cb34d27 100644
--- a/hadoop-hdds/common/pom.xml
+++ b/hadoop-hdds/common/pom.xml
@@ -123,6 +123,10 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>commons-validator</artifactId>
       <version>1.6</version>
     </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RetriableTask.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RetriableTask.java
new file mode 100644
index 0000000..a847ae0
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RetriableTask.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.utils;
+
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.util.ThreadUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+/**
+ * {@code Callable} implementation that retries a delegate task according to
+ * the specified {@code RetryPolicy}.  Sleeps between retries in the caller
+ * thread.
+ *
+ * @param <V> the result type of method {@code call}
+ */
+public class RetriableTask<V> implements Callable<V> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RetriableTask.class);
+
+  private final String name;
+  private final Callable<V> task;
+  private final RetryPolicy retryPolicy;
+
+  /** Wraps {@code task}; {@code name} labels it in log and error messages. */
+  public RetriableTask(RetryPolicy retryPolicy, String name, Callable<V> task) {
+    this.name = name;
+    this.task = task;
+    this.retryPolicy = retryPolicy;
+  }
+
+  /**
+   * Calls the delegate until it returns or the policy stops asking for RETRY.
+   * @return the value returned by the first successful delegate call
+   * @throws IOException wrapping the last failure once retrying stops
+   */
+  @Override
+  public V call() throws Exception {
+    for (int attempt = 1;; attempt++) {
+      try {
+        return task.call();
+      } catch (Exception e) {
+        RetryPolicy.RetryAction verdict =
+            retryPolicy.shouldRetry(e, attempt, 0, true);
+        if (verdict.action != RetryPolicy.RetryAction.RetryDecision.RETRY) {
+          String msg = String.format(
+              "Execution of task %s failed permanently after %d attempts",
+              name, attempt);
+          LOG.warn(msg, e);
+          throw new IOException(msg, e);
+        }
+        LOG.info("Execution of task {} failed, will be retried in {} ms",
+            name, verdict.delayMillis);
+        ThreadUtil.sleepAtLeastIgnoreInterrupts(verdict.delayMillis);
+      }
+    }
+  }
+
+}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestRetriableTask.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestRetriableTask.java
new file mode 100644
index 0000000..e9df3a8
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestRetriableTask.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.ZipException;
+
+/**
+ * Tests for {@link RetriableTask}.
+ */
+public class TestRetriableTask {
+  /** A task that succeeds on the first call yields its value unchanged. */
+  @Test
+  public void returnsSuccessfulResult() throws Exception {
+    String expected = "bilbo";
+    RetriableTask<String> retriable = new RetriableTask<>(
+        RetryPolicies.RETRY_FOREVER, "test", () -> expected);
+    assertEquals(expected, retriable.call());
+  }
+
+  /** Two initial failures are retried before the value is returned. */
+  @Test
+  public void returnsSuccessfulResultAfterFailures() throws Exception {
+    AtomicInteger calls = new AtomicInteger();
+    String expected = "gandalf";
+    RetriableTask<String> retriable = new RetriableTask<>(
+        RetryPolicies.RETRY_FOREVER, "test",
+        () -> {
+          if (calls.incrementAndGet() > 2) {
+            return expected;
+          }
+          throw new Exception("testing");
+        });
+    assertEquals(expected, retriable.call());
+  }
+
+  /** A bounded policy caps the attempts and the last error is wrapped. */
+  @Test
+  public void respectsRetryPolicy() {
+    int maxAttempts = 3;
+    RetryPolicy policy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+        maxAttempts, 1, TimeUnit.MILLISECONDS);
+    AtomicInteger calls = new AtomicInteger();
+    RetriableTask<String> retriable = new RetriableTask<>(policy, "thr", () -> {
+      calls.incrementAndGet();
+      throw new ZipException("testing");
+    });
+    IOException thrown = assertThrows(IOException.class, retriable::call);
+    assertEquals(ZipException.class, thrown.getCause().getClass());
+    assertEquals(maxAttempts, calls.get());
+  }
+}
diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml
index d9ecedd..35cfd0b 100644
--- a/hadoop-hdds/pom.xml
+++ b/hadoop-hdds/pom.xml
@@ -55,6 +55,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
     <maven-surefire-plugin.version>3.0.0-M1</maven-surefire-plugin.version>
 
+    <junit.jupiter.version>5.3.1</junit.jupiter.version>
   </properties>
   <repositories>
     <repository>
@@ -182,7 +183,12 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
         <version>${bouncycastle.version}</version>
       </dependency>
 
-
+      <dependency>
+        <groupId>org.junit.jupiter</groupId>
+        <artifactId>junit-jupiter-api</artifactId>
+        <version>${junit.jupiter.version}</version>
+        <scope>test</scope>
+      </dependency>
     </dependencies>
   </dependencyManagement>
   <dependencies>
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 041d9cf..a01ad0c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
@@ -112,6 +113,7 @@ import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.utils.RetriableTask;
 import org.apache.ratis.util.LifeCycle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -140,6 +142,7 @@ import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
 import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
+import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
 import static org.apache.hadoop.ozone.OmUtils.getOmAddress;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
@@ -695,8 +698,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     StorageState state = omStorage.getState();
     if (state != StorageState.INITIALIZED) {
       try {
-        ScmBlockLocationProtocol scmBlockClient = getScmBlockClient(conf);
-        ScmInfo scmInfo = scmBlockClient.getScmInfo();
+        ScmInfo scmInfo = getScmInfo(conf);
         String clusterId = scmInfo.getClusterId();
         String scmId = scmInfo.getScmId();
         if (clusterId == null || clusterId.isEmpty()) {
@@ -726,6 +728,22 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+  /** Fetches SCM info, retrying failed attempts with a fixed-sleep policy. */
+  private static ScmInfo getScmInfo(OzoneConfiguration conf)
+      throws IOException {
+    try {
+      RetryPolicy fixedSleep =
+          retryUpToMaximumCountWithFixedSleep(10, 5, TimeUnit.SECONDS);
+      RetriableTask<ScmInfo> fetch = new RetriableTask<>(fixedSleep,
+          "OM#getScmInfo", () -> getScmBlockClient(conf).getScmInfo());
+      return fetch.call();
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (Exception other) {
+      throw new IOException("Failed to get SCM info", other);
+    }
+  }
+
   /**
    * Parses the command line options for OM initialization.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org