Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2022/09/29 13:33:42 UTC

[GitHub] [ozone] Galsza opened a new pull request, #3788: Hdds 7095 allow on demand scanning for containers

Galsza opened a new pull request, #3788:
URL: https://github.com/apache/ozone/pull/3788

   ## What changes were proposed in this pull request?
   
   This is a draft pull request outlining the on-demand scanner. The on-demand scanner will make it possible to check containers for checksum errors on the fly, alongside the periodically run metadata scanner and full scanner.
   
   ## What is the link to the Apache JIRA
   
   [HDDS-7095](https://issues.apache.org/jira/browse/HDDS-7095)
   
   ## How was this patch tested?
   
   This is a proof-of-concept draft pull request. Testing still needs to be done, including adding unit and integration tests.
   
   ## List of work in progress items
   
   - Adding unit and integration tests. This includes unit tests for the on-demand scanner and its metrics, adding more tests for HddsDispatcher, writing integration tests for unhealthy / healthy containers, and adding more docker tests.
   - Tuning and settling on the proper configuration, and adding the necessary configuration items to the various XML files
   - Handling edge cases, such as a read error on an open container
   - Possibly adding on-demand scanning to more places
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] errose28 commented on a diff in pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
errose28 commented on code in PR #3788:
URL: https://github.com/apache/ozone/pull/3788#discussion_r993639556


##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerScanner.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.verification.VerificationMode;
+
+import java.io.IOException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdds.conf.OzoneConfiguration.newInstanceOf;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the on-demand container scanner.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestOnDemandContainerScanner {
+
+  private final AtomicLong containerIdSeq = new AtomicLong(100);
+
+  @Mock
+  private Container<ContainerData> healthy;
+
+  @Mock
+  private Container<ContainerData> openContainer;
+
+  @Mock
+  private Container<ContainerData> corruptData;
+
+  private ContainerScannerConfiguration conf;
+  private ContainerController controller;
+
+  @Before
+  public void setup() {
+    conf = newInstanceOf(ContainerScannerConfiguration.class);
+    conf.setMetadataScanInterval(0);
+    conf.setDataScanInterval(0);
+    controller = mockContainerController();
+  }
+
+  @After
+  public void tearDown() {
+    OnDemandContainerScanner.shutdown();
+  }
+
+  @Test
+  public void testOnDemandContainerScanner() throws Exception {
+    //Without initialization,
+    // there shouldn't be interaction with containerController
+    OnDemandContainerScanner.scanContainer(corruptData);
+    Mockito.verifyZeroInteractions(controller);
+    OnDemandContainerScanner.init(conf, controller);
+    testContainerMarkedUnhealthy(healthy, never());
+    testContainerMarkedUnhealthy(corruptData, atLeastOnce());
+    testContainerMarkedUnhealthy(openContainer, never());
+  }
+
+  @Test
+  public void testContainerScannerMultipleInitsAndShutdowns() throws Exception {
+    OnDemandContainerScanner.init(conf, controller);
+    OnDemandContainerScanner.init(conf, controller);
+    OnDemandContainerScanner.shutdown();
+    OnDemandContainerScanner.shutdown();
+    //There shouldn't be an interaction after shutdown:
+    testContainerMarkedUnhealthy(corruptData, never());
+  }
+
+  private void testContainerMarkedUnhealthy(
+      Container<?> container, VerificationMode invocationTimes)
+      throws IOException {
+    OnDemandContainerScanner.scanContainer(container);
+    waitForScanToFinish();
+    Mockito.verify(controller, invocationTimes).markContainerUnhealthy(
+        container.getContainerData().getContainerID());
+  }
+
+  private void waitForScanToFinish() {

Review Comment:
   Methods like these that are duplicated between this test and TestContainerScannerMetrics can be moved to ContainerTestUtils.
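A minimal sketch of what a shared helper might look like once moved out of both tests. The class name `ContainerTestUtilsSketch` and the 30-second timeout are assumptions for illustration, not existing Ozone code. The helper waits on a scan `Future` (for example, the one returned by `getLastScanFuture()`) rather than sleeping.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical shared test helper; not part of the PR. */
public final class ContainerTestUtilsSketch {
  // Assumed upper bound so a stuck scan fails the test instead of hanging it.
  private static final long DEFAULT_TIMEOUT_SECONDS = 30;

  private ContainerTestUtilsSketch() { }

  /** Blocks until the scan completes; rethrows scan failures and timeouts. */
  public static void waitForScanToFinish(Future<?> scanFuture)
      throws InterruptedException, ExecutionException, TimeoutException {
    if (scanFuture == null) {
      return; // nothing was submitted, e.g. the scanner was not initialized
    }
    scanFuture.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    waitForScanToFinish(CompletableFuture.completedFuture(null));
    waitForScanToFinish(null);
    System.out.println("helper returned for finished and missing scans");
  }
}
```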



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java:
##########
@@ -307,29 +307,27 @@ private void buildContainerSet() {
   private void startContainerScrub() {
     ContainerScannerConfiguration c = config.getObject(
         ContainerScannerConfiguration.class);
-    boolean enabled = c.isEnabled();
-
-    if (!enabled) {
+    if (!c.isEnabled()) {
       LOG.info("Background container scanner has been disabled.");
-    } else {
-      if (this.metadataScanner == null) {
-        this.metadataScanner = new ContainerMetadataScanner(c, controller);
-      }
-      this.metadataScanner.start();
-
-      if (c.getBandwidthPerVolume() == 0L) {
-        LOG.warn(VOLUME_BYTES_PER_SECOND_KEY + " is set to 0, " +
-            "so background container data scanner will not start.");
-        return;
-      }
+      return;
+    }
+    OnDemandContainerScanner.init(c, controller);
+    if (this.metadataScanner == null) {
+      this.metadataScanner = new ContainerMetadataScanner(c, controller);
+    }
+    this.metadataScanner.start();
 
-      dataScanners = new ArrayList<>();
-      for (StorageVolume v : volumeSet.getVolumesList()) {
-        ContainerDataScanner s = new ContainerDataScanner(c, controller,
-            (HddsVolume) v);
-        s.start();
-        dataScanners.add(s);
-      }
+    if (c.getBandwidthPerVolume() == 0L) {

Review Comment:
   We should probably have a similar check and warning for the on-demand scanner. Since it is not a background service, we can't simply skip starting it as we do for the other scanners. `OnDemandContainerScanner#scanContainer` would probably need a check so that it does not run when the bandwidth is 0.
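One possible shape for that check, as a sketch only: the hypothetical `BandwidthGuardedScanner` below warns once at construction when the on-demand bandwidth is 0 and then refuses to submit any scan work. Returning `null` for a skipped scan and counting instead of scanning are simplifications assumed for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch: an on-demand scanner that does nothing when its
 * configured bandwidth is 0, mirroring how the background data scanner is
 * not started in that case.
 */
public final class BandwidthGuardedScanner {
  private final long bandwidthPerVolume;
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final AtomicInteger scansRun = new AtomicInteger();

  public BandwidthGuardedScanner(long bandwidthPerVolume) {
    this.bandwidthPerVolume = bandwidthPerVolume;
    if (bandwidthPerVolume == 0L) {
      // Warn once at startup instead of on every scan request.
      System.err.println("On-demand scan bandwidth is 0, "
          + "so on-demand container scans will be skipped.");
    }
  }

  /** Returns the submitted scan, or null if scanning is disabled. */
  public Future<?> scanContainer(long containerId) {
    if (bandwidthPerVolume == 0L) {
      return null;
    }
    // A real scanner would scan containerId here; this sketch only counts.
    return executor.submit(() -> {
      scansRun.incrementAndGet();
    });
  }

  public int getScansRun() {
    return scansRun.get();
  }

  public void shutdown() throws InterruptedException {
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    BandwidthGuardedScanner disabled = new BandwidthGuardedScanner(0L);
    System.out.println(disabled.scanContainer(1L) == null); // true
    disabled.shutdown();
  }
}
```

Checking inside `scanContainer` keeps callers such as `HddsDispatcher` unaware of the configuration; they can call it unconditionally.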



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java:
##########
@@ -307,29 +307,27 @@ private void buildContainerSet() {
   private void startContainerScrub() {
     ContainerScannerConfiguration c = config.getObject(
         ContainerScannerConfiguration.class);
-    boolean enabled = c.isEnabled();
-
-    if (!enabled) {
+    if (!c.isEnabled()) {
       LOG.info("Background container scanner has been disabled.");

Review Comment:
   Currently this uses one enable flag for both the on-demand and background scanners. I think this is fine for simplicity.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class for performing on demand scans of containers.
+ */
+public final class OnDemandContainerScanner {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OnDemandContainerScanner.class);
+
+  private static volatile OnDemandContainerScanner instance;
+
+  private final ExecutorService scanExecutor;
+  private final ContainerController containerController;
+  private final DataTransferThrottler throttler;
+  private final Canceler canceler;
+  private final ConcurrentHashMap
+      .KeySetView<Container<?>, Boolean> toBeScannedContainers;

Review Comment:
   It looks like this is being used to remove duplicates from the queue. I think we should add a comment explaining this.
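A sketch of how the field might be documented. It is keyed by container ID here, which is an assumption (the PR keys it by `Container<?>`), and the class and method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of a commented deduplication set; names are hypothetical. */
public final class ScanQueueDedup {
  /**
   * IDs of containers that are queued for an on-demand scan but whose scan
   * has not started yet. This set exists only to drop duplicate requests:
   * Set#add returns false if the container is already queued, in which case
   * no second task is submitted. The scan task removes the ID when it starts,
   * so a failure seen during or after that scan can queue the container again.
   */
  private final Set<Long> toBeScannedContainers = ConcurrentHashMap.newKeySet();

  /** Returns true if a scan should be submitted, false for a duplicate. */
  public boolean tryQueue(long containerId) {
    return toBeScannedContainers.add(containerId);
  }

  /** Called by the scan task just before it scans the container. */
  public void onScanStarted(long containerId) {
    toBeScannedContainers.remove(containerId);
  }

  public static void main(String[] args) {
    ScanQueueDedup queue = new ScanQueueDedup();
    System.out.println(queue.tryQueue(7L)); // true
    System.out.println(queue.tryQueue(7L)); // false: already queued
    queue.onScanStarted(7L);
    System.out.println(queue.tryQueue(7L)); // true: can be queued again
  }
}
```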



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java:
##########
@@ -373,11 +368,21 @@ private ContainerCommandResponseProto dispatchRequest(
             container.getContainerData().getState() == State.UNHEALTHY);
         sendCloseContainerActionIfNeeded(container);
       }
-
+      if (cmdType == Type.CreateContainer
+          && result == Result.SUCCESS && dispatcherContext != null) {
+        Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
+        container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
+      }
       if (result == Result.SUCCESS) {
         updateBCSID(container, dispatcherContext, cmdType);
         audit(action, eventType, params, AuditEventStatus.SUCCESS, null);
       } else {
+        //TODO HDDS-7096:
+        // This is a too general place for on demand scanning.
+        // Create a specific exception that signals for on demand scanning
+        // and move this general scan to where it is more appropriate.
+        // Add integration tests to test the full functionality.
+        OnDemandContainerScanner.scanContainer(container);

Review Comment:
   I agree this is too general, but it is OK for now. After reviewing the code, I am again leaning towards an AOP implementation in HDDS-7096, as we discussed offline.
   - Methods whose failure should trigger scanning would:
       1. Throw a specific type of exception when a scan is required.
       2. Be annotated to mark this behavior, giving casual readers a link between the method and the aspect.
   - An aspect would be created to queue containers for scanning using the static on-demand scanning utility in this PR.
       - The aspect's pointcut would be methods marked with the annotation that throw the special exception.
       - The aspect's advice would be to run an on-demand scan of the container. The container could be obtained from the method arguments or from the target of the method invocation.
   
   This approach could also be used to improve the volume scanner, which currently has `onFailure` calls in try/catch blocks spread throughout the code. Otherwise, we will end up in the same situation with the on-demand scanner.
   
   Also, I think every on-demand container scan should also trigger an on-demand volume check, since those are cheap and the volume scanner already drops frequent repeat scans for us.
   
   Anyway, these are future ideas and not actionable in this PR; I just wanted to put them out there.
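The AOP idea can be prototyped without an AOP framework. The sketch below substitutes a JDK dynamic proxy (`java.lang.reflect.Proxy`) for a real aspect. The annotation plus the exception type play the role of the pointcut, and the scan request in the handler plays the role of the advice. `ScanOnFailure`, `ScanRequiredException`, `ChunkReader`, and the `LongConsumer` scan hook are all hypothetical names. A production version in HDDS-7096 might instead use an aspect framework such as AspectJ, which can also intercept calls that do not go through an interface.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.function.LongConsumer;

/** Dynamic-proxy stand-in for the proposed scan-on-failure aspect. */
public final class ScanOnFailureProxy {

  /** Hypothetical marker for methods whose failure should trigger a scan. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  public @interface ScanOnFailure { }

  /** Hypothetical exception signalling that a container needs scanning. */
  public static class ScanRequiredException extends RuntimeException {
    private final long containerId;

    public ScanRequiredException(long containerId, String message) {
      super(message);
      this.containerId = containerId;
    }

    public long getContainerId() {
      return containerId;
    }
  }

  /** Hypothetical interface for a component that reads container data. */
  public interface ChunkReader {
    @ScanOnFailure
    byte[] readChunk(long containerId);
  }

  private ScanOnFailureProxy() { }

  /** Wraps target so annotated methods failing with ScanRequiredException queue a scan. */
  public static <T> T wrap(Class<T> iface, T target, LongConsumer scanHook) {
    InvocationHandler handler = (proxy, method, args) -> {
      try {
        return method.invoke(target, args);
      } catch (InvocationTargetException e) {
        Throwable cause = e.getCause();
        // "Pointcut": an annotated method that threw the special exception.
        if (method.isAnnotationPresent(ScanOnFailure.class)
            && cause instanceof ScanRequiredException) {
          // "Advice": hand the container ID to the on-demand scanner.
          scanHook.accept(((ScanRequiredException) cause).getContainerId());
        }
        throw cause; // callers still see the original failure
      }
    };
    return iface.cast(Proxy.newProxyInstance(
        iface.getClassLoader(), new Class<?>[] {iface}, handler));
  }

  public static void main(String[] args) {
    ChunkReader failing = id -> {
      throw new ScanRequiredException(id, "checksum mismatch");
    };
    ChunkReader reader = wrap(ChunkReader.class, failing,
        id -> System.out.println("queued container " + id));
    try {
      reader.readChunk(42L);
    } catch (ScanRequiredException e) {
      System.out.println("read failed: " + e.getMessage());
    }
  }
}
```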



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java:
##########
@@ -0,0 +1,164 @@
+  private final OnDemandScannerMetrics metrics;
+  @VisibleForTesting
+  private static Future<?> lastScanFuture;
+
+  private OnDemandContainerScanner(
+      ContainerScannerConfiguration conf, ContainerController controller) {
+    containerController = controller;
+    throttler = new DataTransferThrottler(
+        conf.getOnDemandBandwidthPerVolume());
+    canceler = new Canceler();
+    metrics = OnDemandScannerMetrics.create();
+    scanExecutor = Executors.newSingleThreadExecutor();
+    toBeScannedContainers = ConcurrentHashMap.newKeySet();
+  }
+
+  public static synchronized void init(
+      ContainerScannerConfiguration conf, ContainerController controller) {
+    if (instance != null) {
+      LOG.warn("Trying to initialize on demand scanner" +
+          " a second time on a datanode.");
+      return;
+    }
+    instance = new OnDemandContainerScanner(conf, controller);
+  }
+
+  public static void scanContainer(Container<?> container) {
+    if (instance == null) {
+      return;
+    }
+    if (container.shouldScanData() &&
+        instance.toBeScannedContainers.add(container)) {
+      lastScanFuture = instance.scanExecutor.submit(() -> {
+        instance.toBeScannedContainers.remove(container);
+        if (container.shouldScanData()) {
+          performOnDemandScan(container);
+        }
+      });
+    }
+  }
+
+  private static void performOnDemandScan(Container<?> container) {
+    long containerId = container.getContainerData().getContainerID();
+    try {
+      ContainerData containerData = container.getContainerData();
+      logScanStart(containerData);
+      if (container.scanData(instance.throttler, instance.canceler)) {
+        Instant now = Instant.now();
+        logScanCompleted(containerData, now);
+        instance.containerController.updateDataScanTimestamp(containerId, now);
+      } else {
+        instance.containerController.markContainerUnhealthy(containerId);
+        instance.metrics.incNumUnHealthyContainers();
+      }
+      instance.metrics.incNumContainersScanned();
+    } catch (IOException e) {
+      LOG.warn("Unexpected exception while scanning container "
+          + containerId, e);
+    }
+  }
+
+  private static void logScanStart(ContainerData containerData) {
+    if (LOG.isDebugEnabled()) {
+      Optional<Instant> scanTimestamp = containerData.lastDataScanTime();
+      Object lastScanTime = scanTimestamp.map(ts -> "at " + ts).orElse("never");
+      LOG.debug("Scanning container {}, last scanned {}",
+          containerData.getContainerID(), lastScanTime);
+    }
+  }
+
+  private static void logScanCompleted(
+      ContainerData containerData, Instant timestamp) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Completed scan of container {} at {}",
+          containerData.getContainerID(), timestamp);
+    }
+  }
+
+  public static OnDemandScannerMetrics getMetrics() {
+    return instance.metrics;
+  }
+
+  public static synchronized void shutdown() {
+    if (instance == null) {
+      return;
+    }
+    instance.shutdownScanner();
+  }
+
+  private synchronized void shutdownScanner() {
+    instance = null;
+    metrics.unregister();
+    if (!scanExecutor.isShutdown()) {
+      scanExecutor.shutdown();
+    }
+    try {
+      long timeoutSeconds = 5;
+      if (!scanExecutor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
+        LOG.warn("On demand scanner shut down forcefully after {} seconds",
+            timeoutSeconds);
+        scanExecutor.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("On demand scanner interrupted while waiting for shut down.");
+      scanExecutor.shutdownNow();
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  public static Future<?> getLastScanFuture() {
+    return lastScanFuture;
+  }

Review Comment:
   How about having scanContainer return the future instead? It is void right now, so returning the future would let callers use that extra information only when they need it (for example, in tests).
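A minimal sketch of that signature, assuming the container ID is used as the deduplication key and `Optional.empty()` signals a dropped duplicate. The class name and the `LongConsumer` scan action are hypothetical. Production callers could ignore the return value while tests wait on it, which could make the static `lastScanFuture` field unnecessary.

```java
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.LongConsumer;

/** Hypothetical scanner whose scanContainer returns the scan future. */
public final class FutureReturningScanner {
  private final ExecutorService scanExecutor =
      Executors.newSingleThreadExecutor();
  private final Set<Long> toBeScannedContainers = ConcurrentHashMap.newKeySet();
  private final LongConsumer scanAction;

  public FutureReturningScanner(LongConsumer scanAction) {
    this.scanAction = scanAction;
  }

  /**
   * Queues a scan and returns its future, or Optional.empty() when the
   * request was dropped because the container is already queued.
   */
  public Optional<Future<?>> scanContainer(long containerId) {
    if (!toBeScannedContainers.add(containerId)) {
      return Optional.empty();
    }
    return Optional.of(scanExecutor.submit(() -> {
      toBeScannedContainers.remove(containerId);
      scanAction.accept(containerId);
    }));
  }

  public void shutdown() {
    scanExecutor.shutdown();
  }

  public static void main(String[] args) throws Exception {
    FutureReturningScanner scanner = new FutureReturningScanner(
        id -> System.out.println("scanned " + id));
    Optional<Future<?>> scan = scanner.scanContainer(5L);
    if (scan.isPresent()) {
      scan.get().get(); // a test would wait here instead of polling
    }
    scanner.shutdown();
  }
}
```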



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java:
##########
@@ -307,29 +307,27 @@ private void buildContainerSet() {
   private void startContainerScrub() {
     ContainerScannerConfiguration c = config.getObject(
         ContainerScannerConfiguration.class);
-    boolean enabled = c.isEnabled();
-
-    if (!enabled) {
+    if (!c.isEnabled()) {
       LOG.info("Background container scanner has been disabled.");

Review Comment:
   ```suggestion
         LOG.info("Container scanners have been disabled.");
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java:
##########
@@ -45,12 +45,16 @@ public class ContainerScannerConfiguration {
       "hdds.container.scrub.data.scan.interval";
   public static final String VOLUME_BYTES_PER_SECOND_KEY =
       "hdds.container.scrub.volume.bytes.per.second";
+  public static final String ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY =
+      "hdds.container.scrub.on.demand.volume.bytes.per.second";
 
   public static final long METADATA_SCAN_INTERVAL_DEFAULT =
       Duration.ofHours(3).toMillis();
   public static final long DATA_SCAN_INTERVAL_DEFAULT =
       Duration.ofDays(7).toMillis();
   public static final long BANDWIDTH_PER_VOLUME_DEFAULT = 1048576;   // 1MB
+  // 1MB
+  public static final long ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT = 1048576;

Review Comment:
   We had discussed raising the default to 5 MB for the background scanner. Should we do the same for the on-demand scanner?
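For reference, the arithmetic behind such a change, as a sketch with hypothetical constant names. Deriving the value from a 1 MB unit (1024 * 1024 bytes, matching the existing `1048576` literal) makes a 5 MB default 5,242,880 bytes per second.

```java
/** Hypothetical constants comparing the current and a raised on-demand default. */
public final class OnDemandBandwidthDefaults {
  // 1 MB as used by the existing defaults: 1024 * 1024 = 1048576 bytes.
  private static final long MB = 1024L * 1024L;

  /** Current on-demand default in this PR, in bytes per second. */
  public static final long CURRENT_DEFAULT = MB;

  /** Possible raised default, matching the 5 MB discussed for the background scanner. */
  public static final long PROPOSED_DEFAULT = 5 * MB;

  private OnDemandBandwidthDefaults() { }

  public static void main(String[] args) {
    System.out.println(PROPOSED_DEFAULT); // 5242880
  }
}
```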



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java:
##########
@@ -561,11 +566,18 @@ private boolean isContainerFull(Container container) {
 
   private boolean isContainerUnhealthy(Container container) {
     return Optional.ofNullable(container).map(
-        cont -> (cont.getContainerState() ==
-            ContainerDataProto.State.UNHEALTHY))
+            cont -> (cont.getContainerState() ==
+                ContainerDataProto.State.UNHEALTHY))
         .orElse(Boolean.FALSE);
   }
 
+  private void scanContainerIfNeeded(Container<?> container) {
+    if (container.getContainerState() == State.CLOSED ||
+        container.getContainerState() == State.QUASI_CLOSED) {

Review Comment:
   `OnDemandContainerScanner#scanContainer` already has a similar check, so can we remove this method?



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandScannerMetrics.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+
+/**
+ * This class captures the on-demand container data scanner metrics.
+ **/
+@InterfaceAudience.Private
+@Metrics(about = "On-demand container data scanner metrics", context = "dfs")
+public final class OnDemandScannerMetrics
+    extends AbstractContainerScannerMetrics {

Review Comment:
   It looks like the numScanIterations metric will not be used by the on-demand scanner. We could leave it at zero, as it is now, or move it down into the two background scanner classes.
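A sketch of the second option. Plain `AtomicLong` counters stand in for the Hadoop metrics2 fields, and every class name is hypothetical. An intermediate base class is one way to move the metric down without duplicating the counter in both background scanner classes; the on-demand metrics then have no iteration counter at all.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Sketch of moving numScanIterations out of the shared base class. */
public final class ScannerMetricsHierarchy {

  /** Counters that every scanner, including the on-demand one, updates. */
  public abstract static class AbstractScannerMetrics {
    private final AtomicLong numContainersScanned = new AtomicLong();
    private final AtomicLong numUnHealthyContainers = new AtomicLong();

    public void incNumContainersScanned() {
      numContainersScanned.incrementAndGet();
    }

    public void incNumUnHealthyContainers() {
      numUnHealthyContainers.incrementAndGet();
    }

    public long getNumContainersScanned() {
      return numContainersScanned.get();
    }

    public long getNumUnHealthyContainers() {
      return numUnHealthyContainers.get();
    }
  }

  /** Only background scanners make full passes, so only they count iterations. */
  public abstract static class AbstractBackgroundScanMetrics
      extends AbstractScannerMetrics {
    private final AtomicLong numScanIterations = new AtomicLong();

    public void incNumScanIterations() {
      numScanIterations.incrementAndGet();
    }

    public long getNumScanIterations() {
      return numScanIterations.get();
    }
  }

  public static final class DataScanMetrics
      extends AbstractBackgroundScanMetrics { }

  public static final class MetadataScanMetrics
      extends AbstractBackgroundScanMetrics { }

  /** Has no iteration counter, instead of one that always stays at zero. */
  public static final class OnDemandScanMetrics
      extends AbstractScannerMetrics { }

  private ScannerMetricsHierarchy() { }
}
```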



##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerScanner.java:
##########
@@ -0,0 +1,147 @@
+@RunWith(MockitoJUnitRunner.class)
+public class TestOnDemandContainerScanner {

Review Comment:
   In addition to this unit test, it would be good to have an integration test verifying that a client reading from or writing to a corrupted container causes the container to be queued for scanning, then marked unhealthy and reported to SCM. `TestDataScanner` does this kind of thing for the background metadata scanner, and it could be expanded to cover this case as well.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java:
##########
@@ -0,0 +1,164 @@
+  private final OnDemandScannerMetrics metrics;
+  @VisibleForTesting

Review Comment:
   Nit: this annotation is not needed here since the field is private.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java:
##########
@@ -0,0 +1,164 @@
+  private final ConcurrentHashMap
+      .KeySetView<Container<?>, Boolean> toBeScannedContainers;

Review Comment:
   Also, I don't see `hashCode` or `equals` implemented for the Container class, so the set would fall back to object identity. Using the container ID as the key might be better.
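A small demonstration of the difference, assuming (as the comment suggests) that the key type does not override `equals` or `hashCode`. `FakeContainer` is a hypothetical stand-in. Whether two distinct objects can represent the same container in practice depends on how the container set hands them out, which this sketch does not model.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Shows why an equals-less key type does not deduplicate by container ID. */
public final class IdentityVsIdKeys {

  /** Hypothetical container stand-in without equals/hashCode overrides. */
  static final class FakeContainer {
    private final long containerId;

    FakeContainer(long containerId) {
      this.containerId = containerId;
    }

    long getContainerId() {
      return containerId;
    }
  }

  /** Queues both objects by reference; returns how many entries remain. */
  static int queuedByObject(FakeContainer a, FakeContainer b) {
    Set<FakeContainer> queued = ConcurrentHashMap.newKeySet();
    queued.add(a);
    queued.add(b); // Object.equals is identity, so b is not a duplicate of a
    return queued.size();
  }

  /** Queues both by container ID; Long.equals compares values. */
  static int queuedById(FakeContainer a, FakeContainer b) {
    Set<Long> queued = ConcurrentHashMap.newKeySet();
    queued.add(a.getContainerId());
    queued.add(b.getContainerId()); // same ID, so this add returns false
    return queued.size();
  }

  public static void main(String[] args) {
    FakeContainer first = new FakeContainer(12L);
    FakeContainer second = new FakeContainer(12L);
    System.out.println(queuedByObject(first, second)); // 2
    System.out.println(queuedById(first, second));     // 1
  }
}
```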



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java:
##########
@@ -0,0 +1,164 @@
+  private final ConcurrentHashMap
+      .KeySetView<Container<?>, Boolean> toBeScannedContainers;

Review Comment:
   A test case in TestOnDemandContainerScanner verifying that duplicate scan requests are removed would be a good addition.
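   A JDK-only sketch of the scenario such a test could check. It is not the PR's test; a real TestOnDemandContainerScanner case would presumably use mocked Container objects. DedupScanQueue, requestScan and blockWorker are hypothetical names. The single worker thread is kept busy by a latch, so the duplicate requests are guaranteed to arrive while the first one is still pending, which makes the expected scan count deterministic.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, dependency-free model of the scanner's queueing logic.
final class DedupScanQueue {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final Set<Long> pending = ConcurrentHashMap.newKeySet();
  private final AtomicInteger scansRun = new AtomicInteger();

  void requestScan(long containerId) {
    // Only submit if this ID is not already waiting in the queue.
    if (pending.add(containerId)) {
      executor.submit(() -> {
        pending.remove(containerId);
        scansRun.incrementAndGet(); // stands in for the real scan
      });
    }
  }

  /** Occupies the single worker thread until the latch is released. */
  void blockWorker(CountDownLatch release) {
    executor.submit(() -> {
      try {
        release.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }

  /** Drains the queue and returns how many scans actually ran. */
  int awaitScansRun() {
    executor.shutdown();
    try {
      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("scan queue did not drain");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return scansRun.get();
  }

  /** Requests container 42 three times and container 7 once while busy. */
  static int duplicateScenario() {
    DedupScanQueue queue = new DedupScanQueue();
    CountDownLatch release = new CountDownLatch(1);
    queue.blockWorker(release);
    for (int i = 0; i < 3; i++) {
      queue.requestScan(42L);
    }
    queue.requestScan(7L);
    release.countDown();
    return queue.awaitScansRun();
  }
}
```

   With deduplication in place, one scan runs per distinct container ID, so duplicateScenario() yields 2 rather than 4.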



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] Galsza commented on a diff in pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
Galsza commented on code in PR #3788:
URL: https://github.com/apache/ozone/pull/3788#discussion_r995391339


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class for performing on demand scans of containers.
+ */
+public final class OnDemandContainerScanner {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OnDemandContainerScanner.class);
+
+  private static volatile OnDemandContainerScanner instance;
+
+  private final ExecutorService scanExecutor;
+  private final ContainerController containerController;
+  private final DataTransferThrottler throttler;
+  private final Canceler canceler;
+  private final ConcurrentHashMap
+      .KeySetView<Container<?>, Boolean> toBeScannedContainers;
+  private final OnDemandScannerMetrics metrics;
+  @VisibleForTesting
+  private static Future<?> lastScanFuture;
+
+  private OnDemandContainerScanner(
+      ContainerScannerConfiguration conf, ContainerController controller) {
+    containerController = controller;
+    throttler = new DataTransferThrottler(
+        conf.getOnDemandBandwidthPerVolume());
+    canceler = new Canceler();
+    metrics = OnDemandScannerMetrics.create();
+    scanExecutor = Executors.newSingleThreadExecutor();
+    toBeScannedContainers = ConcurrentHashMap.newKeySet();
+  }
+
+  public static synchronized void init(
+      ContainerScannerConfiguration conf, ContainerController controller) {
+    if (instance != null) {
+      LOG.warn("Trying to initialize on demand scanner" +
+          " a second time on a datanode.");
+      return;
+    }
+    instance = new OnDemandContainerScanner(conf, controller);
+  }
+
+  public static void scanContainer(Container<?> container) {
+    if (instance == null) {
+      return;
+    }
+    if (container.shouldScanData() &&
+        instance.toBeScannedContainers.add(container)) {
+      lastScanFuture = instance.scanExecutor.submit(() -> {
+        instance.toBeScannedContainers.remove(container);
+        if (container.shouldScanData()) {
+          performOnDemandScan(container);
+        }
+      });
+    }
+  }
+
+  private static void performOnDemandScan(Container<?> container) {
+    long containerId = container.getContainerData().getContainerID();
+    try {
+      ContainerData containerData = container.getContainerData();
+      logScanStart(containerData);
+      if (container.scanData(instance.throttler, instance.canceler)) {
+        Instant now = Instant.now();
+        logScanCompleted(containerData, now);
+        instance.containerController.updateDataScanTimestamp(containerId, now);
+      } else {
+        instance.containerController.markContainerUnhealthy(containerId);
+        instance.metrics.incNumUnHealthyContainers();
+      }
+      instance.metrics.incNumContainersScanned();
+    } catch (IOException e) {
+      LOG.warn("Unexpected exception while scanning container "
+          + containerId, e);
+    }
+  }
+
+  private static void logScanStart(ContainerData containerData) {
+    if (LOG.isDebugEnabled()) {
+      Optional<Instant> scanTimestamp = containerData.lastDataScanTime();
+      Object lastScanTime = scanTimestamp.map(ts -> "at " + ts).orElse("never");
+      LOG.debug("Scanning container {}, last scanned {}",
+          containerData.getContainerID(), lastScanTime);
+    }
+  }
+
+  private static void logScanCompleted(
+      ContainerData containerData, Instant timestamp) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Completed scan of container {} at {}",
+          containerData.getContainerID(), timestamp);
+    }
+  }
+
+  public static OnDemandScannerMetrics getMetrics() {
+    return instance.metrics;
+  }
+
+  public static synchronized void shutdown() {
+    if (instance == null) {
+      return;
+    }
+    instance.shutdownScanner();
+  }
+
+  private synchronized void shutdownScanner() {
+    instance = null;
+    metrics.unregister();
+    if (!scanExecutor.isShutdown()) {
+      scanExecutor.shutdown();
+    }
+    try {
+      long timeoutSeconds = 5;
+      if (!scanExecutor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
+        LOG.warn("On demand scanner shut down forcefully after {} seconds",
+            timeoutSeconds);
+        scanExecutor.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("On demand scanner interrupted while waiting for shut down.");
+      scanExecutor.shutdownNow();
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  public static Future<?> getLastScanFuture() {
+    return lastScanFuture;
+  }

Review Comment:
   It now returns an Optional of the future.
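   A minimal sketch of such an accessor, assuming the field remains a static Future<?> as in the quoted diff. LastScanTracker and record are hypothetical names. Optional.ofNullable makes the "no scan submitted yet" case explicit instead of returning null. Marking the field volatile is an assumption added here (the quoted diff does not do so); it makes a value written by the submitting thread visible to a test thread reading it.

```java
import java.util.Optional;
import java.util.concurrent.Future;

// Hypothetical holder for the most recently submitted scan task.
final class LastScanTracker {
  // volatile: assumed here so test threads see the latest submitted future.
  private static volatile Future<?> lastScanFuture;

  static void record(Future<?> future) {
    lastScanFuture = future;
  }

  /** Empty until the first scan has been submitted, never null. */
  static Optional<Future<?>> getLastScanFuture() {
    return Optional.ofNullable(lastScanFuture);
  }
}
```

   A test can then wait on the scan with getLastScanFuture().ifPresent(...) or assert isPresent() first, rather than guarding against null.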





[GitHub] [ozone] errose28 commented on a diff in pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
errose28 commented on code in PR #3788:
URL: https://github.com/apache/ozone/pull/3788#discussion_r1006017800


##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestDataScanner.java:
##########
@@ -121,111 +119,126 @@ public static void shutdown() throws IOException {
     }
   }
 
+  //This test performs 2 separate tests because creating
+  // and running a cluster is expensive.
   @Test
-  public void testOpenContainerIntegrity() throws Exception {
+  public void testScannersMarkContainerUnhealthy() throws Exception {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
-    Instant testStartTime = Instant.now();
-
-    String value = "sample value";
+    String value = "sample key value";
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
 
-    for (int i = 0; i < 10; i++) {
-      String keyName = UUID.randomUUID().toString();
-
-      OzoneOutputStream out = bucket.createKey(keyName,
-          value.getBytes(UTF_8).length, RATIS,
-          ONE, new HashMap<>());
-      out.write(value.getBytes(UTF_8));
-      out.close();
-      OzoneKey key = bucket.getKey(keyName);
-      Assert.assertEquals(keyName, key.getName());
-      OzoneInputStream is = bucket.readKey(keyName);
-      byte[] fileContent = new byte[value.getBytes(UTF_8).length];
-      is.read(fileContent);
-      Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
-          keyName, RATIS,
-          ONE));
-      Assert.assertEquals(value, new String(fileContent, UTF_8));
-      Assert.assertFalse(key.getCreationTime().isBefore(testStartTime));
-      Assert.assertFalse(key.getModificationTime().isBefore(testStartTime));
-    }
-
+    String keyNameInClosedContainer = "keyNameInClosedContainer";
+    OzoneOutputStream key = createKey(volumeName, bucketName,
+        keyNameInClosedContainer);
+    // write data more than 1 chunk
+    int sizeLargerThanOneChunk = (int) (OzoneConsts.MB + OzoneConsts.MB / 2);

Review Comment:
   I'm not sure; we can leave it as is. It is not critical to the component being tested.





[GitHub] [ozone] Galsza commented on a diff in pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
Galsza commented on code in PR #3788:
URL: https://github.com/apache/ozone/pull/3788#discussion_r1005390453


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java:
##########
@@ -373,11 +368,21 @@ private ContainerCommandResponseProto dispatchRequest(
             container.getContainerData().getState() == State.UNHEALTHY);
         sendCloseContainerActionIfNeeded(container);
       }
-
+      if (cmdType == Type.CreateContainer
+          && result == Result.SUCCESS && dispatcherContext != null) {
+        Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
+        container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
+      }

Review Comment:
   This logic was interrupting an otherwise very long and well-described special case. I was trying to improve the readability of this method.





[GitHub] [ozone] Galsza commented on a diff in pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
Galsza commented on code in PR #3788:
URL: https://github.com/apache/ozone/pull/3788#discussion_r995393600


##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerScanner.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.verification.VerificationMode;
+
+import java.io.IOException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdds.conf.OzoneConfiguration.newInstanceOf;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the on-demand container scanner.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestOnDemandContainerScanner {

Review Comment:
   WIP





[GitHub] [ozone] errose28 merged pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
errose28 merged PR #3788:
URL: https://github.com/apache/ozone/pull/3788




[GitHub] [ozone] errose28 commented on a diff in pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
errose28 commented on code in PR #3788:
URL: https://github.com/apache/ozone/pull/3788#discussion_r1005980516


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java:
##########
@@ -373,11 +368,21 @@ private ContainerCommandResponseProto dispatchRequest(
             container.getContainerData().getState() == State.UNHEALTHY);
         sendCloseContainerActionIfNeeded(container);
       }
-
+      if (cmdType == Type.CreateContainer
+          && result == Result.SUCCESS && dispatcherContext != null) {
+        Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
+        container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
+      }

Review Comment:
   Ok. Just wanted to make sure there wasn't a functionality change intended here.





[GitHub] [ozone] Galsza commented on a diff in pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
Galsza commented on code in PR #3788:
URL: https://github.com/apache/ozone/pull/3788#discussion_r1005385575


##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestDataScanner.java:
##########
@@ -121,111 +119,126 @@ public static void shutdown() throws IOException {
     }
   }
 
+  //This test performs 2 separate tests because creating
+  // and running a cluster is expensive.
   @Test
-  public void testOpenContainerIntegrity() throws Exception {
+  public void testScannersMarkContainerUnhealthy() throws Exception {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
-    Instant testStartTime = Instant.now();
-
-    String value = "sample value";
+    String value = "sample key value";
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
 
-    for (int i = 0; i < 10; i++) {
-      String keyName = UUID.randomUUID().toString();
-
-      OzoneOutputStream out = bucket.createKey(keyName,
-          value.getBytes(UTF_8).length, RATIS,
-          ONE, new HashMap<>());
-      out.write(value.getBytes(UTF_8));
-      out.close();
-      OzoneKey key = bucket.getKey(keyName);
-      Assert.assertEquals(keyName, key.getName());
-      OzoneInputStream is = bucket.readKey(keyName);
-      byte[] fileContent = new byte[value.getBytes(UTF_8).length];
-      is.read(fileContent);
-      Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
-          keyName, RATIS,
-          ONE));
-      Assert.assertEquals(value, new String(fileContent, UTF_8));
-      Assert.assertFalse(key.getCreationTime().isBefore(testStartTime));
-      Assert.assertFalse(key.getModificationTime().isBefore(testStartTime));
-    }
-
+    String keyNameInClosedContainer = "keyNameInClosedContainer";
+    OzoneOutputStream key = createKey(volumeName, bucketName,
+        keyNameInClosedContainer);
+    // write data more than 1 chunk
+    int sizeLargerThanOneChunk = (int) (OzoneConsts.MB + OzoneConsts.MB / 2);

Review Comment:
   I'm not sure about the inner workings of the test environment, but trying to write less than one chunk results in the waitForContainerClose method timing out. I guess the reason is that data smaller than one chunk gets buffered, and the buffer waits until more data is added, so no container is created until a whole chunk has been written. (Other tests do this too, and I'm not sure how to fix it. Do you have a recommendation? I'm leaving it like this for now.)
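   A toy model of the buffering behavior guessed at above, for illustration only. It is not the Ozone client and makes no claim about how the real client buffers or flushes; ChunkBufferedWriter and chunksSent are hypothetical names. It only shows why, under that guess, a write smaller than one chunk would send nothing until the stream is closed, while the test's 1.5-chunk write, (int) (OzoneConsts.MB + OzoneConsts.MB / 2), sends a full chunk immediately if the chunk size is 1 MB.

```java
import java.io.ByteArrayOutputStream;

// Toy model (not Ozone code): data is forwarded only in whole chunks,
// or as a final partial chunk when the writer is closed.
final class ChunkBufferedWriter {
  private final int chunkSize;
  private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  private int chunksSent;

  ChunkBufferedWriter(int chunkSize) {
    this.chunkSize = chunkSize;
  }

  void write(byte[] data) {
    buffer.write(data, 0, data.length);
    // Forward every complete chunk; keep the remainder buffered.
    while (buffer.size() >= chunkSize) {
      byte[] all = buffer.toByteArray();
      chunksSent++; // stands in for sending one chunk to a datanode
      buffer.reset();
      buffer.write(all, chunkSize, all.length - chunkSize);
    }
  }

  /** Flushes any partial chunk that is still buffered. */
  void close() {
    if (buffer.size() > 0) {
      chunksSent++;
      buffer.reset();
    }
  }

  int chunksSent() {
    return chunksSent;
  }
}
```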





[GitHub] [ozone] Galsza commented on a diff in pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
Galsza commented on code in PR #3788:
URL: https://github.com/apache/ozone/pull/3788#discussion_r995390999


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java:
##########
@@ -307,29 +307,27 @@ private void buildContainerSet() {
   private void startContainerScrub() {
     ContainerScannerConfiguration c = config.getObject(
         ContainerScannerConfiguration.class);
-    boolean enabled = c.isEnabled();
-
-    if (!enabled) {
+    if (!c.isEnabled()) {
       LOG.info("Background container scanner has been disabled.");

Review Comment:
   Yup, I thought the same thing.





[GitHub] [ozone] umamaheswararao commented on pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on PR #3788:
URL: https://github.com/apache/ozone/pull/3788#issuecomment-1318898454

   Nice work @Galsza and @errose28 




[GitHub] [ozone] Galsza commented on a diff in pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
Galsza commented on code in PR #3788:
URL: https://github.com/apache/ozone/pull/3788#discussion_r995393152


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class for performing on demand scans of containers.
+ */
+public final class OnDemandContainerScanner {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OnDemandContainerScanner.class);
+
+  private static volatile OnDemandContainerScanner instance;
+
+  private final ExecutorService scanExecutor;
+  private final ContainerController containerController;
+  private final DataTransferThrottler throttler;
+  private final Canceler canceler;
+  private final ConcurrentHashMap
+      .KeySetView<Container<?>, Boolean> toBeScannedContainers;
+  private final OnDemandScannerMetrics metrics;
+  @VisibleForTesting
+  private static Future<?> lastScanFuture;
+
+  private OnDemandContainerScanner(
+      ContainerScannerConfiguration conf, ContainerController controller) {
+    containerController = controller;
+    throttler = new DataTransferThrottler(
+        conf.getOnDemandBandwidthPerVolume());
+    canceler = new Canceler();
+    metrics = OnDemandScannerMetrics.create();
+    scanExecutor = Executors.newSingleThreadExecutor();
+    toBeScannedContainers = ConcurrentHashMap.newKeySet();
+  }
+
+  public static synchronized void init(
+      ContainerScannerConfiguration conf, ContainerController controller) {
+    if (instance != null) {
+      LOG.warn("Trying to initialize on demand scanner" +
+          " a second time on a datanode.");
+      return;
+    }
+    instance = new OnDemandContainerScanner(conf, controller);
+  }
+
+  public static void scanContainer(Container<?> container) {
+    if (instance == null) {
+      return;
+    }
+    if (container.shouldScanData() &&
+        instance.toBeScannedContainers.add(container)) {
+      lastScanFuture = instance.scanExecutor.submit(() -> {
+        instance.toBeScannedContainers.remove(container);
+        if (container.shouldScanData()) {
+          performOnDemandScan(container);
+        }
+      });
+    }
+  }
+
+  private static void performOnDemandScan(Container<?> container) {
+    long containerId = container.getContainerData().getContainerID();
+    try {
+      ContainerData containerData = container.getContainerData();
+      logScanStart(containerData);
+      if (container.scanData(instance.throttler, instance.canceler)) {
+        Instant now = Instant.now();

Review Comment:
   The clock won't be used for testing just yet. Instant.now() provides the value used to update the container's last scan time, so that value is captured once and logged explicitly to avoid confusion.
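   If the scanner later takes an injectable clock for tests, one common option is java.time.Clock, with Clock.fixed in tests. This is a sketch under that assumption; ScanTimestamper, fixedAt and completeScan are hypothetical names. Reading the clock once mirrors the PR's use of a single Instant for both the completion log message and updateDataScanTimestamp.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.function.Consumer;

// Hypothetical helper showing clock injection for scan timestamps.
final class ScanTimestamper {
  private final Clock clock;

  ScanTimestamper(Clock clock) {
    this.clock = clock; // production code could pass Clock.systemUTC()
  }

  /** Convenience for tests: a timestamper whose clock never moves. */
  static ScanTimestamper fixedAt(Instant instant) {
    return new ScanTimestamper(Clock.fixed(instant, ZoneOffset.UTC));
  }

  /**
   * Reads the clock once and uses that single value both for the log line
   * and as the returned last-scan time, so the two cannot disagree.
   */
  Instant completeScan(long containerId, Consumer<String> log) {
    Instant now = clock.instant();
    log.accept("Completed scan of container " + containerId + " at " + now);
    return now;
  }
}
```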





[GitHub] [ozone] Galsza commented on a diff in pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
Galsza commented on code in PR #3788:
URL: https://github.com/apache/ozone/pull/3788#discussion_r995393328


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandScannerMetrics.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+
+/**
+ * This class captures the on-demand container data scanner metrics.
+ **/
+@InterfaceAudience.Private
+@Metrics(about = "On-demand container data scanner metrics", context = "dfs")
+public final class OnDemandScannerMetrics
+    extends AbstractContainerScannerMetrics {

Review Comment:
   Yes, I'm aware of this. It comes from the OnDemandScanner being a bit different from the scheduled/offline scanners. In a perfect world, AbstractContainerScanner could be used for the OnDemandScanner as well, but I didn't know in advance how much the OnDemandScanner would differ from the offline scanners, and I judged that trying to fit it into AbstractContainerScanner would be wasted effort for now.
   A side effect is some code duplication with ContainerDataScanner, whose main scanning method is almost the same as the one in OnDemandScanner.
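One way the duplicated scanning method could eventually be shared, without forcing the on-demand scanner into `AbstractContainerScanner`, is a small static helper both scanners call. The sketch below is hypothetical: `DataCheck`, `ScanOutcomeSink`, `SharedScanStep` and `RecordingSink` are illustrative names standing in for `Container#scanData`, the `ContainerController` calls and the metrics classes, and the unhealthy-container metric is left out for brevity.

```java
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Container#scanData: returns true when all checksums verify. */
@FunctionalInterface
interface DataCheck {
  boolean scanData() throws IOException;
}

/** Hypothetical stand-in for the ContainerController calls and the scanner metrics. */
interface ScanOutcomeSink {
  void updateDataScanTimestamp(long containerId, Instant when);

  void markContainerUnhealthy(long containerId);

  void incNumContainersScanned();
}

/** One scan step that both the background data scanner and the on-demand scanner could call. */
final class SharedScanStep {
  private SharedScanStep() { }

  static boolean scanAndRecord(long containerId, DataCheck check,
      ScanOutcomeSink sink, Instant now) throws IOException {
    boolean healthy = check.scanData();
    if (healthy) {
      sink.updateDataScanTimestamp(containerId, now);
    } else {
      sink.markContainerUnhealthy(containerId);
    }
    sink.incNumContainersScanned();
    return healthy;
  }
}

/** Test double that records every call in order. */
final class RecordingSink implements ScanOutcomeSink {
  final List<String> events = new ArrayList<>();

  @Override
  public void updateDataScanTimestamp(long containerId, Instant when) {
    events.add("scanned " + containerId + " at " + when);
  }

  @Override
  public void markContainerUnhealthy(long containerId) {
    events.add("unhealthy " + containerId);
  }

  @Override
  public void incNumContainersScanned() {
    events.add("counted");
  }
}
```

Each scanner would keep its own scheduling (periodic iteration versus a request queue) and delegate only this step, so the shared part stays small.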





[GitHub] [ozone] Galsza commented on a diff in pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
Galsza commented on code in PR #3788:
URL: https://github.com/apache/ozone/pull/3788#discussion_r995390828


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java:
##########
@@ -307,29 +307,27 @@ private void buildContainerSet() {
   private void startContainerScrub() {
     ContainerScannerConfiguration c = config.getObject(
         ContainerScannerConfiguration.class);
-    boolean enabled = c.isEnabled();
-
-    if (!enabled) {
+    if (!c.isEnabled()) {
       LOG.info("Background container scanner has been disabled.");
-    } else {
-      if (this.metadataScanner == null) {
-        this.metadataScanner = new ContainerMetadataScanner(c, controller);
-      }
-      this.metadataScanner.start();
-
-      if (c.getBandwidthPerVolume() == 0L) {
-        LOG.warn(VOLUME_BYTES_PER_SECOND_KEY + " is set to 0, " +
-            "so background container data scanner will not start.");
-        return;
-      }
+      return;
+    }
+    OnDemandContainerScanner.init(c, controller);
+    if (this.metadataScanner == null) {
+      this.metadataScanner = new ContainerMetadataScanner(c, controller);
+    }
+    this.metadataScanner.start();
 
-      dataScanners = new ArrayList<>();
-      for (StorageVolume v : volumeSet.getVolumesList()) {
-        ContainerDataScanner s = new ContainerDataScanner(c, controller,
-            (HddsVolume) v);
-        s.start();
-        dataScanners.add(s);
-      }
+    if (c.getBandwidthPerVolume() == 0L) {

Review Comment:
   Nice catch!





[GitHub] [ozone] sodonnel commented on a diff in pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
sodonnel commented on code in PR #3788:
URL: https://github.com/apache/ozone/pull/3788#discussion_r993672113


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class for performing on demand scans of containers.
+ */
+public final class OnDemandContainerScanner {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OnDemandContainerScanner.class);
+
+  private static volatile OnDemandContainerScanner instance;
+
+  private final ExecutorService scanExecutor;
+  private final ContainerController containerController;
+  private final DataTransferThrottler throttler;
+  private final Canceler canceler;
+  private final ConcurrentHashMap
+      .KeySetView<Container<?>, Boolean> toBeScannedContainers;
+  private final OnDemandScannerMetrics metrics;
+  @VisibleForTesting
+  private static Future<?> lastScanFuture;
+
+  private OnDemandContainerScanner(
+      ContainerScannerConfiguration conf, ContainerController controller) {
+    containerController = controller;
+    throttler = new DataTransferThrottler(
+        conf.getOnDemandBandwidthPerVolume());
+    canceler = new Canceler();
+    metrics = OnDemandScannerMetrics.create();
+    scanExecutor = Executors.newSingleThreadExecutor();
+    toBeScannedContainers = ConcurrentHashMap.newKeySet();
+  }
+
+  public static synchronized void init(
+      ContainerScannerConfiguration conf, ContainerController controller) {
+    if (instance != null) {
+      LOG.warn("Trying to initialize on demand scanner" +
+          " a second time on a datanode.");
+      return;
+    }
+    instance = new OnDemandContainerScanner(conf, controller);
+  }
+
+  public static void scanContainer(Container<?> container) {
+    if (instance == null) {
+      return;
+    }
+    if (container.shouldScanData() &&
+        instance.toBeScannedContainers.add(container)) {
+      lastScanFuture = instance.scanExecutor.submit(() -> {
+        instance.toBeScannedContainers.remove(container);
+        if (container.shouldScanData()) {
+          performOnDemandScan(container);
+        }
+      });
+    }
+  }
+
+  private static void performOnDemandScan(Container<?> container) {
+    long containerId = container.getContainerData().getContainerID();
+    try {
+      ContainerData containerData = container.getContainerData();
+      logScanStart(containerData);
+      if (container.scanData(instance.throttler, instance.canceler)) {
+        Instant now = Instant.now();

Review Comment:
   I am not sure what all `now` will be used for, but it may make sense to pass a clock to this object when creating it. In a few places in the code we use the `MonotonicClock` class for this. At runtime it simply wraps `Instant.now()` etc., but in tests you can pass in a `TestClock` instead, which lets you manipulate the time without sleeps. If you need tests where the passage of time affects behavior, this approach makes sense.
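A minimal sketch of the clock-injection idea, using plain `java.time.Clock` rather than Ozone's `MonotonicClock`/`TestClock`. `SettableClock` is a hypothetical stand-in for a test clock, and `ClockAwareScanner` is a hypothetical fragment showing a single clock reading feeding the stored scan time.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Hypothetical test clock: time only moves when the test advances it. */
final class SettableClock extends Clock {
  private Instant now;

  SettableClock(Instant start) {
    this.now = start;
  }

  void advance(Duration d) {
    now = now.plus(d);
  }

  @Override
  public ZoneId getZone() {
    return ZoneOffset.UTC;
  }

  @Override
  public Clock withZone(ZoneId zone) {
    return this; // zone is ignored: this sketch only compares Instants
  }

  @Override
  public Instant instant() {
    return now;
  }
}

/** Hypothetical scanner fragment that takes its time source as a constructor argument. */
final class ClockAwareScanner {
  private final Clock clock;
  private Instant lastScanTime;

  ClockAwareScanner(Clock clock) {
    this.clock = clock;
  }

  void onScanSucceeded() {
    // Read the clock once so the stored timestamp and any log line agree.
    Instant now = clock.instant();
    lastScanTime = now;
  }

  Instant lastScanTime() {
    return lastScanTime;
  }

  boolean isScanDue(Duration interval) {
    return lastScanTime == null
        || !clock.instant().isBefore(lastScanTime.plus(interval));
  }
}
```

In production the scanner would receive something like `Clock.systemUTC()`; a test passes a `SettableClock` and calls `advance(...)` instead of sleeping.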





[GitHub] [ozone] sodonnel commented on a diff in pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
sodonnel commented on code in PR #3788:
URL: https://github.com/apache/ozone/pull/3788#discussion_r993664378


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class for performing on demand scans of containers.
+ */
+public final class OnDemandContainerScanner {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OnDemandContainerScanner.class);
+
+  private static volatile OnDemandContainerScanner instance;
+
+  private final ExecutorService scanExecutor;
+  private final ContainerController containerController;
+  private final DataTransferThrottler throttler;
+  private final Canceler canceler;
+  private final ConcurrentHashMap
+      .KeySetView<Container<?>, Boolean> toBeScannedContainers;
+  private final OnDemandScannerMetrics metrics;
+  @VisibleForTesting
+  private static Future<?> lastScanFuture;
+
+  private OnDemandContainerScanner(
+      ContainerScannerConfiguration conf, ContainerController controller) {
+    containerController = controller;
+    throttler = new DataTransferThrottler(
+        conf.getOnDemandBandwidthPerVolume());
+    canceler = new Canceler();
+    metrics = OnDemandScannerMetrics.create();
+    scanExecutor = Executors.newSingleThreadExecutor();
+    toBeScannedContainers = ConcurrentHashMap.newKeySet();
+  }
+
+  public static synchronized void init(
+      ContainerScannerConfiguration conf, ContainerController controller) {
+    if (instance != null) {
+      LOG.warn("Trying to initialize on demand scanner" +
+          " a second time on a datanode.");
+      return;
+    }
+    instance = new OnDemandContainerScanner(conf, controller);
+  }
+
+  public static void scanContainer(Container<?> container) {
+    if (instance == null) {
+      return;
+    }
+    if (container.shouldScanData() &&
+        instance.toBeScannedContainers.add(container)) {
+      lastScanFuture = instance.scanExecutor.submit(() -> {
+        instance.toBeScannedContainers.remove(container);
+        if (container.shouldScanData()) {
+          performOnDemandScan(container);
+        }
+      });
+    }
+  }
+
+  private static void performOnDemandScan(Container<?> container) {
+    long containerId = container.getContainerData().getContainerID();
+    try {
+      ContainerData containerData = container.getContainerData();
+      logScanStart(containerData);
+      if (container.scanData(instance.throttler, instance.canceler)) {
+        Instant now = Instant.now();
+        logScanCompleted(containerData, now);
+        instance.containerController.updateDataScanTimestamp(containerId, now);
+      } else {
+        instance.containerController.markContainerUnhealthy(containerId);
+        instance.metrics.incNumUnHealthyContainers();
+      }
+      instance.metrics.incNumContainersScanned();
+    } catch (IOException e) {
+      LOG.warn("Unexpected exception while scanning container "
+          + containerId, e);
+    }
+  }
+
+  private static void logScanStart(ContainerData containerData) {
+    if (LOG.isDebugEnabled()) {
+      Optional<Instant> scanTimestamp = containerData.lastDataScanTime();
+      Object lastScanTime = scanTimestamp.map(ts -> "at " + ts).orElse("never");
+      LOG.debug("Scanning container {}, last scanned {}",
+          containerData.getContainerID(), lastScanTime);
+    }
+  }
+
+  private static void logScanCompleted(
+      ContainerData containerData, Instant timestamp) {
+    if (LOG.isDebugEnabled()) {

Review Comment:
   When only simple parameters are passed to `LOG.debug`, you don't need to wrap it in `if (LOG.isDebugEnabled())`, because the underlying logger performs this check anyway.
   
   Note that above, where you are creating a string to pass to `debug`, it does make sense to wrap the log in an `if` statement, to avoid building the string when it is not going to be logged anyway.
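The same rule can be sketched with `java.util.logging` standing in for SLF4J (its `{0}`/`{1}` placeholders play the role of SLF4J's `{}`). `ScanLogging` and its `describeCalls` counter are hypothetical, added only to make the effect of the guard observable.

```java
import java.time.Instant;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical logging helpers contrasting a guarded and an unguarded debug-level call. */
final class ScanLogging {
  private static final Logger LOG = Logger.getLogger(ScanLogging.class.getName());

  /** Counts how often the "expensive" argument is built. */
  static int describeCalls = 0;

  private ScanLogging() { }

  static Logger logger() {
    return LOG;
  }

  static String describeLastScan(Optional<Instant> lastScan) {
    describeCalls++;
    return lastScan.map(ts -> "at " + ts).orElse("never");
  }

  /** The argument takes work to build, so the guard skips that work when FINE is off. */
  static void logScanStart(long containerId, Optional<Instant> lastScan) {
    if (LOG.isLoggable(Level.FINE)) {
      LOG.log(Level.FINE, "Scanning container {0}, last scanned {1}",
          new Object[] {containerId, describeLastScan(lastScan)});
    }
  }

  /** Plain arguments: the logger checks the level before formatting, so no guard is needed. */
  static void logScanCompleted(long containerId, Instant timestamp) {
    LOG.log(Level.FINE, "Completed scan of container {0} at {1}",
        new Object[] {containerId, timestamp});
  }
}
```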





[GitHub] [ozone] Galsza commented on a diff in pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
Galsza commented on code in PR #3788:
URL: https://github.com/apache/ozone/pull/3788#discussion_r1000373288


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java:
##########
@@ -373,11 +368,21 @@ private ContainerCommandResponseProto dispatchRequest(
             container.getContainerData().getState() == State.UNHEALTHY);
         sendCloseContainerActionIfNeeded(container);
       }
-
+      if (cmdType == Type.CreateContainer
+          && result == Result.SUCCESS && dispatcherContext != null) {
+        Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
+        container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
+      }
       if (result == Result.SUCCESS) {
         updateBCSID(container, dispatcherContext, cmdType);
         audit(action, eventType, params, AuditEventStatus.SUCCESS, null);
       } else {
+        //TODO HDDS-7096:
+        // This is a too general place for on demand scanning.
+        // Create a specific exception that signals for on demand scanning
+        // and move this general scan to where it is more appropriate.
+        // Add integration tests to test the full functionality.
+        OnDemandContainerScanner.scanContainer(container);

Review Comment:
   Thanks for these observations. As agreed, this will be continued in [HDDS-7096](https://issues.apache.org/jira/browse/HDDS-7096) together with the dedicated exceptions, and the other ideas should eventually be implemented as well.





[GitHub] [ozone] errose28 commented on a diff in pull request #3788: HDDS-7095. allow on demand scanning for containers

Posted by GitBox <gi...@apache.org>.
errose28 commented on code in PR #3788:
URL: https://github.com/apache/ozone/pull/3788#discussion_r1003539953


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java:
##########
@@ -373,11 +368,21 @@ private ContainerCommandResponseProto dispatchRequest(
             container.getContainerData().getState() == State.UNHEALTHY);
         sendCloseContainerActionIfNeeded(container);
       }
-
+      if (cmdType == Type.CreateContainer
+          && result == Result.SUCCESS && dispatcherContext != null) {
+        Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
+        container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
+      }

Review Comment:
   What's the reason for moving this code block lower?



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java:
##########
@@ -86,6 +92,15 @@ public class ContainerScannerConfiguration {
           + " by scanner per volume.")
   private long bandwidthPerVolume = BANDWIDTH_PER_VOLUME_DEFAULT;
 
+  @Config(key = "on.demand.volume.bytes.per.second",
+      type = ConfigType.LONG,
+      defaultValue = "5242880",
+      tags = {ConfigTag.STORAGE},
+      description = "Config parameter to throttle I/O bandwidth used"
+          + " by on demand scanner per volume.")

Review Comment:
   ```suggestion
             + " by the on demand container scanner per volume.")
   ```
   We can update the background scanner description too to clarify it is separate from the volume scanner.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestDataScanner.java:
##########
@@ -121,111 +119,126 @@ public static void shutdown() throws IOException {
     }
   }
 
+  //This test performs 2 separate tests because creating
+  // and running a cluster is expensive.
   @Test
-  public void testOpenContainerIntegrity() throws Exception {
+  public void testScannersMarkContainerUnhealthy() throws Exception {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
-    Instant testStartTime = Instant.now();
-
-    String value = "sample value";
+    String value = "sample key value";
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
 
-    for (int i = 0; i < 10; i++) {
-      String keyName = UUID.randomUUID().toString();
-
-      OzoneOutputStream out = bucket.createKey(keyName,
-          value.getBytes(UTF_8).length, RATIS,
-          ONE, new HashMap<>());
-      out.write(value.getBytes(UTF_8));
-      out.close();
-      OzoneKey key = bucket.getKey(keyName);
-      Assert.assertEquals(keyName, key.getName());
-      OzoneInputStream is = bucket.readKey(keyName);
-      byte[] fileContent = new byte[value.getBytes(UTF_8).length];
-      is.read(fileContent);
-      Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
-          keyName, RATIS,
-          ONE));
-      Assert.assertEquals(value, new String(fileContent, UTF_8));
-      Assert.assertFalse(key.getCreationTime().isBefore(testStartTime));
-      Assert.assertFalse(key.getModificationTime().isBefore(testStartTime));
-    }
-
+    String keyNameInClosedContainer = "keyNameInClosedContainer";
+    OzoneOutputStream key = createKey(volumeName, bucketName,
+        keyNameInClosedContainer);
+    // write data more than 1 chunk
+    int sizeLargerThanOneChunk = (int) (OzoneConsts.MB + OzoneConsts.MB / 2);
+    byte[] data = ContainerTestHelper
+        .getFixedLengthString(value, sizeLargerThanOneChunk)
+        .getBytes(UTF_8);
+    key.write(data);
+
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    TestHelper.waitForContainerClose(key, cluster);
+    key.flush();
+    key.close();

Review Comment:
   Is this the correct order, closing the container before flushing the key? Seems like it should be the other way around.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java:
##########
@@ -307,29 +307,27 @@ private void buildContainerSet() {
   private void startContainerScrub() {
     ContainerScannerConfiguration c = config.getObject(
         ContainerScannerConfiguration.class);
-    boolean enabled = c.isEnabled();
-
-    if (!enabled) {
+    if (!c.isEnabled()) {
       LOG.info("Background container scanner has been disabled.");

Review Comment:
   Can we just update the log message to reflect this?



##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerScanner.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.verification.VerificationMode;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdds.conf.OzoneConfiguration.newInstanceOf;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+
+/**
+ * Unit tests for the on-demand container scanner.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestOnDemandContainerScanner {
+
+  private final AtomicLong containerIdSeq = new AtomicLong(100);
+
+  @Mock
+  private Container<ContainerData> healthy;
+
+  @Mock
+  private Container<ContainerData> openContainer;
+
+  @Mock
+  private Container<ContainerData> corruptData;
+
+  private ContainerScannerConfiguration conf;
+  private ContainerController controller;
+
+  @Before
+  public void setup() {
+    conf = newInstanceOf(ContainerScannerConfiguration.class);
+    conf.setMetadataScanInterval(0);
+    conf.setDataScanInterval(0);
+    controller = mockContainerController();
+  }
+
+  @After
+  public void tearDown() {
+    OnDemandContainerScanner.shutdown();
+  }
+
+  @Test
+  public void testOnDemandContainerScanner() throws Exception {
+    //Without initialization,
+    // there shouldn't be interaction with containerController
+    OnDemandContainerScanner.scanContainer(corruptData);
+    Mockito.verifyZeroInteractions(controller);
+    OnDemandContainerScanner.init(conf, controller);
+    testContainerMarkedUnhealthy(healthy, never());
+    testContainerMarkedUnhealthy(corruptData, atLeastOnce());
+    testContainerMarkedUnhealthy(openContainer, never());
+  }
+
+  @Test
+  public void testContainerScannerMultipleInitsAndShutdowns() throws Exception {
+    OnDemandContainerScanner.init(conf, controller);
+    OnDemandContainerScanner.init(conf, controller);
+    OnDemandContainerScanner.shutdown();
+    OnDemandContainerScanner.shutdown();
+    //There shouldn't be an interaction after shutdown:
+    testContainerMarkedUnhealthy(corruptData, never());
+  }
+
+  @Test
+  public void testSameContainerQueuedMultipleTimes() {

Review Comment:
   The actual asserts in this test may not run, depending on execution order. Since the containers are mocked, can we ensure an ongoing scan by scanning a mocked container whose `scanData` method blocks on a countdown latch or a similar construct? See `UpgradeTestUtils#newPausingFinalizationExecutor` for an example of using this approach to halt code execution until the test has run some checks or setup. Then we can run the queue duplicate checks in this test, and we can also check that `Container#scanData` is only called on the first mock queued.
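A minimal, Mockito-free sketch of the pausing approach. `DedupScanQueue` mirrors the queue-and-deduplicate logic of `OnDemandContainerScanner#scanContainer` shown in the diff. `ScanTarget`, `PausingTarget` and `CountingTarget` are hypothetical stand-ins for the mocked containers. The first target parks inside `scanData` on a `CountDownLatch`, which keeps the single executor thread busy while the test checks the queue.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical minimal container: only the scan hook matters here. */
interface ScanTarget {
  void scanData() throws InterruptedException;
}

/** Mirrors the dedup logic of scanContainer: one executor thread plus a set of queued targets. */
final class DedupScanQueue {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final Set<ScanTarget> queued = ConcurrentHashMap.newKeySet();

  /** Returns the scan's future, or null if the target is already waiting in the queue. */
  Future<?> scanContainer(ScanTarget target) {
    if (!queued.add(target)) {
      return null;
    }
    return executor.submit(() -> {
      queued.remove(target);
      target.scanData();
      return null;
    });
  }

  void shutdown() {
    executor.shutdownNow();
  }
}

/** Target that blocks inside scanData until the test releases it. */
final class PausingTarget implements ScanTarget {
  final CountDownLatch started = new CountDownLatch(1);
  final CountDownLatch release = new CountDownLatch(1);
  final AtomicInteger scans = new AtomicInteger();

  @Override
  public void scanData() throws InterruptedException {
    scans.incrementAndGet();
    started.countDown(); // signal that the executor thread is now busy
    release.await();     // hold the only scan thread until the test is done checking
  }
}

/** Target that only counts calls. */
final class CountingTarget implements ScanTarget {
  final AtomicInteger scans = new AtomicInteger();

  @Override
  public void scanData() {
    scans.incrementAndGet();
  }
}
```

Because the first scan cannot finish until `release.countDown()` is called, the duplicate-request check no longer depends on thread timing.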



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java:
##########
@@ -45,12 +45,18 @@ public class ContainerScannerConfiguration {
       "hdds.container.scrub.data.scan.interval";
   public static final String VOLUME_BYTES_PER_SECOND_KEY =
       "hdds.container.scrub.volume.bytes.per.second";
+  public static final String ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY =
+      "hdds.container.scrub.on.demand.volume.bytes.per.second";
 
   public static final long METADATA_SCAN_INTERVAL_DEFAULT =
       Duration.ofHours(3).toMillis();
   public static final long DATA_SCAN_INTERVAL_DEFAULT =
       Duration.ofDays(7).toMillis();
-  public static final long BANDWIDTH_PER_VOLUME_DEFAULT = 1048576;   // 1MB
+
+  private static final long BYTES_IN_MEGABYTES = 1048576L;

Review Comment:
   We can use `OzoneConsts.MB` instead of defining our own constant here. Might be useful for other bandwidth configurations in this class as well.
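A small sketch of the defaults written in terms of a megabyte constant. `MB` below is a local stand-in for `OzoneConsts.MB` (assumed here to be `1024 * 1024` as a `long`), and `ScannerBandwidthDefaults` is a hypothetical holder class. The 5 MB value matches the `5242880` default in the `on.demand.volume.bytes.per.second` config above.

```java
/** Hypothetical excerpt: scanner bandwidth defaults expressed via a megabyte constant. */
final class ScannerBandwidthDefaults {
  static final long MB = 1024L * 1024L; // stand-in for OzoneConsts.MB

  static final long BANDWIDTH_PER_VOLUME_DEFAULT = MB;                // 1 MB/s
  static final long ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT = 5 * MB;  // 5 MB/s

  private ScannerBandwidthDefaults() { }
}
```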



##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java:
##########
@@ -157,4 +166,32 @@ public static void createDbInstancesForTestIfNeeded(
           null, null);
     }
   }
+
+  public static void waitForScanToFinish(Future<?> scanResult) {
+    try {
+      scanResult.get();
+    } catch (Exception e) {
+      throw new RuntimeException("Error while waiting" +
+          " for on-demand scan to finish");
+    }
+  }

Review Comment:
   This method isn't totally necessary. If you look at other test examples they usually just put `throws Exception` on each test method since it is assumed that any exceptions that make it to the top level indicate a test failure. Ideally the container scanner's logging and exceptions should be descriptive enough without wrapping them with an extra error message.
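A sketch of why letting `Future#get` propagate is enough. A failed scan surfaces as an `ExecutionException` whose cause is the original exception. The wrapper in the diff above constructs its `RuntimeException` without passing `e`, so the cause is lost. `ScanWaiting` and `submitFailingScan` are hypothetical names used only for this illustration.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

final class ScanWaiting {
  private ScanWaiting() { }

  /** Hypothetical scan that fails the way a corrupted container might. */
  static Future<Void> submitFailingScan(ExecutorService executor) {
    Callable<Void> scan = () -> {
      throw new IOException("checksum mismatch in container 42");
    };
    return executor.submit(scan);
  }

  /** Shape of the helper in this PR: the original exception is not attached as the cause. */
  static void waitForScanToFinish(Future<?> scanResult) {
    try {
      scanResult.get();
    } catch (Exception e) {
      throw new RuntimeException("Error while waiting for on-demand scan to finish");
    }
  }

  /** Suggested shape: the test declares throws Exception and calls get() directly. */
  static void testScanOfCorruptContainer(ExecutorService executor) throws Exception {
    // A failure arrives as ExecutionException with the IOException as its cause.
    submitFailingScan(executor).get();
  }
}
```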



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestDataScanner.java:
##########
@@ -121,111 +119,126 @@ public static void shutdown() throws IOException {
     }
   }
 
+  //This test performs 2 separate tests because creating
+  // and running a cluster is expensive.
   @Test
-  public void testOpenContainerIntegrity() throws Exception {
+  public void testScannersMarkContainerUnhealthy() throws Exception {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
-    Instant testStartTime = Instant.now();
-
-    String value = "sample value";
+    String value = "sample key value";
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
 
-    for (int i = 0; i < 10; i++) {
-      String keyName = UUID.randomUUID().toString();
-
-      OzoneOutputStream out = bucket.createKey(keyName,
-          value.getBytes(UTF_8).length, RATIS,
-          ONE, new HashMap<>());
-      out.write(value.getBytes(UTF_8));
-      out.close();
-      OzoneKey key = bucket.getKey(keyName);
-      Assert.assertEquals(keyName, key.getName());
-      OzoneInputStream is = bucket.readKey(keyName);
-      byte[] fileContent = new byte[value.getBytes(UTF_8).length];
-      is.read(fileContent);
-      Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
-          keyName, RATIS,
-          ONE));
-      Assert.assertEquals(value, new String(fileContent, UTF_8));
-      Assert.assertFalse(key.getCreationTime().isBefore(testStartTime));
-      Assert.assertFalse(key.getModificationTime().isBefore(testStartTime));
-    }
-
+    String keyNameInClosedContainer = "keyNameInClosedContainer";
+    OzoneOutputStream key = createKey(volumeName, bucketName,
+        keyNameInClosedContainer);
+    // write data more than 1 chunk
+    int sizeLargerThanOneChunk = (int) (OzoneConsts.MB + OzoneConsts.MB / 2);
+    byte[] data = ContainerTestHelper
+        .getFixedLengthString(value, sizeLargerThanOneChunk)
+        .getBytes(UTF_8);
+    key.write(data);
+
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    TestHelper.waitForContainerClose(key, cluster);
+    key.flush();
+    key.close();
+
+    String keyNameInOpenContainer = "keyNameInOpenContainer";
+    OzoneOutputStream key2 = createKey(volumeName, bucketName,
+        keyNameInOpenContainer);
+    key2.write(data);
+    key2.close();
     // wait for the container report to propagate to SCM
     Thread.sleep(5000);
 
-
     Assert.assertEquals(1, cluster.getHddsDatanodes().size());
 
     HddsDatanodeService dn = cluster.getHddsDatanodes().get(0);
     OzoneContainer oc = dn.getDatanodeStateMachine().getContainer();
-    ContainerSet cs = oc.getContainerSet();
-    Container c = cs.getContainerIterator().next();
+    ContainerSet containerSet = oc.getContainerSet();
+    //Given an open and a closed container
+    Assert.assertTrue(containerSet.containerCount() > 1);
+    Container<?> openContainer = getContainerInState(containerSet, OPEN);
+    Container<?> closedContainer = getContainerInState(containerSet, CLOSED);
 
-    Assert.assertTrue(cs.containerCount() > 0);
-
-    // delete the chunks directory.
-    File chunksDir = new File(c.getContainerData().getContainerPath(),
-        "chunks");
-    deleteDirectory(chunksDir);
-    Assert.assertFalse(chunksDir.exists());
+    //When deleting their metadata to make them unhealthy and scanning them
+    deleteChunksDirForContainer(openContainer);
+    deleteChunksDirForContainer(closedContainer);
 
     ContainerScannerConfiguration conf = ozoneConfig.getObject(
         ContainerScannerConfiguration.class);
     ContainerMetadataScanner sb = new ContainerMetadataScanner(conf,
         oc.getController());
-    sb.scanContainer(c);
-
+    //Scan the open container and trigger on-demand scan for the closed one
+    sb.scanContainer(openContainer);
+    tryReadKeyWithMissingChunksDir(bucket, keyNameInClosedContainer);
     // wait for the incremental container report to propagate to SCM
     Thread.sleep(5000);
 
     ContainerManager cm = cluster.getStorageContainerManager()
         .getContainerManager();
-    Set<ContainerReplica> replicas = cm.getContainerReplicas(
-        ContainerID.valueOf(c.getContainerData().getContainerID()));
-    Assert.assertEquals(1, replicas.size());
-    ContainerReplica r = replicas.iterator().next();
-    Assert.assertEquals(StorageContainerDatanodeProtocolProtos.
-        ContainerReplicaProto.State.UNHEALTHY, r.getState());
+    ContainerReplica openContainerReplica = getContainerReplica(
+        cm, openContainer.getContainerData().getContainerID());
+    ContainerReplica closedContainerReplica = getContainerReplica(
+        cm, closedContainer.getContainerData().getContainerID());
+    //Then both containers are marked unhealthy
+    Assert.assertEquals(State.UNHEALTHY, openContainerReplica.getState());
+    Assert.assertEquals(State.UNHEALTHY, closedContainerReplica.getState());
+  }
+
+  private ContainerReplica getContainerReplica(
+      ContainerManager cm, long containerId) throws ContainerNotFoundException {
+    Set<ContainerReplica> containerReplicas = cm.getContainerReplicas(
+        ContainerID.valueOf(
+            containerId));
+    Assert.assertEquals(1, containerReplicas.size());
+    return containerReplicas.iterator().next();
+  }
+
+  //ignore the result of the key read because it is expected to fail
+  @SuppressWarnings("ResultOfMethodCallIgnored")
+  private void tryReadKeyWithMissingChunksDir(
+      OzoneBucket bucket, String keyNameInClosedContainer) throws IOException {
+    try (OzoneInputStream key = bucket.readKey(keyNameInClosedContainer)) {
+      key.read();
+    } catch (StorageContainerException ignored) {

Review Comment:
   Let's assert that the exception is thrown here.
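   Below is a minimal, self-contained sketch of such an assertion, written in plain Java so it runs without a cluster. `expectThrows`, `ScanTriggeringException` and `readKeyWithMissingChunksDir` are hypothetical stand-ins for this illustration. In the real test, the failing call would be `bucket.readKey(...)` followed by a read, and the expected type would be `StorageContainerException`. On JUnit 4.13+, `Assert.assertThrows` could replace the helper.

```java
import java.io.IOException;

public class AssertThrowsSketch {

  /** Hypothetical stand-in for StorageContainerException. */
  static class ScanTriggeringException extends IOException {
    ScanTriggeringException(String message) {
      super(message);
    }
  }

  /** Like JUnit's ThrowingRunnable: an action that may throw a checked exception. */
  @FunctionalInterface
  interface ThrowingRunnable {
    void run() throws Exception;
  }

  /**
   * Runs the action and returns the thrown exception if it has the expected
   * type; fails with an AssertionError if nothing or something else is thrown.
   */
  static <T extends Throwable> T expectThrows(
      Class<T> expected, ThrowingRunnable action) {
    try {
      action.run();
    } catch (Throwable t) {
      if (expected.isInstance(t)) {
        return expected.cast(t);
      }
      throw new AssertionError("Expected " + expected.getSimpleName()
          + " but got " + t.getClass().getSimpleName(), t);
    }
    throw new AssertionError(
        "Expected " + expected.getSimpleName() + " but nothing was thrown");
  }

  /** Hypothetical read of a key whose chunks directory was deleted. */
  static void readKeyWithMissingChunksDir(String keyName) throws IOException {
    throw new ScanTriggeringException("Missing chunk file for key " + keyName);
  }

  public static void main(String[] args) {
    ScanTriggeringException e = expectThrows(ScanTriggeringException.class,
        () -> readKeyWithMissingChunksDir("keyNameInClosedContainer"));
    System.out.println("Caught expected exception: " + e.getMessage());
  }
}
```

   Returning the caught exception also lets the test check its message or result code, if that turns out to be useful.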



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestDataScanner.java:
##########
@@ -121,111 +119,126 @@ public static void shutdown() throws IOException {
+    String keyNameInClosedContainer = "keyNameInClosedContainer";
+    OzoneOutputStream key = createKey(volumeName, bucketName,
+        keyNameInClosedContainer);
+    // write data more than 1 chunk
+    int sizeLargerThanOneChunk = (int) (OzoneConsts.MB + OzoneConsts.MB / 2);

Review Comment:
   Why do we need more than one chunk?



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class for performing on demand scans of containers.
+ */
+public final class OnDemandContainerScanner {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OnDemandContainerScanner.class);
+
+  private static volatile OnDemandContainerScanner instance;
+
+  private final ExecutorService scanExecutor;
+  private final ContainerController containerController;
+  private final DataTransferThrottler throttler;
+  private final Canceler canceler;
+  private final ConcurrentHashMap
+      .KeySetView<Long, Boolean> containerRescheduleCheckSet;
+  private final OnDemandScannerMetrics metrics;
+
+  private OnDemandContainerScanner(
+      ContainerScannerConfiguration conf, ContainerController controller) {
+    containerController = controller;
+    throttler = new DataTransferThrottler(
+        conf.getOnDemandBandwidthPerVolume());
+    canceler = new Canceler();
+    metrics = OnDemandScannerMetrics.create();
+    scanExecutor = Executors.newSingleThreadExecutor();
+    containerRescheduleCheckSet = ConcurrentHashMap.newKeySet();
+  }
+
+  public static synchronized void init(
+      ContainerScannerConfiguration conf, ContainerController controller) {
+    if (instance != null) {
+      LOG.warn("Trying to initialize on demand scanner" +
+          " a second time on a datanode.");
+      return;
+    }
+    instance = new OnDemandContainerScanner(conf, controller);
+  }
+
+  public static Optional<Future<?>> scanContainer(Container<?> container) {
+    if (instance == null || !container.shouldScanData()) {
+      return Optional.empty();
+    }
+    Future<?> resultFuture = null;
+    long containerId = container.getContainerData().getContainerID();
+    if (addContainerToScheduledContainers(containerId)) {
+      resultFuture = instance.scanExecutor.submit(() -> {
+        removeContainerFromScheduledContainers(containerId);
+        if (container.shouldScanData()) {
+          performOnDemandScan(container);
+        }
+      });
+    }
+    return Optional.ofNullable(resultFuture);
+  }
+
+  private static boolean addContainerToScheduledContainers(long containerId) {
+    return instance.containerRescheduleCheckSet.add(containerId);
+  }
+
+  private static void removeContainerFromScheduledContainers(
+      long containerId) {
+    instance.containerRescheduleCheckSet.remove(containerId);
+  }
+
+  private static void performOnDemandScan(Container<?> container) {
+    long containerId = container.getContainerData().getContainerID();
+    try {
+      ContainerData containerData = container.getContainerData();
+      logScanStart(containerData);
+      if (container.scanData(instance.throttler, instance.canceler)) {
+        Instant now = Instant.now();
+        logScanCompleted(containerData, now);
+        instance.containerController.updateDataScanTimestamp(containerId, now);
+      } else {
+        instance.containerController.markContainerUnhealthy(containerId);
+        instance.metrics.incNumUnHealthyContainers();
+      }
+      instance.metrics.incNumContainersScanned();
+    } catch (IOException e) {
+      LOG.warn("Unexpected exception while scanning container "
+          + containerId, e);
+    }
+  }
+
+  private static void logScanStart(ContainerData containerData) {
+    if (LOG.isDebugEnabled()) {
+      Optional<Instant> scanTimestamp = containerData.lastDataScanTime();
+      Object lastScanTime = scanTimestamp.map(ts -> "at " + ts).orElse("never");
+      LOG.debug("Scanning container {}, last scanned {}",
+          containerData.getContainerID(), lastScanTime);
+    }
+  }
+
+  private static void logScanCompleted(
+      ContainerData containerData, Instant timestamp) {
+    LOG.debug("Completed scan of container {} at {}",
+        containerData.getContainerID(), timestamp);
+  }
+
+  public static OnDemandScannerMetrics getMetrics() {
+    return instance.metrics;
+  }
+
+  public static synchronized void shutdown() {
+    if (instance == null) {
+      return;
+    }
+    instance.shutdownScanner();
+  }
+
+  private synchronized void shutdownScanner() {
+    instance = null;
+    metrics.unregister();
+    if (!scanExecutor.isShutdown()) {
+      scanExecutor.shutdown();
+    }
+    try {
+      long timeoutSeconds = 5;
+      if (!scanExecutor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
+        LOG.warn("On demand scanner shut down forcefully after {} seconds",
+            timeoutSeconds);
+        scanExecutor.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("On demand scanner interrupted while waiting for shut down.");
+      scanExecutor.shutdownNow();
+      throw new RuntimeException(e);

Review Comment:
   I think we should set the interrupt flag and return instead of throwing. See `AbstractContainerScanner#shutdown` for reference.
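   Here is a rough sketch of that pattern, assuming a plain `ExecutorService` like the one in this class. `InterruptAwareShutdown` and `shutdownExecutor` are hypothetical names, and the code is not a copy of `AbstractContainerScanner#shutdown`. On interruption, it stops the executor, restores the interrupt status, and returns `false` instead of throwing.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class InterruptAwareShutdown {

  /**
   * Stops accepting work and waits up to timeoutSeconds for running tasks.
   * Returns true only if the executor terminated within the timeout.
   */
  static boolean shutdownExecutor(ExecutorService executor, long timeoutSeconds) {
    executor.shutdown();
    try {
      if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
        // Timed out: interrupt whatever is still running.
        executor.shutdownNow();
        return false;
      }
      return true;
    } catch (InterruptedException e) {
      // Interrupted while waiting: stop the tasks, restore the interrupt
      // status for callers further up the stack, and return normally.
      executor.shutdownNow();
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    // A long-running task, so the executor cannot terminate on its own.
    executor.submit(() -> {
      Thread.sleep(60_000);
      return null;
    });
    // Simulate the shutting-down thread having been interrupted.
    Thread.currentThread().interrupt();
    boolean terminated = shutdownExecutor(executor, 5);
    System.out.println("terminated cleanly: " + terminated);
    // Thread.interrupted() reports and clears the restored flag.
    System.out.println("interrupt flag restored: " + Thread.interrupted());
  }
}
```

   Restoring the flag keeps the interruption visible to whoever owns the calling thread. Wrapping it in a `RuntimeException` instead turns an ordinary shutdown signal into a failure.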



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java:
##########
@@ -0,0 +1,165 @@
+  public static synchronized void shutdown() {
+    if (instance == null) {
+      return;
+    }
+    instance.shutdownScanner();
+  }
+
+  private synchronized void shutdownScanner() {

Review Comment:
   This should probably invoke the canceler like the shutdown of the background data scanner does.
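   Below is a sketch of cooperative cancellation on shutdown, under stated assumptions. `SimpleCanceler` is a hypothetical stand-in loosely modeled on `org.apache.hadoop.hdfs.util.Canceler`, and `scan` simulates per-chunk work rather than calling `Container#scanData`. Calling `cancel` before shutting down the executor lets an in-flight scan stop at its next check, instead of running to completion or being interrupted mid-read.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelOnShutdownSketch {

  /** Hypothetical cooperative cancellation flag, loosely like Hadoop's Canceler. */
  static final class SimpleCanceler {
    private volatile String reason;

    void cancel(String why) {
      reason = why;
    }

    boolean isCancelled() {
      return reason != null;
    }
  }

  private final SimpleCanceler canceler = new SimpleCanceler();
  private final ExecutorService scanExecutor =
      Executors.newSingleThreadExecutor();

  /** Simulated scan: checks the canceler before each chunk, returns chunks scanned. */
  int scan(int totalChunks, CountDownLatch started) throws InterruptedException {
    started.countDown();
    int scanned = 0;
    for (int i = 0; i < totalChunks && !canceler.isCancelled(); i++) {
      Thread.sleep(10); // stands in for throttled I/O on one chunk
      scanned++;
    }
    return scanned;
  }

  Future<Integer> submitScan(int totalChunks, CountDownLatch started) {
    Callable<Integer> task = () -> scan(totalChunks, started);
    return scanExecutor.submit(task);
  }

  /**
   * Signals the canceler first so in-flight scans stop at their next check,
   * then shuts the executor down. Interrupt handling is elided here.
   */
  boolean shutdown(long timeoutSeconds) throws InterruptedException {
    canceler.cancel("On-demand scanner is shutting down");
    scanExecutor.shutdown();
    return scanExecutor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    CancelOnShutdownSketch scanner = new CancelOnShutdownSketch();
    CountDownLatch started = new CountDownLatch(1);
    // 1000 chunks at ~10 ms each would take ~10 s without cancellation.
    Future<Integer> result = scanner.submitScan(1000, started);
    started.await();
    boolean terminated = scanner.shutdown(5);
    System.out.println("terminated: " + terminated
        + ", chunks scanned: " + result.get());
  }
}
```

   Compared with relying on `shutdownNow()` alone, the canceler gives the scan a well-defined place to stop. That may make it easier to avoid treating a scan cut short by shutdown as a failed one.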


