You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by el...@apache.org on 2019/02/05 15:13:58 UTC
[hadoop] branch trunk updated: HDDS-776. Make OM initialization
resilient to dns failures. Contributed by Doroszlai, Attila.
This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9f2da01 HDDS-776. Make OM initialization resilient to dns failures. Contributed by Doroszlai, Attila.
9f2da01 is described below
commit 9f2da015916939af7d987338d170fb3e9589aaa4
Author: Márton Elek <el...@apache.org>
AuthorDate: Tue Feb 5 16:11:16 2019 +0100
HDDS-776. Make OM initialization resilient to dns failures. Contributed by Doroszlai, Attila.
---
hadoop-hdds/common/pom.xml | 4 ++
.../org/apache/hadoop/utils/RetriableTask.java | 78 ++++++++++++++++++++++
.../org/apache/hadoop/utils/TestRetriableTask.java | 76 +++++++++++++++++++++
hadoop-hdds/pom.xml | 8 ++-
.../org/apache/hadoop/ozone/om/OzoneManager.java | 22 +++++-
5 files changed, 185 insertions(+), 3 deletions(-)
diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml
index 93bb63f..cb34d27 100644
--- a/hadoop-hdds/common/pom.xml
+++ b/hadoop-hdds/common/pom.xml
@@ -123,6 +123,10 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>commons-validator</artifactId>
<version>1.6</version>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RetriableTask.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RetriableTask.java
new file mode 100644
index 0000000..a847ae0
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RetriableTask.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.utils;
+
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@code Callable} implementation that retries a delegate task according to
+ * the specified {@code RetryPolicy}. Sleeps between retries in the caller
+ * thread.
+ * <p>
+ * Only a {@code RETRY} decision of the policy triggers another attempt; any
+ * other decision ends the loop with an {@link IOException} whose cause is
+ * the last failure of the delegate. If the caller thread is interrupted
+ * while waiting between attempts, its interrupt status is restored and an
+ * {@link InterruptedIOException} is thrown instead of retrying.
+ *
+ * @param <V> the result type of method {@code call}
+ */
+public class RetriableTask<V> implements Callable<V> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RetriableTask.class);
+
+  private final String name;
+  private final Callable<V> task;
+  private final RetryPolicy retryPolicy;
+
+  /**
+   * Creates a task that runs {@code task} under {@code retryPolicy}.
+   *
+   * @param retryPolicy decides whether, and after what delay, a failed
+   *     attempt is retried
+   * @param name name of the task, used in log and exception messages
+   * @param task the delegate to execute
+   * @throws NullPointerException if {@code retryPolicy} or {@code task} is
+   *     {@code null}
+   */
+  public RetriableTask(RetryPolicy retryPolicy, String name, Callable<V> task) {
+    this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy");
+    this.name = name;
+    this.task = Objects.requireNonNull(task, "task");
+  }
+
+  /**
+   * Executes the delegate task, retrying failed attempts for as long as the
+   * retry policy allows.
+   *
+   * @return the result of the first successful attempt
+   * @throws InterruptedIOException if the caller thread is interrupted while
+   *     waiting for the next attempt
+   * @throws IOException if the retry policy gives up; the last failure of
+   *     the delegate task is attached as the cause
+   * @throws Exception propagated unchanged if the retry policy itself throws
+   */
+  @Override
+  public V call() throws Exception {
+    int attempts = 0;
+    Exception cause;
+    while (true) {
+      try {
+        return task.call();
+      } catch (Exception e) {
+        cause = e;
+        // attempts is the number of calls made so far (1 after the first
+        // failure), so e.g. retryUpToMaximumCountWithFixedSleep(N, ...)
+        // results in N calls in total, not N + 1.
+        RetryPolicy.RetryAction action = retryPolicy.shouldRetry(e, ++attempts,
+            0, true);
+        if (action.action != RetryPolicy.RetryAction.RetryDecision.RETRY) {
+          break;
+        }
+        LOG.info("Execution of task {} failed, will be retried in {} ms: {}",
+            name, action.delayMillis, e.toString());
+        sleepBeforeRetry(action.delayMillis, e);
+      }
+    }
+
+    String msg = String.format(
+        "Execution of task %s failed permanently after %d attempts",
+        name, attempts);
+    LOG.warn(msg, cause);
+    throw new IOException(msg, cause);
+  }
+
+  /**
+   * Waits {@code delayMillis} in the caller thread before the next attempt.
+   * An interrupt is not swallowed: the interrupt status is restored and the
+   * retry loop is aborted.
+   *
+   * @param delayMillis delay requested by the retry policy
+   * @param lastFailure failure of the attempt that was to be retried; kept
+   *     as a suppressed exception for diagnostics
+   * @throws InterruptedIOException if interrupted while waiting
+   */
+  private void sleepBeforeRetry(long delayMillis, Exception lastFailure)
+      throws InterruptedIOException {
+    try {
+      TimeUnit.MILLISECONDS.sleep(delayMillis);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      InterruptedIOException iioe = new InterruptedIOException(
+          "Interrupted while waiting to retry task " + name);
+      iioe.initCause(ie);
+      iioe.addSuppressed(lastFailure);
+      throw iioe;
+    }
+  }
+
+}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestRetriableTask.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestRetriableTask.java
new file mode 100644
index 0000000..e9df3a8
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestRetriableTask.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.ZipException;
+
+/**
+ * Tests for {@link RetriableTask}.
+ */
+public class TestRetriableTask {
+
+  /**
+   * A task that succeeds on its first attempt yields its result directly.
+   */
+  @Test
+  public void returnsSuccessfulResult() throws Exception {
+    final String hobbit = "bilbo";
+    RetriableTask<String> retriable = new RetriableTask<>(
+        RetryPolicies.RETRY_FOREVER, "test", () -> hobbit);
+
+    String outcome = retriable.call();
+
+    assertEquals(hobbit, outcome);
+  }
+
+  /**
+   * A task that fails twice and then succeeds yields the successful result
+   * under a policy that never gives up.
+   */
+  @Test
+  public void returnsSuccessfulResultAfterFailures() throws Exception {
+    final String wizard = "gandalf";
+    final int failuresBeforeSuccess = 2;
+    AtomicInteger calls = new AtomicInteger();
+    RetriableTask<String> retriable = new RetriableTask<>(
+        RetryPolicies.RETRY_FOREVER, "test", () -> {
+          if (calls.incrementAndGet() > failuresBeforeSuccess) {
+            return wizard;
+          }
+          throw new Exception("testing");
+        });
+
+    assertEquals(wizard, retriable.call());
+  }
+
+  /**
+   * A task that always fails is called {@code maxAttempts} times in total
+   * and then surfaces as an {@link IOException} wrapping its failure.
+   */
+  @Test
+  public void respectsRetryPolicy() {
+    final int maxAttempts = 3;
+    AtomicInteger calls = new AtomicInteger();
+    RetryPolicy policy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+        maxAttempts, 1, TimeUnit.MILLISECONDS);
+    RetriableTask<String> retriable = new RetriableTask<>(policy, "thr",
+        () -> {
+          calls.incrementAndGet();
+          throw new ZipException("testing");
+        });
+
+    IOException thrown = assertThrows(IOException.class, retriable::call);
+
+    assertEquals(ZipException.class, thrown.getCause().getClass());
+    assertEquals(maxAttempts, calls.get());
+  }
+
+}
diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml
index d9ecedd..35cfd0b 100644
--- a/hadoop-hdds/pom.xml
+++ b/hadoop-hdds/pom.xml
@@ -55,6 +55,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<maven-surefire-plugin.version>3.0.0-M1</maven-surefire-plugin.version>
+ <junit.jupiter.version>5.3.1</junit.jupiter.version>
</properties>
<repositories>
<repository>
@@ -182,7 +183,12 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<version>${bouncycastle.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>${junit.jupiter.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
<dependencies>
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 041d9cf..a01ad0c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient
import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
@@ -112,6 +113,7 @@ import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.utils.RetriableTask;
import org.apache.ratis.util.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -140,6 +142,7 @@ import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
+import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
import static org.apache.hadoop.ozone.OmUtils.getOmAddress;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
@@ -695,8 +698,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
StorageState state = omStorage.getState();
if (state != StorageState.INITIALIZED) {
try {
- ScmBlockLocationProtocol scmBlockClient = getScmBlockClient(conf);
- ScmInfo scmInfo = scmBlockClient.getScmInfo();
+ ScmInfo scmInfo = getScmInfo(conf);
String clusterId = scmInfo.getClusterId();
String scmId = scmInfo.getScmId();
if (clusterId == null || clusterId.isEmpty()) {
@@ -726,6 +728,38 @@
}
}
+  /**
+   * Fetches {@link ScmInfo} from SCM, retrying failed attempts with a fixed
+   * delay so that OM initialization is resilient to transient failures such
+   * as DNS resolution errors.
+   *
+   * @param conf configuration used to create the SCM block client
+   * @return the SCM info returned by the first successful attempt
+   * @throws IOException if the attempts are exhausted or the call fails for
+   *     another reason; exceptions that are not {@code IOException}s are
+   *     wrapped
+   */
+  private static ScmInfo getScmInfo(OzoneConfiguration conf)
+      throws IOException {
+    // RetriableTask passes its attempt count to the policy as the retry
+    // count, so this allows 10 calls in total, 5 seconds apart.
+    final int maxRetries = 10;
+    final long retryIntervalSeconds = 5;
+    try {
+      RetryPolicy policy = retryUpToMaximumCountWithFixedSleep(
+          maxRetries, retryIntervalSeconds, TimeUnit.SECONDS);
+      // A new SCM block client is created for every attempt.
+      RetriableTask<ScmInfo> fetchScmInfo = new RetriableTask<>(
+          policy, "OM#getScmInfo",
+          () -> getScmBlockClient(conf).getScmInfo());
+      return fetchScmInfo.call();
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Failed to get SCM info", e);
+    }
+  }
+
/**
* Parses the command line options for OM initialization.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org