Posted to commits@ozone.apache.org by sh...@apache.org on 2021/05/26 06:57:19 UTC

[ozone] branch master updated: HDDS-5219. Limit number of bad volumes by dfs.datanode.failed.volumes.tolerated. (#2243)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2671b48  HDDS-5219. Limit number of bad volumes by dfs.datanode.failed.volumes.tolerated. (#2243)
2671b48 is described below

commit 2671b48ccf42fccbb1b9f19966106b19850725df
Author: Gui Hecheng <ma...@tencent.com>
AuthorDate: Wed May 26 14:57:06 2021 +0800

    HDDS-5219. Limit number of bad volumes by dfs.datanode.failed.volumes.tolerated. (#2243)
---
 .../apache/hadoop/hdds/DFSConfigKeysLegacy.java    |   5 -
 .../common/statemachine/DatanodeConfiguration.java |  30 +++++
 .../common/statemachine/DatanodeStateMachine.java  |   6 +
 .../container/common/volume/HddsVolumeChecker.java |  11 +-
 .../container/common/volume/MutableVolumeSet.java  |  42 ++++++-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   5 +-
 .../container/common/TestBlockDeletingService.java |   2 +-
 .../TestSchemaOneBackwardsCompatibility.java       |   4 +-
 .../common/impl/TestContainerPersistence.java      |   2 +-
 .../container/common/impl/TestHddsDispatcher.java  |   5 +-
 .../statemachine/TestDatanodeConfiguration.java    |  12 ++
 .../container/common/volume/TestVolumeSet.java     |   5 +-
 .../common/volume/TestVolumeSetDiskChecks.java     |  11 +-
 .../keyvalue/TestKeyValueBlockIterator.java        |   2 +-
 .../keyvalue/TestKeyValueContainerCheck.java       |   2 +-
 .../container/keyvalue/TestKeyValueHandler.java    |   3 +-
 .../container/ozoneimpl/TestContainerReader.java   |   2 +-
 .../container/ozoneimpl/TestOzoneContainer.java    |   3 +-
 .../container/metrics/TestContainerMetrics.java    |   2 +-
 .../server/TestSecureContainerServer.java          |   2 +-
 .../apache/hadoop/ozone/dn/DatanodeTestUtils.java  |  31 +++--
 .../TestDatanodeHddsVolumeFailureDetection.java    |   6 +
 .../TestDatanodeHddsVolumeFailureToleration.java   | 131 +++++++++++++++++++++
 .../ozone/debug/container/ContainerCommands.java   |   2 +-
 .../hadoop/ozone/freon/ChunkManagerDiskWrite.java  |   2 +-
 .../ozone/freon/ClosedContainerReplicator.java     |   3 +-
 .../containergenerator/GeneratorDatanode.java      |   2 +-
 .../ozone/genesis/BenchMarkDatanodeDispatcher.java |   2 +-
 28 files changed, 289 insertions(+), 46 deletions(-)
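
In short, this change retires the legacy dfs.datanode.failed.volumes.tolerated
key from DFSConfigKeysLegacy in favor of hdds.datanode.failed.volumes.tolerated,
exposed through DatanodeConfiguration. A minimal sketch of setting and reading
the new knob (the class name and values here are illustrative, not part of the
commit):

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;

    public final class ToleratedVolumesConfigSketch {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();

        // Either set the raw key (-1, the default, means unlimited failures
        // as long as at least one good volume remains)...
        conf.setInt("hdds.datanode.failed.volumes.tolerated", 1);

        // ...or round-trip through the typed config object, exactly as the
        // updated tests in this commit do.
        DatanodeConfiguration dnConf =
            conf.getObject(DatanodeConfiguration.class);
        dnConf.setFailedVolumesTolerated(1);
        conf.setFromObject(dnConf);

        System.out.println(dnConf.getFailedVolumesTolerated()); // prints 1
      }
    }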

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DFSConfigKeysLegacy.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DFSConfigKeysLegacy.java
index d9e12fc..1e6d73f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DFSConfigKeysLegacy.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DFSConfigKeysLegacy.java
@@ -84,11 +84,6 @@ public final class DFSConfigKeysLegacy {
   public static final String DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT =
       "10m";
 
-  public static final String DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY =
-      "dfs.datanode.failed.volumes.tolerated";
-
-  public static final int DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;
-
   public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY =
       "dfs.metrics.percentiles.intervals";
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index c80f7af..c5885c5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -43,11 +43,15 @@ public class DatanodeConfiguration {
       "hdds.datanode.container.delete.threads.max";
   static final String PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY =
       "hdds.datanode.periodic.disk.check.interval.minutes";
+  public static final String FAILED_VOLUMES_TOLERATED_KEY =
+      "hdds.datanode.failed.volumes.tolerated";
 
   static final int REPLICATION_MAX_STREAMS_DEFAULT = 10;
 
   static final long PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT = 15;
 
+  static final int FAILED_VOLUMES_TOLERATED_DEFAULT = -1;
+
   /**
    * The maximum number of replication commands a single datanode can execute
    * simultaneously.
@@ -123,6 +127,17 @@ public class DatanodeConfiguration {
   private long periodicDiskCheckIntervalMinutes =
       PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT;
 
+  @Config(key = "failed.volumes.tolerated",
+      defaultValue = "-1",
+      type = ConfigType.INT,
+      tags = { DATANODE },
+      description = "The number of volumes that are allowed to fail "
+          + "before a datanode stops offering service. "
+          + "Config this to -1 means unlimited, but we should have "
+          + "at least one good volume left."
+  )
+  private int failedVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
+
   @PostConstruct
   public void validate() {
     if (replicationMaxStreams < 1) {
@@ -147,6 +162,13 @@ public class DatanodeConfiguration {
       periodicDiskCheckIntervalMinutes =
           PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT;
     }
+
+    if (failedVolumesTolerated < -1) {
+      LOG.warn(FAILED_VOLUMES_TOLERATED_KEY + " must be greater than or "
+          + "equal to -1, but was set to {}. Defaulting to {}",
+          failedVolumesTolerated, FAILED_VOLUMES_TOLERATED_DEFAULT);
+      failedVolumesTolerated = FAILED_VOLUMES_TOLERATED_DEFAULT;
+    }
   }
 
   public void setReplicationMaxStreams(int replicationMaxStreams) {
@@ -173,4 +195,12 @@ public class DatanodeConfiguration {
       long periodicDiskCheckIntervalMinutes) {
     this.periodicDiskCheckIntervalMinutes = periodicDiskCheckIntervalMinutes;
   }
+
+  public int getFailedVolumesTolerated() {
+    return failedVolumesTolerated;
+  }
+
+  public void setFailedVolumesTolerated(int failedVolumesTolerated) {
+    this.failedVolumesTolerated = failedVolumesTolerated;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index aab4fa7..36ed863 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -262,6 +262,12 @@ public class DatanodeStateMachine implements Closeable {
     }
   }
 
+  public void handleFatalVolumeFailures() {
+    LOG.error("DatanodeStateMachine Shutdown due to too many bad volumes, "
+        + "check " + DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_KEY);
+    hddsDatanodeStopService.stopService();
+  }
+
   /**
    * Gets the current context.
    *
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeChecker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeChecker.java
index 9240a85..8eaf299 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeChecker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeChecker.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.container.common.volume;
 
 import javax.annotation.Nonnull;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Timer;
 
@@ -106,13 +108,12 @@ public class HddsVolumeChecker {
 
     this.timer = timer;
 
+    DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
     /**
      * Maximum number of volume failures that can be tolerated without
      * declaring a fatal error.
      */
-    int maxVolumeFailuresTolerated = conf.getInt(
-        DFSConfigKeysLegacy.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
-        DFSConfigKeysLegacy.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
+    int maxVolumeFailuresTolerated = dnConf.getFailedVolumesTolerated();
 
     minDiskCheckGapMs = conf.getTimeDuration(
         DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
@@ -140,7 +141,7 @@ public class HddsVolumeChecker {
 
     if (maxVolumeFailuresTolerated < MAX_VOLUME_FAILURE_TOLERATED_LIMIT) {
       throw new DiskErrorException("Invalid value configured for "
-          + DFSConfigKeysLegacy.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY
+          + DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_KEY
           + " - "
           + maxVolumeFailuresTolerated + " "
           + DataNode.MAX_VOLUME_FAILURES_TOLERATED_MSG);
@@ -239,7 +240,7 @@ public class HddsVolumeChecker {
      * @param failedVolumes  set of volumes that failed disk checks.
      */
     void call(Set<HddsVolume> healthyVolumes,
-        Set<HddsVolume> failedVolumes);
+        Set<HddsVolume> failedVolumes) throws IOException;
   }
 
   /**
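
The signature change above lets disk-check callbacks propagate I/O errors
instead of swallowing them; MutableVolumeSet relies on this below to abort
startup. A hedged sketch of implementing the callback, assuming the enclosing
functional interface is HddsVolumeChecker.Callback (the interface name is not
visible in this hunk) and with an illustrative body:

    import java.io.IOException;
    import java.util.Set;
    import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
    import org.apache.hadoop.ozone.container.common.volume.HddsVolumeChecker;

    final class FailFastCallbackSketch {
      // Illustrative only: a handler that surfaces total volume loss as an
      // IOException rather than merely logging it.
      static final HddsVolumeChecker.Callback ON_CHECKS_DONE =
          (Set<HddsVolume> healthyVolumes, Set<HddsVolume> failedVolumes) -> {
            if (healthyVolumes.isEmpty()) {
              throw new IOException(
                  "no healthy volumes left; failed: " + failedVolumes.size());
            }
          };
    }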
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
index e42df07..7b0a1c1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
 import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume.VolumeState;
 import org.apache.hadoop.util.DiskChecker;
@@ -102,15 +103,17 @@ public class MutableVolumeSet implements VolumeSet {
   private Runnable shutdownHook;
   private final HddsVolumeChecker volumeChecker;
   private Runnable failedVolumeListener;
+  private StateContext context;
 
-  public MutableVolumeSet(String dnUuid, ConfigurationSource conf)
-      throws IOException {
-    this(dnUuid, null, conf);
+  public MutableVolumeSet(String dnUuid, ConfigurationSource conf,
+      StateContext context) throws IOException {
+    this(dnUuid, null, conf, context);
   }
 
   public MutableVolumeSet(String dnUuid, String clusterID,
-      ConfigurationSource conf)
+      ConfigurationSource conf, StateContext context)
       throws IOException {
+    this.context = context;
     this.datanodeUuid = dnUuid;
     this.clusterID = clusterID;
     this.conf = conf;
@@ -267,7 +270,8 @@ public class MutableVolumeSet implements VolumeSet {
    * Handle one or more failed volumes.
    * @param failedVolumes
    */
-  private void handleVolumeFailures(Set<HddsVolume> failedVolumes) {
+  private void handleVolumeFailures(Set<HddsVolume> failedVolumes)
+      throws IOException {
     this.writeLock();
     try {
       for (HddsVolume v : failedVolumes) {
@@ -275,6 +279,21 @@ public class MutableVolumeSet implements VolumeSet {
         // for new containers.
         failVolume(v.getHddsRootDir().getPath());
       }
+
+      // check whether failed volumes are still within the tolerated limit
+      if (!hasEnoughVolumes()) {
+        // on startup, services are not initialized yet, so we cannot stop
+        // them; fail fast with an exception instead
+        if (shutdownHook == null) {
+          DatanodeConfiguration dnConf =
+              conf.getObject(DatanodeConfiguration.class);
+          throw new IOException("Not enough good volumes on startup,"
+              + " bad volumes detected: " + failedVolumes.size()
+              + " max tolerated: " + dnConf.getFailedVolumesTolerated());
+        }
+        if (context != null) {
+          context.getParent().handleFatalVolumeFailures();
+        }
+      }
     } finally {
       this.writeUnlock();
     }
@@ -522,6 +541,19 @@ public class MutableVolumeSet implements VolumeSet {
     return ImmutableMap.copyOf(volumeStateMap);
   }
 
+  public boolean hasEnoughVolumes() {
+    DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
+    int maxVolumeFailuresTolerated = dnConf.getFailedVolumesTolerated();
+
+    // -1 means unlimited bad volumes, as long as one good volume remains
+    if (maxVolumeFailuresTolerated ==
+        HddsVolumeChecker.MAX_VOLUME_FAILURE_TOLERATED_LIMIT) {
+      return getVolumesList().size() >= 1;
+    } else {
+      return getFailedVolumesList().size() <= maxVolumeFailuresTolerated;
+    }
+  }
+
   public StorageLocationReport[] getStorageReport()
       throws IOException {
     boolean failed;
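
The hasEnoughVolumes() method above encodes the new tolerance rule. A
self-contained restatement of that rule, assuming
HddsVolumeChecker.MAX_VOLUME_FAILURE_TOLERATED_LIMIT is the same -1 sentinel
used as the config default:

    final class VolumeToleranceSketch {
      // Mirrors MutableVolumeSet#hasEnoughVolumes from this commit.
      static boolean hasEnoughVolumes(int goodVolumes, int failedVolumes,
          int failedVolumesTolerated) {
        if (failedVolumesTolerated == -1) {
          // Unlimited failures tolerated, but keep at least one good volume.
          return goodVolumes >= 1;
        }
        return failedVolumes <= failedVolumesTolerated;
      }

      public static void main(String[] args) {
        // With 3 volumes and tolerated = 1, as in the new integration test:
        System.out.println(hasEnoughVolumes(2, 1, 1)); // true: keeps running
        System.out.println(hasEnoughVolumes(1, 2, 1)); // false: fatal
      }
    }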
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 4177981..736e3d6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -92,6 +92,7 @@ public class OzoneContainer {
   private final AtomicReference<InitializingStatus> initializingStatus;
   private final ReplicationServer replicationServer;
   private DatanodeDetails datanodeDetails;
+  private StateContext context;
 
   enum InitializingStatus {
     UNINITIALIZED, INITIALIZING, INITIALIZED
@@ -113,7 +114,9 @@ public class OzoneContainer {
       throws IOException {
     config = conf;
     this.datanodeDetails = datanodeDetails;
-    volumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf);
+    this.context = context;
+    volumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
+        context);
     volumeSet.setFailedVolumeListener(this::handleVolumeFailures);
     containerSet = new ContainerSet();
     metadataScanner = null;
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index 23c76dd..aa88f95 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -158,7 +158,7 @@ public class TestBlockDeletingService {
     conf = new OzoneConfiguration();
     conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
     datanodeUuid = UUID.randomUUID().toString();
-    volumeSet = new MutableVolumeSet(datanodeUuid, conf);
+    volumeSet = new MutableVolumeSet(datanodeUuid, conf, null);
   }
 
   @AfterClass
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
index b08fd0d..e6b841c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
@@ -238,7 +238,7 @@ public class TestSchemaOneBackwardsCompatibility {
     final long numBlocksToDelete = TestDB.NUM_PENDING_DELETION_BLOCKS;
     String datanodeUuid = UUID.randomUUID().toString();
     ContainerSet containerSet = makeContainerSet();
-    VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf);
+    VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf, null);
     ContainerMetrics metrics = ContainerMetrics.create(conf);
     KeyValueHandler keyValueHandler =
         new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
@@ -305,7 +305,7 @@ public class TestSchemaOneBackwardsCompatibility {
   public void testReadDeletedBlockChunkInfo() throws Exception {
     String datanodeUuid = UUID.randomUUID().toString();
     ContainerSet containerSet = makeContainerSet();
-    VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf);
+    VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf, null);
     ContainerMetrics metrics = ContainerMetrics.create(conf);
     KeyValueHandler keyValueHandler =
         new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index d994b93..5440c12 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -141,7 +141,7 @@ public class TestContainerPersistence {
   @Before
   public void setupPaths() throws IOException {
     containerSet = new ContainerSet();
-    volumeSet = new MutableVolumeSet(DATANODE_UUID, conf);
+    volumeSet = new MutableVolumeSet(DATANODE_UUID, conf, null);
     blockManager = new BlockManagerImpl(conf);
     chunkManager = ChunkManagerFactory.createChunkManager(conf, blockManager,
         null);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index 34f26a7..3c02007 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -101,7 +101,8 @@ public class TestHddsDispatcher {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(HDDS_DATANODE_DIR_KEY, testDir);
     DatanodeDetails dd = randomDatanodeDetails();
-    MutableVolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf);
+    MutableVolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf,
+        null);
 
     try {
       UUID scmId = UUID.randomUUID();
@@ -277,7 +278,7 @@ public class TestHddsDispatcher {
   private HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
       OzoneConfiguration conf) throws IOException {
     ContainerSet containerSet = new ContainerSet();
-    VolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf);
+    VolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf, null);
     DatanodeStateMachine stateMachine = Mockito.mock(
         DatanodeStateMachine.class);
     StateContext context = Mockito.mock(StateContext.class);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java
index 35931eb..c0cc28a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java
@@ -26,6 +26,8 @@ import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConf
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.REPLICATION_STREAMS_LIMIT_KEY;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT;
+import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_KEY;
+import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_DEFAULT;
 
 import static org.junit.Assert.assertEquals;
 
@@ -40,11 +42,13 @@ public class TestDatanodeConfiguration {
     int validReplicationLimit = 123;
     int validDeleteThreads = 42;
     long validDiskCheckIntervalMinutes = 60;
+    int validFailedVolumesTolerated = 10;
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, validReplicationLimit);
     conf.setInt(CONTAINER_DELETE_THREADS_MAX_KEY, validDeleteThreads);
     conf.setLong(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY,
         validDiskCheckIntervalMinutes);
+    conf.setInt(FAILED_VOLUMES_TOLERATED_KEY, validFailedVolumesTolerated);
 
     // WHEN
     DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class);
@@ -54,6 +58,8 @@ public class TestDatanodeConfiguration {
     assertEquals(validDeleteThreads, subject.getContainerDeleteThreads());
     assertEquals(validDiskCheckIntervalMinutes,
         subject.getPeriodicDiskCheckIntervalMinutes());
+    assertEquals(validFailedVolumesTolerated,
+        subject.getFailedVolumesTolerated());
   }
 
   @Test
@@ -62,11 +68,13 @@ public class TestDatanodeConfiguration {
     int invalidReplicationLimit = -5;
     int invalidDeleteThreads = 0;
     long invalidDiskCheckIntervalMinutes = -1;
+    int invalidFailedVolumesTolerated = -2;
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, invalidReplicationLimit);
     conf.setInt(CONTAINER_DELETE_THREADS_MAX_KEY, invalidDeleteThreads);
     conf.setLong(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY,
         invalidDiskCheckIntervalMinutes);
+    conf.setInt(FAILED_VOLUMES_TOLERATED_KEY, invalidFailedVolumesTolerated);
 
     // WHEN
     DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class);
@@ -78,6 +86,8 @@ public class TestDatanodeConfiguration {
         subject.getContainerDeleteThreads());
     assertEquals(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT,
         subject.getPeriodicDiskCheckIntervalMinutes());
+    assertEquals(FAILED_VOLUMES_TOLERATED_DEFAULT,
+        subject.getFailedVolumesTolerated());
   }
 
   @Test
@@ -95,6 +105,8 @@ public class TestDatanodeConfiguration {
         subject.getContainerDeleteThreads());
     assertEquals(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT,
         subject.getPeriodicDiskCheckIntervalMinutes());
+    assertEquals(FAILED_VOLUMES_TOLERATED_DEFAULT,
+        subject.getFailedVolumesTolerated());
   }
 
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSet.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSet.java
index 8a61140..1c05125 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSet.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSet.java
@@ -64,7 +64,7 @@ public class TestVolumeSet {
   private static final String DUMMY_IP_ADDR = "0.0.0.0";
 
   private void initializeVolumeSet() throws Exception {
-    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf);
+    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf, null);
   }
 
   @Rule
@@ -230,7 +230,8 @@ public class TestVolumeSet {
     OzoneConfiguration ozoneConfig = new OzoneConfiguration();
     ozoneConfig.set(HDDS_DATANODE_DIR_KEY, readOnlyVolumePath.getAbsolutePath()
         + "," + volumePath.getAbsolutePath());
-    volSet = new MutableVolumeSet(UUID.randomUUID().toString(), ozoneConfig);
+    volSet = new MutableVolumeSet(UUID.randomUUID().toString(), ozoneConfig,
+        null);
     assertEquals(1, volSet.getFailedVolumesList().size());
     assertEquals(readOnlyVolumePath, volSet.getFailedVolumesList().get(0)
         .getHddsRootDir());
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
index 08ddb7b..514a925 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Timer;
@@ -88,7 +89,7 @@ public class TestVolumeSetDiskChecks {
 
     conf = getConfWithDataNodeDirs(numVolumes);
     final MutableVolumeSet volumeSet =
-        new MutableVolumeSet(UUID.randomUUID().toString(), conf);
+        new MutableVolumeSet(UUID.randomUUID().toString(), conf, null);
 
     assertThat(volumeSet.getVolumesList().size(), is(numVolumes));
     assertThat(volumeSet.getFailedVolumesList().size(), is(0));
@@ -113,7 +114,7 @@ public class TestVolumeSetDiskChecks {
 
     conf = getConfWithDataNodeDirs(numVolumes);
     final MutableVolumeSet volumeSet = new MutableVolumeSet(
-        UUID.randomUUID().toString(), conf) {
+        UUID.randomUUID().toString(), conf, null) {
       @Override
       HddsVolumeChecker getVolumeChecker(ConfigurationSource configuration)
           throws DiskErrorException {
@@ -137,7 +138,7 @@ public class TestVolumeSetDiskChecks {
     conf = getConfWithDataNodeDirs(numVolumes);
 
     final MutableVolumeSet volumeSet = new MutableVolumeSet(
-        UUID.randomUUID().toString(), conf) {
+        UUID.randomUUID().toString(), conf, null) {
       @Override
       HddsVolumeChecker getVolumeChecker(ConfigurationSource configuration)
           throws DiskErrorException {
@@ -163,6 +164,10 @@ public class TestVolumeSetDiskChecks {
     }
     ozoneConf.set(DFSConfigKeysLegacy.DFS_DATANODE_DATA_DIR_KEY,
         String.join(",", dirs));
+    DatanodeConfiguration dnConf =
+        ozoneConf.getObject(DatanodeConfiguration.class);
+    dnConf.setFailedVolumesTolerated(numDirs);
+    ozoneConf.setFromObject(dnConf);
     return ozoneConf;
   }
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
index 24badab..6d98d33 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueBlockIterator.java
@@ -85,7 +85,7 @@ public class TestKeyValueBlockIterator {
     testRoot = GenericTestUtils.getRandomizedTestDir();
     conf = new OzoneConfiguration();
     conf.set(HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
-    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf);
+    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf, null);
 
     containerData = new KeyValueContainerData(105L,
             layout,
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
index 716576f..31b8bfa 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
@@ -97,7 +97,7 @@ import static org.junit.Assert.assertFalse;
     conf = new OzoneConfiguration();
     conf.set(HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
     chunkManagerTestInfo.updateConfig(conf);
-    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf);
+    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf, null);
     chunkManager = chunkManagerTestInfo.createChunkManager(true, null);
   }
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 04fa832..a718e82 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -264,7 +264,8 @@ public class TestKeyValueHandler {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(HDDS_DATANODE_DIR_KEY, path.getAbsolutePath());
     MutableVolumeSet
-        volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf);
+        volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf,
+        null);
     try {
       ContainerSet cset = new ContainerSet();
       int[] interval = new int[1];
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
index 2d8b231..ad87629 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java
@@ -269,7 +269,7 @@ public class TestContainerReader {
     conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
         datanodeDirs.toString());
     MutableVolumeSet volumeSets =
-        new MutableVolumeSet(datanodeId.toString(), clusterId, conf);
+        new MutableVolumeSet(datanodeId.toString(), clusterId, conf, null);
     ContainerCache cache = ContainerCache.getInstance(conf);
     cache.clear();
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 96bde76..0b0d584 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -104,7 +104,8 @@ public class TestOzoneContainer {
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
         folder.newFolder().getAbsolutePath());
     commitSpaceMap = new HashMap<String, Long>();
-    volumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf);
+    volumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
+        null);
     volumeChoosingPolicy = new RoundRobinVolumeChoosingPolicy();
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
index 2a0d98f..f7e1f44 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
@@ -92,7 +92,7 @@ public class TestContainerMetrics {
       DatanodeDetails datanodeDetails = randomDatanodeDetails();
       conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, path);
       VolumeSet volumeSet = new MutableVolumeSet(
-          datanodeDetails.getUuidString(), conf);
+          datanodeDetails.getUuidString(), conf, null);
       ContainerSet containerSet = new ContainerSet();
       DatanodeStateMachine stateMachine = Mockito.mock(
           DatanodeStateMachine.class);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index 92df53f..b4b2c61 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -168,7 +168,7 @@ public class TestSecureContainerServer {
     conf.set(HDDS_DATANODE_DIR_KEY,
         Paths.get(TEST_DIR, "dfs", "data", "hdds",
             RandomStringUtils.randomAlphabetic(4)).toString());
-    VolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf);
+    VolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf, null);
     DatanodeStateMachine stateMachine = Mockito.mock(
         DatanodeStateMachine.class);
     StateContext context = Mockito.mock(StateContext.class);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/DatanodeTestUtils.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/DatanodeTestUtils.java
index 427e88e..4b4dddf 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/DatanodeTestUtils.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/DatanodeTestUtils.java
@@ -179,31 +179,48 @@ public final class DatanodeTestUtils {
   }
 
   /**
-   * Simulate a bad volume by removing write permission.
+   * Simulate a bad rootDir by removing write permission.
    * @see {@link org.apache.hadoop.ozone.container.common.volume
    * .HddsVolume#check(Boolean)}
-   * @param vol
+   * @param rootDir
    */
-  public static void simulateBadVolume(HddsVolume vol) {
-    File rootDir = vol.getHddsRootDir();
+  public static void simulateBadRootDir(File rootDir) {
     if (rootDir.exists()) {
       rootDir.setWritable(false);
     }
   }
 
   /**
+   * Simulate a bad volume by removing write permission.
+   * @see {@link org.apache.hadoop.ozone.container.common.volume
+   * .HddsVolume#check(Boolean)}
+   * @param vol
+   */
+  public static void simulateBadVolume(HddsVolume vol) {
+    simulateBadRootDir(vol.getHddsRootDir());
+  }
+
+  /**
    * Restore a simulated bad volume to normal.
    * @see {@link #simulateBadVolume(HddsVolume)}
-   * @param vol
+   * @param rootDir
    */
-  public static void restoreBadVolume(HddsVolume vol) {
-    File rootDir = vol.getHddsRootDir();
+  public static void restoreBadRootDir(File rootDir) {
     if (rootDir.exists()) {
       rootDir.setWritable(true);
     }
   }
 
   /**
+   * Restore a simulated bad volume to normal.
+   * @see {@link #simulateBadVolume(HddsVolume)}
+   * @param vol
+   */
+  public static void restoreBadVolume(HddsVolume vol) {
+    restoreBadRootDir(vol.getHddsRootDir());
+  }
+
+  /**
    * Wait for detect volume failure.
    *
    * @param volSet
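
Splitting the helpers into File-level variants lets tests that only hold a
root directory reuse the simulate/restore logic without an HddsVolume handle,
as the new toleration test below does. A minimal usage sketch (the wrapper
class and Runnable plumbing are illustrative):

    import java.io.File;
    import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
    import org.apache.hadoop.ozone.dn.DatanodeTestUtils;

    final class BadRootDirUsageSketch {
      static void withBadRootDir(HddsVolume vol, Runnable assertions) {
        File volRootDir = vol.getHddsRootDir();
        DatanodeTestUtils.simulateBadRootDir(volRootDir); // drop write perm
        try {
          assertions.run(); // exercise the failure-handling path under test
        } finally {
          // always restore writability so later tests and cleanup succeed
          DatanodeTestUtils.restoreBadRootDir(volRootDir);
        }
      }
    }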
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureDetection.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureDetection.java
index 178fa59..ef61ad1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureDetection.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureDetection.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.common.Storage;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -96,6 +97,11 @@ public class TestDatanodeHddsVolumeFailureDetection {
     ozoneConfig.setTimeDuration(
         DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, 5,
         TimeUnit.SECONDS);
+    // set tolerated = 1
+    DatanodeConfiguration dnConf =
+        ozoneConfig.getObject(DatanodeConfiguration.class);
+    dnConf.setFailedVolumesTolerated(1);
+    ozoneConfig.setFromObject(dnConf);
     cluster = MiniOzoneCluster.newBuilder(ozoneConfig)
         .setNumDatanodes(1)
         .setNumDataVolumes(1)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureToleration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureToleration.java
new file mode 100644
index 0000000..55dd958
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/volume/TestDatanodeHddsVolumeFailureToleration.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hadoop.ozone.dn.volume;
+
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.dn.DatanodeTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION;
+
+/**
+ * Tests that a datanode can tolerate the configured number of failed volumes.
+ */
+public class TestDatanodeHddsVolumeFailureToleration {
+  /**
+   * Set a timeout for each test.
+   */
+  @Rule
+  public Timeout timeout = Timeout.seconds(300);
+  private MiniOzoneCluster cluster;
+  private OzoneConfiguration ozoneConfig;
+  private List<HddsDatanodeService> datanodes;
+
+  @Before
+  public void init() throws Exception {
+    ozoneConfig = new OzoneConfiguration();
+    ozoneConfig.set(OZONE_SCM_CONTAINER_SIZE, "1GB");
+    ozoneConfig.setInt(OZONE_REPLICATION, ReplicationFactor.ONE.getValue());
+    // set tolerated = 1
+    DatanodeConfiguration dnConf =
+        ozoneConfig.getObject(DatanodeConfiguration.class);
+    dnConf.setFailedVolumesTolerated(1);
+    ozoneConfig.setFromObject(dnConf);
+    cluster = MiniOzoneCluster.newBuilder(ozoneConfig)
+        .setNumDatanodes(1)
+        .setNumDataVolumes(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+    datanodes = cluster.getHddsDatanodes();
+  }
+
+  @After
+  public void shutdown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testTolerationOnStartupSuccess() throws Exception {
+    HddsDatanodeService dn = datanodes.get(0);
+    OzoneContainer oc = dn.getDatanodeStateMachine().getContainer();
+    MutableVolumeSet volSet = oc.getVolumeSet();
+    HddsVolume vol0 = volSet.getVolumesList().get(0);
+    // keep a reference to the root dir so it can be restored after restart
+    File volRootDir0 = vol0.getHddsRootDir();
+
+    // simulate bad volumes <= tolerated
+    DatanodeTestUtils.simulateBadRootDir(volRootDir0);
+
+    // restart datanode to test
+    cluster.restartHddsDatanode(0, true);
+
+    // a successful restart means the failure was tolerated
+
+    // restore bad volumes
+    DatanodeTestUtils.restoreBadRootDir(volRootDir0);
+  }
+
+  @Test
+  public void testTolerationOnStartupFailure() throws Exception {
+    HddsDatanodeService dn = datanodes.get(0);
+    OzoneContainer oc = dn.getDatanodeStateMachine().getContainer();
+    MutableVolumeSet volSet = oc.getVolumeSet();
+    HddsVolume vol0 = volSet.getVolumesList().get(0);
+    HddsVolume vol1 = volSet.getVolumesList().get(1);
+    File volRootDir0 = vol0.getHddsRootDir();
+    File volRootDir1 = vol1.getHddsRootDir();
+
+    // simulate bad volumes > tolerated
+    DatanodeTestUtils.simulateBadRootDir(volRootDir0);
+    DatanodeTestUtils.simulateBadRootDir(volRootDir1);
+
+    // restart datanode to test
+    try {
+      cluster.restartHddsDatanode(0, true);
+      Assert.fail();
+    } catch (RuntimeException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("Can't start the HDDS datanode plugin"));
+    }
+
+    // restore bad volumes
+    DatanodeTestUtils.restoreBadRootDir(volRootDir0);
+    DatanodeTestUtils.restoreBadRootDir(volRootDir1);
+  }
+}
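
The test above exercises the startup path, where handleVolumeFailures throws.
At runtime the same check instead escalates through StateContext to
DatanodeStateMachine#handleFatalVolumeFailures, which stops the datanode. A
self-contained sketch of that runtime control flow, with all types stubbed out
as stand-ins for the real classes:

    // Simplified stand-ins for the real Ozone classes, kept only to show
    // the runtime shutdown control flow introduced by this commit.
    final class RuntimeFailureFlowSketch {
      interface StopService { void stopService(); }

      static final class DatanodeStateMachineStub {
        private final StopService stopper;
        DatanodeStateMachineStub(StopService s) { this.stopper = s; }
        void handleFatalVolumeFailures() {
          System.err.println("too many bad volumes, stopping datanode");
          stopper.stopService();
        }
      }

      // Mirrors the runtime branch of MutableVolumeSet#handleVolumeFailures:
      // the shutdown hook is registered and a StateContext is present.
      static void afterDiskCheck(boolean hasEnoughVolumes,
          DatanodeStateMachineStub parent) {
        if (!hasEnoughVolumes && parent != null) {
          parent.handleFatalVolumeFailures();
        }
      }

      public static void main(String[] args) {
        DatanodeStateMachineStub sm =
            new DatanodeStateMachineStub(() -> System.out.println("stopped"));
        afterDiskCheck(false, sm);
      }
    }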
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
index fd8ebcc..b355eb6 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
@@ -114,7 +114,7 @@ public class ContainerCommands implements Callable<Void>, SubcommandWithParent {
 
     String clusterId = getClusterId(firstStorageDir);
 
-    volumeSet = new MutableVolumeSet(datanodeUuid, conf);
+    volumeSet = new MutableVolumeSet(datanodeUuid, conf, null);
 
     Map<ContainerProtos.ContainerType, Handler> handlers = new HashMap<>();
 
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ChunkManagerDiskWrite.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ChunkManagerDiskWrite.java
index 2dce431..0507b53 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ChunkManagerDiskWrite.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ChunkManagerDiskWrite.java
@@ -102,7 +102,7 @@ public class ChunkManagerDiskWrite extends BaseFreonGenerator implements
       OzoneConfiguration ozoneConfiguration = createOzoneConfiguration();
 
       VolumeSet volumeSet =
-          new MutableVolumeSet("dnid", "clusterid", ozoneConfiguration);
+          new MutableVolumeSet("dnid", "clusterid", ozoneConfiguration, null);
 
       Random random = new Random();
 
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index 6f03af1..7f3de10 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -171,7 +171,8 @@ public class ClosedContainerReplicator extends BaseFreonGenerator implements
 
     ContainerMetrics metrics = ContainerMetrics.create(conf);
 
-    MutableVolumeSet volumeSet = new MutableVolumeSet(fakeDatanodeUuid, conf);
+    MutableVolumeSet volumeSet = new MutableVolumeSet(fakeDatanodeUuid, conf,
+        null);
 
     Map<ContainerType, Handler> handlers = new HashMap<>();
 
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
index 745a813..5a0d00c 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
@@ -159,7 +159,7 @@ public class GeneratorDatanode extends BaseGenerator {
     datanodeId = HddsVolumeUtil
         .getProperty(props, OzoneConsts.DATANODE_UUID, versionFile);
 
-    volumeSet = new MutableVolumeSet(datanodeId, clusterId, config);
+    volumeSet = new MutableVolumeSet(datanodeId, clusterId, config, null);
 
     volumeChoosingPolicy = new RoundRobinVolumeChoosingPolicy();
 
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
index 7eced3c..c5cd876 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
@@ -102,7 +102,7 @@ public class BenchMarkDatanodeDispatcher {
     conf.set("ozone.scm.container.size", "10MB");
 
     ContainerSet containerSet = new ContainerSet();
-    volumeSet = new MutableVolumeSet(datanodeUuid, conf);
+    volumeSet = new MutableVolumeSet(datanodeUuid, conf, null);
     StateContext context = new StateContext(
         conf, DatanodeStates.RUNNING, null);
     ContainerMetrics metrics = ContainerMetrics.create(conf);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org