You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by er...@apache.org on 2022/11/17 01:18:10 UTC

[ozone] branch master updated: HDDS-7095. allow on demand scanning for containers (#3788)

This is an automated email from the ASF dual-hosted git repository.

erose pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 95568ab735 HDDS-7095. allow on demand scanning for containers (#3788)
95568ab735 is described below

commit 95568ab735af635768e758bf5f9413d554fee6a7
Author: Galsza <10...@users.noreply.github.com>
AuthorDate: Thu Nov 17 02:18:05 2022 +0100

    HDDS-7095. allow on demand scanning for containers (#3788)
---
 .../container/common/impl/HddsDispatcher.java      |  45 ++---
 .../ozoneimpl/ContainerScannerConfiguration.java   |  29 +++-
 .../ozoneimpl/OnDemandContainerScanner.java        | 178 ++++++++++++++++++++
 .../ozoneimpl/OnDemandScannerMetrics.java          |  42 +++++
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  57 ++++---
 .../ozone/container/common/ContainerTestUtils.java |  39 ++++-
 .../TestContainerScannerConfiguration.java         |  14 +-
 .../ozoneimpl/TestContainerScannerMetrics.java     |  74 ++++++---
 .../ozoneimpl/TestOnDemandContainerScanner.java    | 157 ++++++++++++++++++
 .../hadoop/ozone/dn/scanner/TestDataScanner.java   | 181 +++++++++++----------
 10 files changed, 662 insertions(+), 154 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 6dde0181a8..5059a64291 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -18,11 +18,9 @@
 
 package org.apache.hadoop.ozone.container.common.impl;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -58,19 +56,21 @@ import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerScanner;
 import org.apache.hadoop.security.UserGroupInformation;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ServiceException;
-import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
-import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
-
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
+import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
+
 /**
  * Ozone Container dispatcher takes a call from the netty server and routes it
  * to the right handler function.
@@ -332,11 +332,6 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
       // state here.
 
       Result result = responseProto.getResult();
-      if (cmdType == Type.CreateContainer
-          && result == Result.SUCCESS && dispatcherContext != null) {
-        Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
-        container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
-      }
       if (!HddsUtils.isReadOnly(msg) && !canIgnoreException(result)) {
         // If the container is open/closing and the container operation
         // has failed, it should be first marked unhealthy and the initiate the
@@ -373,11 +368,21 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
             container.getContainerData().getState() == State.UNHEALTHY);
         sendCloseContainerActionIfNeeded(container);
       }
-
+      if (cmdType == Type.CreateContainer
+          && result == Result.SUCCESS && dispatcherContext != null) {
+        Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
+        container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
+      }
       if (result == Result.SUCCESS) {
         updateBCSID(container, dispatcherContext, cmdType);
         audit(action, eventType, params, AuditEventStatus.SUCCESS, null);
       } else {
+        //TODO HDDS-7096:
+        // This is a too general place for on demand scanning.
+        // Create a specific exception that signals for on demand scanning
+        // and move this general scan to where it is more appropriate.
+        // Add integration tests to test the full functionality.
+        OnDemandContainerScanner.scanContainer(container);
         audit(action, eventType, params, AuditEventStatus.FAILURE,
             new Exception(responseProto.getMessage()));
       }
@@ -561,8 +566,8 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
 
   private boolean isContainerUnhealthy(Container container) {
     return Optional.ofNullable(container).map(
-        cont -> (cont.getContainerState() ==
-            ContainerDataProto.State.UNHEALTHY))
+            cont -> (cont.getContainerState() ==
+                ContainerDataProto.State.UNHEALTHY))
         .orElse(Boolean.FALSE);
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java
index 4fa7773d1e..20c519bd6e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigTag;
 import org.apache.hadoop.hdds.conf.ConfigType;
 import org.apache.hadoop.hdds.conf.PostConstruct;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,12 +46,17 @@ public class ContainerScannerConfiguration {
       "hdds.container.scrub.data.scan.interval";
   public static final String VOLUME_BYTES_PER_SECOND_KEY =
       "hdds.container.scrub.volume.bytes.per.second";
+  public static final String ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY =
+      "hdds.container.scrub.on.demand.volume.bytes.per.second";
 
   public static final long METADATA_SCAN_INTERVAL_DEFAULT =
       Duration.ofHours(3).toMillis();
   public static final long DATA_SCAN_INTERVAL_DEFAULT =
       Duration.ofDays(7).toMillis();
-  public static final long BANDWIDTH_PER_VOLUME_DEFAULT = 1048576;   // 1MB
+
+  public static final long BANDWIDTH_PER_VOLUME_DEFAULT = OzoneConsts.MB * 5L;
+  public static final long ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT =
+      OzoneConsts.MB * 5L;
 
   @Config(key = "enabled",
       type = ConfigType.BOOLEAN,
@@ -80,12 +86,21 @@ public class ContainerScannerConfiguration {
 
   @Config(key = "volume.bytes.per.second",
       type = ConfigType.LONG,
-      defaultValue = "1048576",
+      defaultValue = "5242880",
       tags = {ConfigTag.STORAGE},
       description = "Config parameter to throttle I/O bandwidth used"
           + " by scanner per volume.")
   private long bandwidthPerVolume = BANDWIDTH_PER_VOLUME_DEFAULT;
 
+  @Config(key = "on.demand.volume.bytes.per.second",
+      type = ConfigType.LONG,
+      defaultValue = "5242880",
+      tags = {ConfigTag.STORAGE},
+      description = "Config parameter to throttle I/O bandwidth used"
 +          + " by the on-demand container scanner per volume.")
+  private long onDemandBandwidthPerVolume
+      = ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT;
+
   @PostConstruct
   public void validate() {
     if (metadataScanInterval < 0) {
@@ -108,6 +123,12 @@ public class ContainerScannerConfiguration {
           bandwidthPerVolume, BANDWIDTH_PER_VOLUME_DEFAULT);
       bandwidthPerVolume = BANDWIDTH_PER_VOLUME_DEFAULT;
     }
+    if (onDemandBandwidthPerVolume < 0) {
+      LOG.warn(ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY +
+              " must be >= 0 and was set to {}. Defaulting to {}",
+          onDemandBandwidthPerVolume, ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT);
+      onDemandBandwidthPerVolume = ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT;
+    }
   }
 
   public void setEnabled(boolean enabled) {
@@ -137,4 +158,8 @@ public class ContainerScannerConfiguration {
   public long getBandwidthPerVolume() {
     return bandwidthPerVolume;
   }
+
+  public long getOnDemandBandwidthPerVolume() {
+    return onDemandBandwidthPerVolume;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java
new file mode 100644
index 0000000000..69c8ba3fee
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class for performing on demand scans of containers.
+ */
+public final class OnDemandContainerScanner {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OnDemandContainerScanner.class);
+
+  private static volatile OnDemandContainerScanner instance;
+
+  private final ExecutorService scanExecutor;
+  private final ContainerController containerController;
+  private final DataTransferThrottler throttler;
+  private final Canceler canceler;
+  private final ConcurrentHashMap
+      .KeySetView<Long, Boolean> containerRescheduleCheckSet;
+  private final OnDemandScannerMetrics metrics;
+
+  private OnDemandContainerScanner(
+      ContainerScannerConfiguration conf, ContainerController controller) {
+    containerController = controller;
+    throttler = new DataTransferThrottler(
+        conf.getOnDemandBandwidthPerVolume());
+    canceler = new Canceler();
+    metrics = OnDemandScannerMetrics.create();
+    scanExecutor = Executors.newSingleThreadExecutor();
+    containerRescheduleCheckSet = ConcurrentHashMap.newKeySet();
+  }
+
+  public static synchronized void init(
+      ContainerScannerConfiguration conf, ContainerController controller) {
+    if (instance != null) {
+      LOG.warn("Trying to initialize on demand scanner" +
+          " a second time on a datanode.");
+      return;
+    }
+    instance = new OnDemandContainerScanner(conf, controller);
+  }
+
+  public static Optional<Future<?>> scanContainer(Container<?> container) {
+    if (instance == null || !container.shouldScanData()) {
+      return Optional.empty();
+    }
+    Future<?> resultFuture = null;
+    long containerId = container.getContainerData().getContainerID();
+    if (addContainerToScheduledContainers(containerId)) {
+      resultFuture = instance.scanExecutor.submit(() -> {
+        if (container.shouldScanData()) {
+          performOnDemandScan(container);
+        }
+        removeContainerFromScheduledContainers(containerId);
+      });
+    }
+    return Optional.ofNullable(resultFuture);
+  }
+
+  private static boolean addContainerToScheduledContainers(long containerId) {
+    return instance.containerRescheduleCheckSet.add(containerId);
+  }
+
+  private static void removeContainerFromScheduledContainers(
+      long containerId) {
+    instance.containerRescheduleCheckSet.remove(containerId);
+  }
+
+  private static void performOnDemandScan(Container<?> container) {
+    long containerId = container.getContainerData().getContainerID();
+    try {
+      ContainerData containerData = container.getContainerData();
+      logScanStart(containerData);
+      if (container.scanData(instance.throttler, instance.canceler)) {
+        Instant now = Instant.now();
+        logScanCompleted(containerData, now);
+        instance.containerController.updateDataScanTimestamp(containerId, now);
+      } else {
+        instance.containerController.markContainerUnhealthy(containerId);
+        instance.metrics.incNumUnHealthyContainers();
+      }
+      instance.metrics.incNumContainersScanned();
+    } catch (IOException e) {
+      LOG.warn("Unexpected exception while scanning container "
+          + containerId, e);
+    }
+  }
+
+  private static void logScanStart(ContainerData containerData) {
+    if (LOG.isDebugEnabled()) {
+      Optional<Instant> scanTimestamp = containerData.lastDataScanTime();
+      Object lastScanTime = scanTimestamp.map(ts -> "at " + ts).orElse("never");
+      LOG.debug("Scanning container {}, last scanned {}",
+          containerData.getContainerID(), lastScanTime);
+    }
+  }
+
+  private static void logScanCompleted(
+      ContainerData containerData, Instant timestamp) {
+    LOG.debug("Completed scan of container {} at {}",
+        containerData.getContainerID(), timestamp);
+  }
+
+  public static OnDemandScannerMetrics getMetrics() {
+    return instance.metrics;
+  }
+
+  @VisibleForTesting
+  public static DataTransferThrottler getThrottler() {
+    return instance.throttler;
+  }
+
+  @VisibleForTesting
+  public static Canceler getCanceler() {
+    return instance.canceler;
+  }
+
+  public static synchronized void shutdown() {
+    if (instance == null) {
+      return;
+    }
+    instance.shutdownScanner();
+  }
+
+  private synchronized void shutdownScanner() {
+    instance = null;
+    metrics.unregister();
+    this.canceler.cancel("On-demand container" +
+        " scanner is shutting down.");
+    if (!scanExecutor.isShutdown()) {
+      scanExecutor.shutdown();
+    }
+    try {
+      long timeoutSeconds = 5;
+      if (!scanExecutor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
+        LOG.warn("On demand scanner shut down forcefully after {} seconds",
+            timeoutSeconds);
+        scanExecutor.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("On demand scanner interrupted while waiting for shut down.");
+      scanExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandScannerMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandScannerMetrics.java
new file mode 100644
index 0000000000..473713704c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandScannerMetrics.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+
+/**
+ * This class captures the on-demand container data scanner metrics.
+ **/
+@InterfaceAudience.Private
+@Metrics(about = "On-demand container data scanner metrics", context = "dfs")
+public final class OnDemandScannerMetrics
+    extends AbstractContainerScannerMetrics {
+
+  private OnDemandScannerMetrics(String name, MetricsSystem ms) {
+    super(name, ms);
+  }
+
+  public static OnDemandScannerMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    String name = "On-demand container scanner metrics";
+    return ms.register(name, null, new OnDemandScannerMetrics(name, ms));
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 2ceb5d916a..5d2153465e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -84,6 +84,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_RECOVERING_CONTAINER
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_RECOVERING_CONTAINER_SCRUBBING_SERVICE_WORKERS_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_RECOVERING_CONTAINER_TIMEOUT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_RECOVERING_CONTAINER_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY;
 import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.VOLUME_BYTES_PER_SECOND_KEY;
 
 /**
@@ -307,30 +308,45 @@ public class OzoneContainer {
   private void startContainerScrub() {
     ContainerScannerConfiguration c = config.getObject(
         ContainerScannerConfiguration.class);
-    boolean enabled = c.isEnabled();
+    if (!c.isEnabled()) {
+      LOG.info("Scheduled background container scanners and " +
+          "the on-demand container scanner have been disabled.");
+      return;
+    }
+    initOnDemandContainerScanner(c);
+    initMetadataScanner(c);
+    initContainerScanner(c);
+  }
 
-    if (!enabled) {
-      LOG.info("Background container scanner has been disabled.");
-    } else {
-      if (this.metadataScanner == null) {
-        this.metadataScanner = new ContainerMetadataScanner(c, controller);
-      }
-      this.metadataScanner.start();
+  private void initContainerScanner(ContainerScannerConfiguration c) {
+    if (c.getBandwidthPerVolume() == 0L) {
+      LOG.warn(VOLUME_BYTES_PER_SECOND_KEY + " is set to 0, " +
+          "so background container data scanner will not start.");
+      return;
+    }
+    dataScanners = new ArrayList<>();
+    for (StorageVolume v : volumeSet.getVolumesList()) {
+      ContainerDataScanner s = new ContainerDataScanner(c, controller,
+          (HddsVolume) v);
+      s.start();
+      dataScanners.add(s);
+    }
+  }
 
-      if (c.getBandwidthPerVolume() == 0L) {
-        LOG.warn(VOLUME_BYTES_PER_SECOND_KEY + " is set to 0, " +
-            "so background container data scanner will not start.");
-        return;
-      }
+  private void initMetadataScanner(ContainerScannerConfiguration c) {
+    if (this.metadataScanner == null) {
+      this.metadataScanner = new ContainerMetadataScanner(c, controller);
+    }
+    this.metadataScanner.start();
+  }
 
-      dataScanners = new ArrayList<>();
-      for (StorageVolume v : volumeSet.getVolumesList()) {
-        ContainerDataScanner s = new ContainerDataScanner(c, controller,
-            (HddsVolume) v);
-        s.start();
-        dataScanners.add(s);
-      }
+  private void initOnDemandContainerScanner(ContainerScannerConfiguration c) {
+    if (c.getOnDemandBandwidthPerVolume() == 0L) {
+      LOG.warn(ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY + " is set to 0, " +
+          "so the on-demand container data scanner will not start.");
+      return;
     }
+    OnDemandContainerScanner.init(c, controller);
   }
 
   /**
@@ -349,6 +365,7 @@ public class OzoneContainer {
     for (ContainerDataScanner s : dataScanners) {
       s.shutdown();
     }
+    OnDemandContainerScanner.shutdown();
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index a60f0d52ae..a216075889 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -17,11 +17,6 @@
 
 package org.apache.hadoop.ozone.container.common;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Random;
-import java.util.UUID;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -29,11 +24,15 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
@@ -47,9 +46,18 @@ import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
 import org.apache.hadoop.security.UserGroupInformation;
-
 import org.mockito.Mockito;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * Helper utility to test containers.
  */
@@ -157,4 +165,23 @@ public final class ContainerTestUtils {
           null, null);
     }
   }
+
+  public static void setupMockContainer(
+      Container<ContainerData> c, boolean shouldScanData,
+      boolean scanMetaDataSuccess, boolean scanDataSuccess,
+      AtomicLong containerIdSeq) {
+    setupMockContainer(c, shouldScanData, scanDataSuccess, containerIdSeq);
+    when(c.scanMetaData()).thenReturn(scanMetaDataSuccess);
+  }
+
+  public static void setupMockContainer(
+      Container<ContainerData> c, boolean shouldScanData,
+      boolean scanDataSuccess, AtomicLong containerIdSeq) {
+    ContainerData data = mock(ContainerData.class);
+    when(data.getContainerID()).thenReturn(containerIdSeq.getAndIncrement());
+    when(c.getContainerData()).thenReturn(data);
+    when(c.shouldScanData()).thenReturn(shouldScanData);
+    when(c.scanData(any(DataTransferThrottler.class), any(Canceler.class)))
+        .thenReturn(scanDataSuccess);
+  }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannerConfiguration.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannerConfiguration.java
index 6a4607c845..077acbb9d9 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannerConfiguration.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannerConfiguration.java
@@ -29,8 +29,11 @@ import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfig
 import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.DATA_SCAN_INTERVAL_KEY;
 import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.METADATA_SCAN_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.METADATA_SCAN_INTERVAL_KEY;
+import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT;
+import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY;
 import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.VOLUME_BYTES_PER_SECOND_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 /**
  * Test for {@link ContainerScannerConfiguration}.
@@ -48,10 +51,12 @@ public class TestContainerScannerConfiguration {
   public void acceptsValidValues() {
     long validInterval = Duration.ofHours(1).toMillis();
     long validBandwidth = (long) StorageUnit.MB.toBytes(1);
+    long validOnDemandBandwidth = (long) StorageUnit.MB.toBytes(2);
 
     conf.setLong(METADATA_SCAN_INTERVAL_KEY, validInterval);
     conf.setLong(DATA_SCAN_INTERVAL_KEY, validInterval);
     conf.setLong(VOLUME_BYTES_PER_SECOND_KEY, validBandwidth);
+    conf.setLong(ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY, validOnDemandBandwidth);
 
     ContainerScannerConfiguration csConf =
         conf.getObject(ContainerScannerConfiguration.class);
@@ -59,6 +64,8 @@ public class TestContainerScannerConfiguration {
     assertEquals(validInterval, csConf.getMetadataScanInterval());
     assertEquals(validInterval, csConf.getDataScanInterval());
     assertEquals(validBandwidth, csConf.getBandwidthPerVolume());
+    assertEquals(validOnDemandBandwidth,
+        csConf.getOnDemandBandwidthPerVolume());
   }
 
   @Test
@@ -69,6 +76,7 @@ public class TestContainerScannerConfiguration {
     conf.setLong(METADATA_SCAN_INTERVAL_KEY, invalidInterval);
     conf.setLong(DATA_SCAN_INTERVAL_KEY, invalidInterval);
     conf.setLong(VOLUME_BYTES_PER_SECOND_KEY, invalidBandwidth);
+    conf.setLong(ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY, invalidBandwidth);
 
     ContainerScannerConfiguration csConf =
         conf.getObject(ContainerScannerConfiguration.class);
@@ -79,6 +87,8 @@ public class TestContainerScannerConfiguration {
         csConf.getDataScanInterval());
     assertEquals(BANDWIDTH_PER_VOLUME_DEFAULT,
         csConf.getBandwidthPerVolume());
+    assertEquals(ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT,
+        csConf.getOnDemandBandwidthPerVolume());
   }
 
   @Test
@@ -86,12 +96,14 @@ public class TestContainerScannerConfiguration {
     ContainerScannerConfiguration csConf =
         conf.getObject(ContainerScannerConfiguration.class);
 
-    assertEquals(false, csConf.isEnabled());
+    assertFalse(csConf.isEnabled());
     assertEquals(METADATA_SCAN_INTERVAL_DEFAULT,
         csConf.getMetadataScanInterval());
     assertEquals(DATA_SCAN_INTERVAL_DEFAULT,
         csConf.getDataScanInterval());
     assertEquals(BANDWIDTH_PER_VOLUME_DEFAULT,
         csConf.getBandwidthPerVolume());
+    assertEquals(ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT,
+        csConf.getOnDemandBandwidthPerVolume());
   }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannerMetrics.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannerMetrics.java
index cc8a60c9ca..6d9eb31db1 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannerMetrics.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannerMetrics.java
@@ -17,12 +17,13 @@
  */
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
-import org.apache.hadoop.hdfs.util.Canceler;
-import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.commons.compress.utils.Lists;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -32,15 +33,18 @@ import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.hadoop.hdds.conf.OzoneConfiguration.newInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -73,9 +77,15 @@ public class TestContainerScannerMetrics {
     conf = newInstanceOf(ContainerScannerConfiguration.class);
     conf.setMetadataScanInterval(0);
     conf.setDataScanInterval(0);
+    conf.setEnabled(true);
     controller = mockContainerController();
   }
 
+  @AfterEach
+  public void tearDown() {
+    OnDemandContainerScanner.shutdown();
+  }
+
   @Test
   public void testContainerMetaDataScannerMetrics() {
     ContainerMetadataScanner subject =
@@ -129,15 +139,54 @@ public class TestContainerScannerMetrics {
     assertNull(DefaultMetricsSystem.instance().getSource(name));
   }
 
+  @Test
+  public void testOnDemandScannerMetrics() throws Exception {
+    OnDemandContainerScanner.init(conf, controller);
+    ArrayList<Optional<Future<?>>> resultFutureList = Lists.newArrayList();
+    resultFutureList.add(OnDemandContainerScanner.scanContainer(corruptData));
+    resultFutureList.add(
+        OnDemandContainerScanner.scanContainer(corruptMetadata));
+    resultFutureList.add(OnDemandContainerScanner.scanContainer(healthy));
+    waitOnScannerToFinish(resultFutureList);
+    OnDemandScannerMetrics metrics = OnDemandContainerScanner.getMetrics();
+    //Containers with shouldScanData = false shouldn't increase
+    // the number of scanned containers
+    assertEquals(1, metrics.getNumUnHealthyContainers());
+    assertEquals(2, metrics.getNumContainersScanned());
+  }
+
+  private void waitOnScannerToFinish(
+      ArrayList<Optional<Future<?>>> resultFutureList)
+      throws ExecutionException, InterruptedException {
+    for (Optional<Future<?>> future : resultFutureList) {
+      if (future.isPresent()) {
+        future.get().get();
+      }
+    }
+  }
+
+  @Test
+  public void testOnDemandScannerMetricsUnregisters() {
+    OnDemandContainerScanner.init(conf, controller);
+    String metricsName = OnDemandContainerScanner.getMetrics().getName();
+    assertNotNull(DefaultMetricsSystem.instance().getSource(metricsName));
+    OnDemandContainerScanner.shutdown();
+    OnDemandContainerScanner.scanContainer(healthy);
+    assertNull(DefaultMetricsSystem.instance().getSource(metricsName));
+  }
+
   private ContainerController mockContainerController() {
     // healthy container
-    setupMockContainer(healthy, true, true, true);
+    ContainerTestUtils.setupMockContainer(healthy,
+        true, true, true, containerIdSeq);
 
     // unhealthy container (corrupt data)
-    setupMockContainer(corruptData, true, true, false);
+    ContainerTestUtils.setupMockContainer(corruptData,
+        true, true, false, containerIdSeq);
 
     // unhealthy container (corrupt metadata)
-    setupMockContainer(corruptMetadata, false, false, false);
+    ContainerTestUtils.setupMockContainer(corruptMetadata,
+        false, false, false, containerIdSeq);
 
     Collection<Container<?>> containers = Arrays.asList(
         healthy, corruptData, corruptMetadata);
@@ -147,17 +196,4 @@ public class TestContainerScannerMetrics {
 
     return mock;
   }
-
-  private void setupMockContainer(
-      Container<ContainerData> c, boolean shouldScanData,
-      boolean scanMetaDataSuccess, boolean scanDataSuccess) {
-    ContainerData data = mock(ContainerData.class);
-    when(data.getContainerID()).thenReturn(containerIdSeq.getAndIncrement());
-    when(c.getContainerData()).thenReturn(data);
-    when(c.shouldScanData()).thenReturn(shouldScanData);
-    when(c.scanMetaData()).thenReturn(scanMetaDataSuccess);
-    when(c.scanData(any(DataTransferThrottler.class), any(Canceler.class)))
-        .thenReturn(scanDataSuccess);
-  }
-
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerScanner.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerScanner.java
new file mode 100644
index 0000000000..c2686b559c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerScanner.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import org.mockito.verification.VerificationMode;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdds.conf.OzoneConfiguration.newInstanceOf;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+
+/**
+ * Unit tests for the on-demand container scanner.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestOnDemandContainerScanner {
+
+  private final AtomicLong containerIdSeq = new AtomicLong(100);
+
+  @Mock
+  private Container<ContainerData> healthy;
+
+  @Mock
+  private Container<ContainerData> openContainer;
+
+  @Mock
+  private Container<ContainerData> corruptData;
+
+  private ContainerScannerConfiguration conf;
+  private ContainerController controller;
+
+  @Before
+  public void setup() {
+    conf = newInstanceOf(ContainerScannerConfiguration.class);
+    conf.setMetadataScanInterval(0);
+    conf.setDataScanInterval(0);
+    controller = mockContainerController();
+  }
+
+  @After
+  public void tearDown() {
+    OnDemandContainerScanner.shutdown();
+  }
+
+  @Test
+  public void testOnDemandContainerScanner() throws Exception {
+    //Without initialization,
+    // there shouldn't be interaction with containerController
+    OnDemandContainerScanner.scanContainer(corruptData);
+    Mockito.verifyZeroInteractions(controller);
+    OnDemandContainerScanner.init(conf, controller);
+    testContainerMarkedUnhealthy(healthy, never());
+    testContainerMarkedUnhealthy(corruptData, atLeastOnce());
+    testContainerMarkedUnhealthy(openContainer, never());
+  }
+
+  @Test
+  public void testContainerScannerMultipleInitsAndShutdowns() throws Exception {
+    OnDemandContainerScanner.init(conf, controller);
+    OnDemandContainerScanner.init(conf, controller);
+    OnDemandContainerScanner.shutdown();
+    OnDemandContainerScanner.shutdown();
+    //There shouldn't be an interaction after shutdown:
+    testContainerMarkedUnhealthy(corruptData, never());
+  }
+
+  @Test
+  public void testSameContainerQueuedMultipleTimes() throws Exception {
+    OnDemandContainerScanner.init(conf, controller);
+    //Given a container that has not finished scanning
+    CountDownLatch latch = new CountDownLatch(1);
+    Mockito.lenient().when(corruptData.scanData(
+            OnDemandContainerScanner.getThrottler(),
+            OnDemandContainerScanner.getCanceler()))
+        .thenAnswer((Answer<Boolean>) invocation -> {
+          latch.await();
+          return false;
+        });
+    Optional<Future<?>> onGoingScan = OnDemandContainerScanner
+        .scanContainer(corruptData);
+    Assert.assertTrue(onGoingScan.isPresent());
+    Assert.assertFalse(onGoingScan.get().isDone());
+    //When scheduling the same container again
+    Optional<Future<?>> secondScan = OnDemandContainerScanner
+        .scanContainer(corruptData);
+    //Then the second scan is not scheduled and the first scan can still finish
+    Assert.assertFalse(secondScan.isPresent());
+    latch.countDown();
+    onGoingScan.get().get();
+    Mockito.verify(controller, atLeastOnce()).
+        markContainerUnhealthy(corruptData.getContainerData().getContainerID());
+  }
+
+  private void testContainerMarkedUnhealthy(
+      Container<?> container, VerificationMode invocationTimes)
+      throws InterruptedException, ExecutionException, IOException {
+    Optional<Future<?>> result =
+        OnDemandContainerScanner.scanContainer(container);
+    if (result.isPresent()) {
+      result.get().get();
+    }
+    Mockito.verify(controller, invocationTimes).markContainerUnhealthy(
+        container.getContainerData().getContainerID());
+  }
+
+  private ContainerController mockContainerController() {
+    // healthy container
+    ContainerTestUtils.setupMockContainer(healthy,
+        true, true, containerIdSeq);
+
+    // unhealthy container (corrupt data)
+    ContainerTestUtils.setupMockContainer(corruptData,
+        true, false, containerIdSeq);
+
+    // open container (shouldScanData = false, so it must be skipped)
+    ContainerTestUtils.setupMockContainer(openContainer,
+        false, false, containerIdSeq);
+
+    return mock(ContainerController.class);
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestDataScanner.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestDataScanner.java
index 0075982eb9..f0ad740c45 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestDataScanner.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestDataScanner.java
@@ -20,39 +20,35 @@
 package org.apache.hadoop.ozone.dn.scanner;
 
 import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationFactor;
-import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.OzoneKey;
 import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.TestHelper;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerMetadataScanner;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -62,14 +58,15 @@ import org.junit.rules.Timeout;
 
 import java.io.File;
 import java.io.IOException;
-import java.time.Instant;
-import java.util.HashMap;
 import java.util.Set;
 import java.util.UUID;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
 import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.OPEN;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 
 /**
  * This class tests the data scanner functionality.
@@ -85,7 +82,6 @@ public class TestDataScanner {
   private static OzoneConfiguration ozoneConfig;
   private static OzoneClient ozClient = null;
   private static ObjectStore store = null;
-  private static OzoneManager ozoneManager;
   private static StorageContainerLocationProtocolClientSideTranslatorPB
       storageContainerLocationClient;
 
@@ -93,6 +89,8 @@ public class TestDataScanner {
   public static void init() throws Exception {
     ozoneConfig = new OzoneConfiguration();
     ozoneConfig.set(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, "1s");
+    ozoneConfig.set(ContainerScannerConfiguration.HDDS_CONTAINER_SCRUB_ENABLED,
+        String.valueOf(true));
     ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
         SCMContainerPlacementCapacity.class, PlacementPolicy.class);
     ozoneConfig.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
@@ -103,7 +101,6 @@ public class TestDataScanner {
     cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 30000);
     ozClient = OzoneClientFactory.getRpcClient(ozoneConfig);
     store = ozClient.getObjectStore();
-    ozoneManager = cluster.getOzoneManager();
     storageContainerLocationClient =
         cluster.getStorageContainerLocationClient();
   }
@@ -121,111 +118,123 @@ public class TestDataScanner {
     }
   }
 
+  //This test performs 2 separate tests because creating
+  // and running a cluster is expensive.
   @Test
-  public void testOpenContainerIntegrity() throws Exception {
+  public void testScannersMarkContainerUnhealthy() throws Exception {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
-    Instant testStartTime = Instant.now();
-
-    String value = "sample value";
+    String value = "sample key value";
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
 
-    for (int i = 0; i < 10; i++) {
-      String keyName = UUID.randomUUID().toString();
-
-      OzoneOutputStream out = bucket.createKey(keyName,
-          value.getBytes(UTF_8).length, RATIS,
-          ONE, new HashMap<>());
-      out.write(value.getBytes(UTF_8));
-      out.close();
-      OzoneKey key = bucket.getKey(keyName);
-      Assert.assertEquals(keyName, key.getName());
-      OzoneInputStream is = bucket.readKey(keyName);
-      byte[] fileContent = new byte[value.getBytes(UTF_8).length];
-      is.read(fileContent);
-      Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
-          keyName, RATIS,
-          ONE));
-      Assert.assertEquals(value, new String(fileContent, UTF_8));
-      Assert.assertFalse(key.getCreationTime().isBefore(testStartTime));
-      Assert.assertFalse(key.getModificationTime().isBefore(testStartTime));
-    }
-
+    String keyNameInClosedContainer = "keyNameInClosedContainer";
+    OzoneOutputStream key = createKey(volumeName, bucketName,
+        keyNameInClosedContainer);
+    // write data more than 1 chunk
+    int sizeLargerThanOneChunk = (int) (OzoneConsts.MB + OzoneConsts.MB / 2);
+    byte[] data = ContainerTestHelper
+        .getFixedLengthString(value, sizeLargerThanOneChunk)
+        .getBytes(UTF_8);
+    key.write(data);
+
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    key.flush();
+    TestHelper.waitForContainerClose(key, cluster);
+    key.close();
+    String keyNameInOpenContainer = "keyNameInOpenContainer";
+    OzoneOutputStream key2 = createKey(volumeName, bucketName,
+        keyNameInOpenContainer);
+    key2.write(data);
+    key2.close();
     // wait for the container report to propagate to SCM
     Thread.sleep(5000);
 
-
     Assert.assertEquals(1, cluster.getHddsDatanodes().size());
 
     HddsDatanodeService dn = cluster.getHddsDatanodes().get(0);
     OzoneContainer oc = dn.getDatanodeStateMachine().getContainer();
-    ContainerSet cs = oc.getContainerSet();
-    Container c = cs.getContainerIterator().next();
+    ContainerSet containerSet = oc.getContainerSet();
+    //Given an open and a closed container
+    Assert.assertTrue(containerSet.containerCount() > 1);
+    Container<?> openContainer = getContainerInState(containerSet, OPEN);
+    Container<?> closedContainer = getContainerInState(containerSet, CLOSED);
 
-    Assert.assertTrue(cs.containerCount() > 0);
-
-    // delete the chunks directory.
-    File chunksDir = new File(c.getContainerData().getContainerPath(),
-        "chunks");
-    deleteDirectory(chunksDir);
-    Assert.assertFalse(chunksDir.exists());
+    //When deleting their metadata to make them unhealthy and scanning them
+    deleteChunksDirForContainer(openContainer);
+    deleteChunksDirForContainer(closedContainer);
 
     ContainerScannerConfiguration conf = ozoneConfig.getObject(
         ContainerScannerConfiguration.class);
     ContainerMetadataScanner sb = new ContainerMetadataScanner(conf,
         oc.getController());
-    sb.scanContainer(c);
-
+    //Scan the open container and trigger on-demand scan for the closed one
+    sb.scanContainer(openContainer);
+    tryReadKeyWithMissingChunksDir(bucket, keyNameInClosedContainer);
     // wait for the incremental container report to propagate to SCM
     Thread.sleep(5000);
 
     ContainerManager cm = cluster.getStorageContainerManager()
         .getContainerManager();
-    Set<ContainerReplica> replicas = cm.getContainerReplicas(
-        ContainerID.valueOf(c.getContainerData().getContainerID()));
-    Assert.assertEquals(1, replicas.size());
-    ContainerReplica r = replicas.iterator().next();
-    Assert.assertEquals(StorageContainerDatanodeProtocolProtos.
-        ContainerReplicaProto.State.UNHEALTHY, r.getState());
+    ContainerReplica openContainerReplica = getContainerReplica(
+        cm, openContainer.getContainerData().getContainerID());
+    ContainerReplica closedContainerReplica = getContainerReplica(
+        cm, closedContainer.getContainerData().getContainerID());
+    //Then both containers are marked unhealthy
+    Assert.assertEquals(State.UNHEALTHY, openContainerReplica.getState());
+    Assert.assertEquals(State.UNHEALTHY, closedContainerReplica.getState());
+  }
+
+  private ContainerReplica getContainerReplica(
+      ContainerManager cm, long containerId) throws ContainerNotFoundException {
+    Set<ContainerReplica> containerReplicas = cm.getContainerReplicas(
+        ContainerID.valueOf(
+            containerId));
+    Assert.assertEquals(1, containerReplicas.size());
+    return containerReplicas.iterator().next();
+  }
+
+  //ignore the result of the key read because it is expected to fail
+  @SuppressWarnings("ResultOfMethodCallIgnored")
+  private void tryReadKeyWithMissingChunksDir(
+      OzoneBucket bucket, String keyNameInClosedContainer) throws IOException {
+    try (OzoneInputStream key = bucket.readKey(keyNameInClosedContainer)) {
+      Assert.assertThrows(IOException.class, key::read);
+    }
   }
 
-  boolean deleteDirectory(File directoryToBeDeleted) {
+  private void deleteChunksDirForContainer(Container<?> container) {
+    File chunksDir = new File(container.getContainerData().getContainerPath(),
+        "chunks");
+    deleteDirectory(chunksDir);
+    Assert.assertFalse(chunksDir.exists());
+  }
+
+  private Container<?> getContainerInState(
+      ContainerSet cs, ContainerProtos.ContainerDataProto.State state) {
+    return cs.getContainerMap().values().stream()
+        .filter(c -> state ==
+            c.getContainerState())
+        .findAny()
+        .orElseThrow(() ->
+            new RuntimeException("No Open container found for testing"));
+  }
+
+  private OzoneOutputStream createKey(String volumeName, String bucketName,
+                                      String keyName) throws Exception {
+    return TestHelper.createKey(
+        keyName, RATIS, ONE, 0, store, volumeName, bucketName);
+  }
+
+  void deleteDirectory(File directoryToBeDeleted) {
     File[] allContents = directoryToBeDeleted.listFiles();
     if (allContents != null) {
       for (File file : allContents) {
         deleteDirectory(file);
       }
     }
-    return directoryToBeDeleted.delete();
-  }
-
-  private boolean verifyRatisReplication(String volumeName, String bucketName,
-                                         String keyName, ReplicationType type,
-                                         ReplicationFactor factor)
-      throws IOException {
-    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
-        .setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setKeyName(keyName)
-        .build();
-    HddsProtos.ReplicationType replicationType =
-        HddsProtos.ReplicationType.valueOf(type.toString());
-    HddsProtos.ReplicationFactor replicationFactor =
-        HddsProtos.ReplicationFactor.valueOf(factor.getValue());
-    OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs);
-    for (OmKeyLocationInfo info :
-        keyInfo.getLatestVersionLocations().getLocationList()) {
-      ContainerInfo container =
-          storageContainerLocationClient.getContainer(info.getContainerID());
-      if (!ReplicationConfig.getLegacyFactor(container.getReplicationConfig())
-          .equals(replicationFactor) || (
-          container.getReplicationType() != replicationType)) {
-        return false;
-      }
-    }
-    return true;
+    directoryToBeDeleted.delete();
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org