Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2019/03/12 19:32:55 UTC

[hadoop] branch trunk updated: HDDS-1163. Basic framework for Ozone Data Scrubber. Contributed by Supratim Deka.

This is an automated email from the ASF dual-hosted git repository.

arp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 24793d2  HDDS-1163. Basic framework for Ozone Data Scrubber. Contributed by Supratim Deka.
24793d2 is described below

commit 24793d2d971788de904165f7490f17d79d078a6a
Author: Arpit Agarwal <ar...@apache.org>
AuthorDate: Wed Mar 13 04:32:39 2019 +0900

    HDDS-1163. Basic framework for Ozone Data Scrubber. Contributed by Supratim Deka.
---
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |   5 +-
 .../common/src/main/resources/ozone-default.xml    |  10 +
 .../container/common/interfaces/Container.java     |   5 +
 .../container/keyvalue/KeyValueContainer.java      |  69 +++-
 .../container/keyvalue/KeyValueContainerCheck.java | 432 +++++++++++++++++++++
 .../container/ozoneimpl/ContainerScrubber.java     | 158 ++++++++
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  34 ++
 .../keyvalue/TestKeyValueContainerCheck.java       | 194 +++++++++
 8 files changed, 904 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 3dd28f6..3bb3895 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -65,6 +65,9 @@ public final class HddsConfigKeys {
   public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f;
   public static final String HDDS_SCM_CHILLMODE_ENABLED =
       "hdds.scm.chillmode.enabled";
+  public static final String HDDS_CONTAINERSCRUB_ENABLED =
+      "hdds.containerscrub.enabled";
+  public static final boolean HDDS_CONTAINERSCRUB_ENABLED_DEFAULT = false;
   public static final boolean HDDS_SCM_CHILLMODE_ENABLED_DEFAULT = true;
   public static final String HDDS_SCM_CHILLMODE_MIN_DATANODE =
       "hdds.scm.chillmode.min.datanode";
@@ -255,4 +258,4 @@ public final class HddsConfigKeys {
   public static final String
       HDDS_DATANODE_HTTP_KERBEROS_KEYTAB_FILE_KEY =
       "hdds.datanode.http.kerberos.keytab";
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 54eb5c8..331b5c4 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1347,6 +1347,16 @@
   </property>
 
   <property>
+    <name>hdds.containerscrub.enabled</name>
+    <value>false</value>
+    <tag>DATANODE</tag>
+    <description>
+      Boolean value to enable data and metadata scrubbing in the containers
+      running on each datanode.
+    </description>
+  </property>
+
+  <property>
     <name>hdds.container.action.max.limit</name>
     <value>20</value>
     <tag>DATANODE</tag>
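
For illustration, here is a minimal sketch of toggling the new key programmatically
(the constant names come from the HddsConfigKeys change above; in a real deployment
the value would normally be set in ozone-site.xml, and the example class name is
hypothetical):

    import org.apache.hadoop.hdds.HddsConfigKeys;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;

    public class ScrubberConfigExample {
      public static void main(String[] args) {
        // Sketch only: enable the background container scrubber in code.
        OzoneConfiguration conf = new OzoneConfiguration();
        conf.setBoolean(HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED, true);
        // Reads back true instead of the shipped default of false.
        boolean enabled = conf.getBoolean(
            HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED,
            HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED_DEFAULT);
        System.out.println("container scrubber enabled: " + enabled);
      }
    }
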
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
index 89f09fd..1fcaaf5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
@@ -151,4 +151,9 @@ public interface Container<CONTAINERDATA extends ContainerData> extends RwLock {
    * updates the blockCommitSequenceId.
    */
   void updateBlockCommitSequenceId(long blockCommitSequenceId);
+
+  /**
+   * Check and report the structural integrity of the container.
+   */
+  void check() throws StorageContainerException;
 }
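
As a rough illustration of the new interface method, a hypothetical caller (not part
of this patch; LOG is assumed to be an slf4j logger on the calling class) would invoke
check() and treat StorageContainerException as a failed integrity check:

    // Hypothetical usage of the new Container#check() API.
    private void scrubOne(Container<?> container) {
      try {
        container.check();  // structural integrity check added by this change
      } catch (StorageContainerException e) {
        // Corruption handling/reporting is tracked separately (HDDS-1201).
        LOG.error("Integrity check failed for container {}",
            container.getContainerData().getContainerID(), e);
      }
    }
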
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index de1b109..20dfd9c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -565,8 +565,13 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
    */
   @Override
   public File getContainerFile() {
-    return new File(containerData.getMetadataPath(), containerData
-        .getContainerID() + OzoneConsts.CONTAINER_EXTENSION);
+    return getContainerFile(containerData.getMetadataPath(),
+            containerData.getContainerID());
+  }
+
+  static File getContainerFile(String metadataPath, long containerId) {
+    return new File(metadataPath,
+        containerId + OzoneConsts.CONTAINER_EXTENSION);
   }
 
   @Override
@@ -635,6 +640,66 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
   }
 
   /**
+   * Run integrity checks on the Container metadata.
+   */
+  public void check() throws StorageContainerException {
+    ContainerCheckLevel level = ContainerCheckLevel.NO_CHECK;
+    long containerId = containerData.getContainerID();
+
+    switch (containerData.getState()) {
+    case OPEN:
+      level = ContainerCheckLevel.FAST_CHECK;
+      LOG.info("Doing Fast integrity checks for Container ID : {},"
+          + " because it is OPEN", containerId);
+      break;
+    case CLOSING:
+      level = ContainerCheckLevel.FAST_CHECK;
+      LOG.info("Doing Fast integrity checks for Container ID : {},"
+          + " because it is CLOSING", containerId);
+      break;
+    case CLOSED:
+    case QUASI_CLOSED:
+      level = ContainerCheckLevel.FULL_CHECK;
+      LOG.debug("Doing Full integrity checks for Container ID : {},"
+              + " because it is in {} state", containerId,
+          containerData.getState());
+      break;
+    default:
+      throw new StorageContainerException(
+          "Invalid Container state found for Container : " + containerData
+              .getContainerID(), INVALID_CONTAINER_STATE);
+    }
+
+    if (level == ContainerCheckLevel.NO_CHECK) {
+      LOG.debug("Skipping integrity checks for Container Id : {}", containerId);
+      return;
+    }
+
+    KeyValueContainerCheck checker =
+        new KeyValueContainerCheck(containerData.getMetadataPath(), config,
+            containerId, containerData);
+
+    switch (level) {
+    case FAST_CHECK:
+      checker.fastCheck();
+      break;
+    case FULL_CHECK:
+      checker.fullCheck();
+      break;
+    case NO_CHECK:
+      LOG.debug("Skipping integrity checks for Container Id : {}", containerId);
+      break;
+    default:
+      // we should not be here at all, scuttle the ship!
+      Preconditions.checkNotNull(0, "Invalid Containercheck level");
+    }
+  }
+
+  private enum ContainerCheckLevel {
+    NO_CHECK, FAST_CHECK, FULL_CHECK
+  }
+
+  /**
    * Creates a temporary file.
    * @param file
    * @return
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
new file mode 100644
index 0000000..5366c27
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
+import org.apache.hadoop.utils.MetadataStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB;
+
+/**
+ * Class to run integrity checks on Datanode Containers.
+ * Provides the infrastructure for data scrubbing.
+ */
+
+public class KeyValueContainerCheck {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Container.class);
+
+  private long containerID;
+  private KeyValueContainerData inMemContainerData; //from caller, maybe null
+  private KeyValueContainerData onDiskContainerData; //loaded from fs/disk
+  private Configuration checkConfig;
+
+  private String metadataPath;
+
+  public KeyValueContainerCheck(String metadataPath, Configuration conf,
+      long containerID, KeyValueContainerData containerData) {
+    Preconditions.checkArgument(metadataPath != null);
+
+    this.checkConfig = conf;
+    this.containerID = containerID;
+    this.onDiskContainerData = null;
+    this.inMemContainerData = containerData;
+    this.metadataPath = metadataPath;
+  }
+
+  /**
+   * Fast checks are basic and do not look inside the metadata files
+   * or into the on-disk structures. They can be run on Open containers
+   * as well, without concurrency implications.
+   * Checks:
+   * 1. check the directory layout
+   * 2. check the container file
+   *
+   * @return KvCheckError error code, ERROR_NONE if no error was detected
+   */
+
+  public KvCheckError fastCheck() {
+
+    KvCheckError error;
+    LOG.trace("Running fast check for container {};", containerID);
+
+    error = loadContainerData();
+    if (error != KvCheckError.ERROR_NONE) {
+      return error;
+    }
+
+    error = checkLayout();
+    if (error != KvCheckError.ERROR_NONE) {
+      return error;
+    }
+
+    error = checkContainerFile();
+
+    return error;
+  }
+
+  /**
+   * Full checks scan all metadata inside the container, including the
+   * KV database. These checks are intrusive and consume more resources
+   * than fast checks, so they should only be run on Closed or
+   * Quasi-Closed containers, where concurrency is limited to delete
+   * workflows.
+   * <p>
+   * fullCheck is a superset of fastCheck.
+   *
+   * @return KvCheckError error code, ERROR_NONE if no error was detected
+   */
+  public KvCheckError fullCheck() {
+    /*
+     * Run the fast checks first, then scan the block DB.
+     */
+    KvCheckError error;
+
+    error = fastCheck();
+    if (error != KvCheckError.ERROR_NONE) {
+
+      LOG.trace("fastCheck failed, aborting full check for Container {}",
+          containerID);
+      return error;
+    }
+
+    error = checkBlockDB();
+
+    return error;
+  }
+
+  /**
+   * Check the integrity of the directory structure of the container.
+   *
+   * @return error code or ERROR_NONE
+   */
+  private KvCheckError checkLayout() {
+    boolean success;
+    KvCheckError error = KvCheckError.ERROR_NONE;
+
+    // is metadataPath accessible as a directory?
+    try {
+      checkDirPath(metadataPath);
+    } catch (IOException ie) {
+      error = KvCheckError.METADATA_PATH_ACCESS;
+      handleCorruption(ie.getMessage(), error, ie);
+      return error;
+    }
+
+    String chunksPath = onDiskContainerData.getChunksPath();
+    // is chunksPath accessible as a directory?
+    try {
+      checkDirPath(chunksPath);
+    } catch (IOException ie) {
+      error = KvCheckError.CHUNKS_PATH_ACCESS;
+      handleCorruption(ie.getMessage(), error, ie);
+      return error;
+    }
+
+    return error;
+  }
+
+  private void checkDirPath(String path) throws IOException {
+
+    File dirPath = new File(path);
+    String errStr = null;
+    boolean success = true;
+
+    try {
+      if (!dirPath.isDirectory()) {
+        success = false;
+        errStr = "Not a directory [" + path + "]";
+      }
+    } catch (SecurityException se) {
+      throw new IOException("Security exception checking dir ["
+          + path + "]", se);
+    } catch (Exception e) {
+      throw new IOException("Generic exception checking dir ["
+          + path + "]", e);
+    }
+
+    try {
+      String[] ls = dirPath.list();
+      if (ls == null) {
+        // null result implies operation failed
+        success = false;
+        errStr = "null listing for directory [" + path + "]";
+      }
+    } catch (Exception e) {
+      throw new IOException("Exception listing dir [" + path + "]", e);
+    }
+
+    if (!success) {
+      Preconditions.checkState(errStr != null);
+      throw new IOException(errStr);
+    }
+  }
+
+  private KvCheckError checkContainerFile() {
+    /*
+     * Compare the values in the container file loaded from disk
+     * with the values we are expecting.
+     */
+    KvCheckError error = KvCheckError.ERROR_NONE;
+    String dbType;
+    Preconditions
+        .checkState(onDiskContainerData != null, "Container File not loaded");
+    KvCheckAction next;
+
+    try {
+      ContainerUtils.verifyChecksum(onDiskContainerData);
+    } catch (Exception e) {
+      error = KvCheckError.CONTAINERDATA_CKSUM;
+      handleCorruption("Container File Checksum mismatch", error, e);
+      return error;
+    }
+
+    if (onDiskContainerData.getContainerType()
+        != ContainerProtos.ContainerType.KeyValueContainer) {
+      String errStr = "Bad Container type in Containerdata for " + containerID;
+      error = KvCheckError.CONTAINERDATA_TYPE;
+      handleCorruption(errStr, error, null);
+      return error; // Abort if we do not know the type of Container
+    }
+
+    if (onDiskContainerData.getContainerID() != containerID) {
+      String errStr =
+          "Bad ContainerID field in Containerdata for " + containerID;
+      error = KvCheckError.CONTAINERDATA_ID;
+      next = handleCorruption(errStr, error, null);
+      if (next == KvCheckAction.ABORT) {
+        return error;
+      } // else continue checking other data elements
+    }
+
+    dbType = onDiskContainerData.getContainerDBType();
+    if (!dbType.equals(OZONE_METADATA_STORE_IMPL_ROCKSDB) &&
+        !dbType.equals(OZONE_METADATA_STORE_IMPL_LEVELDB)) {
+      String errStr = "Unknown DBType [" + dbType
+          + "] in Container File for [" + containerID + "]";
+      error = KvCheckError.CONTAINERDATA_DBTYPE;
+      handleCorruption(errStr, error, null);
+      return error;
+    }
+
+    KeyValueContainerData kvData = onDiskContainerData;
+    if (!metadataPath.toString().equals(kvData.getMetadataPath())) {
+      String errStr =
+          "Bad metadata path in Containerdata for " + containerID + ". Expected ["
+              + metadataPath.toString() + "] Got [" + kvData.getMetadataPath()
+              + "]";
+      error = KvCheckError.CONTAINERDATA_METADATA_PATH;
+      next = handleCorruption(errStr, error, null);
+      if (next == KvCheckAction.ABORT) {
+        return error;
+      }
+    }
+
+    return error;
+  }
+
+  private KvCheckError checkBlockDB() {
+    /*
+     * Check the integrity of the DB inside each container.
+     * In scope:
+     * 1. iterate over each key (Block) and locate the chunks for the block
+     * 2. garbage detection: chunks which exist in the filesystem
+     *    but not in the DB. This function is implemented as HDDS-1202.
+     * Not in scope:
+     * 1. chunk checksum verification; this is left to a separate,
+     *    slower chunk scanner
+     */
+    KvCheckError error;
+    Preconditions.checkState(onDiskContainerData != null,
+        "invoke loadContainerData prior to calling this function");
+    File dbFile;
+    File metaDir = new File(metadataPath);
+
+    try {
+      dbFile = KeyValueContainerLocationUtil
+          .getContainerDBFile(metaDir, containerID);
+
+      if (!dbFile.exists() || !dbFile.canRead()) {
+
+        String dbFileErrorMsg = "Unable to access DB File [" + dbFile.toString()
+            + "] for Container [" + containerID + "] metadata path ["
+            + metadataPath + "]";
+        error = KvCheckError.DB_ACCESS;
+        handleCorruption(dbFileErrorMsg, error, null);
+        return error;
+      }
+    } catch (Exception e) {
+      String dbFileErrorMessage =
+          "Exception when initializing DBFile with metadataPath ["
+              + metadataPath + "] for Container [" + containerID
+              + "]";
+      error = KvCheckError.DB_ACCESS;
+      handleCorruption(dbFileErrorMessage, error, e);
+      return error;
+    }
+    onDiskContainerData.setDbFile(dbFile);
+
+    try {
+      MetadataStore db = BlockUtils
+          .getDB(onDiskContainerData, checkConfig);
+      error = iterateBlockDB(db);
+    } catch (Exception e) {
+      error = KvCheckError.DB_ITERATOR;
+      handleCorruption("Block DB Iterator aborted", error, e);
+      return error;
+    }
+
+    return error;
+  }
+
+  private KvCheckError iterateBlockDB(MetadataStore db)
+      throws IOException {
+    KvCheckError error = KvCheckError.ERROR_NONE;
+    Preconditions.checkState(db != null);
+
+    // get "normal" keys from the Block DB
+    KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
+        new File(onDiskContainerData.getContainerPath()));
+
+    // ensure there is a chunk file for each key in the DB
+    while (kvIter.hasNext()) {
+      BlockData block = kvIter.nextBlock();
+
+      List<ContainerProtos.ChunkInfo> chunkInfoList = block.getChunks();
+      for (ContainerProtos.ChunkInfo chunk : chunkInfoList) {
+        File chunkFile;
+        try {
+          chunkFile = ChunkUtils
+              .getChunkFile(onDiskContainerData,
+                  ChunkInfo.getFromProtoBuf(chunk));
+        } catch (Exception e) {
+          error = KvCheckError.MISSING_CHUNK_FILE;
+          handleCorruption("Unable to access chunk path", error, e);
+          return error;
+        }
+
+        if (!chunkFile.exists()) {
+          error = KvCheckError.MISSING_CHUNK_FILE;
+
+          // concurrent mutation in Block DB? lookup the block again.
+          byte[] bdata = db.get(
+              Longs.toByteArray(block.getBlockID().getLocalID()));
+          if (bdata == null) {
+            LOG.trace("concurrency with delete, ignoring deleted block");
+            error = KvCheckError.ERROR_NONE;
+            break; // skip to next block from kvIter
+          } else {
+            handleCorruption("Missing chunk file", error, null);
+            return error;
+          }
+        }
+      }
+    }
+
+    return error;
+  }
+
+  private KvCheckError loadContainerData() {
+    KvCheckError error = KvCheckError.ERROR_NONE;
+
+    File containerFile = KeyValueContainer
+        .getContainerFile(metadataPath.toString(), containerID);
+
+    try {
+      onDiskContainerData = (KeyValueContainerData) ContainerDataYaml
+          .readContainerFile(containerFile);
+    } catch (IOException e) {
+      error = KvCheckError.FILE_LOAD;
+      handleCorruption("Unable to load Container File", error, e);
+    }
+
+    return error;
+  }
+
+  private KvCheckAction handleCorruption(String reason,
+      KvCheckError error, Exception e) {
+
+    // XXX HDDS-1201 need to implement corruption handling/reporting
+
+    String errStr =
+        "Corruption detected in container: [" + containerID + "] reason: ["
+            + reason + "] error code: [" + error + "]";
+    String logMessage = null;
+
+    StackTraceElement[] stackElems = Thread.currentThread().getStackTrace();
+    String caller =
+        "Corruption reported from Source File: [" + stackElems[2].getFileName()
+            + "] Line: [" + stackElems[2].getLineNumber() + "]";
+
+    if (e != null) {
+      logMessage = errStr + " exception: [" + e.getMessage() + "]";
+      e.printStackTrace();
+    } else {
+      logMessage = errStr;
+    }
+
+    LOG.error(caller);
+    LOG.error(logMessage);
+
+    return KvCheckAction.ABORT;
+  }
+
+  /**
+   * Pre-defined error codes for Container Metadata check.
+   */
+  public enum KvCheckError {
+    ERROR_NONE,
+    FILE_LOAD, // unable to load container metafile
+    METADATA_PATH_ACCESS, // metadata path is not accessible
+    CHUNKS_PATH_ACCESS, // chunks path is not accessible
+    CONTAINERDATA_ID, // bad Container-ID stored in Container file
+    CONTAINERDATA_METADATA_PATH, // bad metadata path in Container file
+    CONTAINERDATA_CHUNKS_PATH, // bad chunks path in Container file
+    CONTAINERDATA_CKSUM, // container file checksum mismatch
+    CONTAINERDATA_TYPE, // incorrect Container type in the Container file
+    CONTAINERDATA_DBTYPE, // unknown DB Type specified in Container File
+    DB_ACCESS, // unable to load Metastore DB
+    DB_ITERATOR, // unable to create block iterator for Metastore DB
+    MISSING_CHUNK_FILE // chunk file not found
+  }
+
+  private enum KvCheckAction {
+    CONTINUE, // Continue with remaining checks on the corrupt Container
+    ABORT     // Abort checks for the container
+  }
+}
\ No newline at end of file
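
A minimal usage sketch for the new checker (illustrative only; conf, containerId and
containerData are assumed to come from an existing KeyValueContainer and its
KeyValueContainerData):

    // Sketch: run a standalone integrity check against one container.
    KeyValueContainerCheck checker = new KeyValueContainerCheck(
        containerData.getMetadataPath(), conf, containerId, containerData);

    // fastCheck() is safe on OPEN containers; fullCheck() is a superset
    // intended for CLOSED / QUASI_CLOSED containers.
    KeyValueContainerCheck.KvCheckError error = checker.fastCheck();
    if (error == KeyValueContainerCheck.KvCheckError.ERROR_NONE) {
      error = checker.fullCheck();
    }
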
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java
new file mode 100644
index 0000000..dea7323
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.net.ntp.TimeStamp;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+
+/**
+ * Background Metadata scrubbing for Ozone Containers.
+ * Future scope: data (chunk) checksum verification.
+ */
+public class ContainerScrubber implements Runnable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerScrubber.class);
+  private final ContainerSet containerSet;
+  private final OzoneConfiguration config;
+  private final long timePerContainer = 10000; // 10 sec in millis
+  private boolean halt;
+  private Thread scrubThread;
+
+  public ContainerScrubber(ContainerSet cSet, OzoneConfiguration conf) {
+    Preconditions.checkNotNull(cSet,
+        "ContainerScrubber received a null ContainerSet");
+    Preconditions.checkNotNull(conf);
+    this.containerSet = cSet;
+    this.config = conf;
+    this.halt = false;
+    this.scrubThread = null;
+  }
+
+  @Override public void run() {
+    /*
+     * The outer daemon loop exits when down() is called.
+     */
+    LOG.info("Background ContainerScrubber starting up");
+    while (true) {
+
+      scrub();
+
+      if (this.halt) {
+        break; // stop and exit if requested
+      }
+
+      try {
+        Thread.sleep(300000); /* 5 min between scans */
+      } catch (InterruptedException e) {
+        LOG.info("Background ContainerScrubber interrupted. Going to exit");
+      }
+    }
+  }
+
+  /**
+   * Start the scrub scanner thread.
+   */
+  public void up() {
+
+    this.halt = false;
+    if (this.scrubThread == null) {
+      this.scrubThread = new Thread(this);
+      scrubThread.start();
+    } else {
+      LOG.info("Scrubber up called multiple times. Scrub thread already up.");
+    }
+  }
+
+  /**
+   * Stop the scrub scanner thread and wait for it to exit.
+   */
+  public void down() {
+
+    this.halt = true;
+    if (scrubThread == null) {
+      LOG.info("Scrubber down invoked, but scrub thread is not running");
+      return;
+    }
+
+    this.scrubThread.interrupt();
+    try {
+      this.scrubThread.join();
+    } catch (Exception e) {
+      LOG.warn("Exception when waiting for Container Scrubber thread ", e);
+    } finally {
+      this.scrubThread = null;
+    }
+  }
+
+  /**
+   * Current implementation: fixed-rate scrub with no feedback loop.
+   * Dynamic throttling based on system load monitoring is to be
+   * implemented later as jira [XXX].
+   *
+   * @param startTime start time of the check on the current container
+   */
+  private void throttleScrubber(TimeStamp startTime) {
+    TimeStamp endTime = new TimeStamp(System.currentTimeMillis());
+    long timeTaken = endTime.getTime() - startTime.getTime();
+
+    if (timeTaken < timePerContainer) {
+      try {
+        Thread.sleep(timePerContainer - timeTaken);
+      } catch (InterruptedException e) {
+        LOG.debug("Ignoring interrupted sleep inside throttle");
+      }
+    }
+  }
+
+  private void scrub() {
+
+    Iterator<Container> containerIt = containerSet.getContainerIterator();
+    long count = 0;
+
+    while (containerIt.hasNext()) {
+      TimeStamp startTime = new TimeStamp(System.currentTimeMillis());
+      Container container = containerIt.next();
+
+      if (this.halt) {
+        break; // stop if requested
+      }
+
+      try {
+        container.check();
+      } catch (StorageContainerException e) {
+        LOG.error("Unexpected exception {} while checking Container {}", e,
+            container.getContainerData().getContainerID());
+        // XXX Action required here
+      }
+      count++;
+
+      throttleScrubber(startTime);
+    }
+
+    LOG.debug("iterator ran integrity checks on {} containers", count);
+  }
+}
\ No newline at end of file
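
The scrubber lifecycle wired up in OzoneContainer below reduces to roughly the
following (sketch only; containerSet and conf are assumed to exist already):

    // Sketch of the ContainerScrubber lifecycle.
    ContainerScrubber scrubber = new ContainerScrubber(containerSet, conf);
    scrubber.up();    // starts the background thread; it rescans every 5 minutes
    // ... datanode serves traffic; the scrubber throttles itself per container ...
    scrubber.down();  // interrupts the thread and joins it before returning
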
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 92d76ef..3bc060a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto
@@ -69,6 +70,7 @@ public class OzoneContainer {
   private final XceiverServerSpi writeChannel;
   private final XceiverServerSpi readChannel;
   private final ContainerController controller;
+  private ContainerScrubber scrubber;
 
   /**
    * Construct OzoneContainer object.
@@ -82,6 +84,8 @@ public class OzoneContainer {
     this.config = conf;
     this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
     this.containerSet = new ContainerSet();
+    this.scrubber = null;
+
     buildContainerSet();
     final ContainerMetrics metrics = ContainerMetrics.create(conf);
     this.handlers = Maps.newHashMap();
@@ -139,6 +143,34 @@ public class OzoneContainer {
 
   }
 
+
+  /**
+   * Start background daemon thread for performing container integrity checks.
+   */
+  private void startContainerScrub() {
+    boolean enabled = config.getBoolean(
+        HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED,
+        HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED_DEFAULT);
+
+    if (!enabled) {
+      LOG.info("Background container scrubber has been disabled by {}",
+              HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED);
+    } else {
+      this.scrubber = new ContainerScrubber(containerSet, config);
+      scrubber.up();
+    }
+  }
+
+  /**
+   * Stop the scanner thread and wait for thread to die.
+   */
+  private void stopContainerScrub() {
+    if (scrubber == null) {
+      return;
+    }
+    scrubber.down();
+  }
+
   /**
    * Starts serving requests to ozone container.
    *
@@ -146,6 +178,7 @@ public class OzoneContainer {
    */
   public void start() throws IOException {
     LOG.info("Attempting to start container services.");
+    startContainerScrub();
     writeChannel.start();
     readChannel.start();
     hddsDispatcher.init();
@@ -157,6 +190,7 @@ public class OzoneContainer {
   public void stop() {
     //TODO: at end of container IO integration work.
     LOG.info("Attempting to stop container services.");
+    stopContainerScrub();
     writeChannel.stop();
     readChannel.stop();
     hddsDispatcher.shutdown();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
new file mode 100644
index 0000000..f1b8fe7
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
+import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.utils.MetadataStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Basic sanity test for the KeyValueContainerCheck class.
+ */
+@RunWith(Parameterized.class) public class TestKeyValueContainerCheck {
+  private final String storeImpl;
+  private KeyValueContainer container;
+  private KeyValueContainerData containerData;
+  private ChunkManagerImpl chunkManager;
+  private VolumeSet volumeSet;
+  private Configuration conf;
+  private File testRoot;
+
+  public TestKeyValueContainerCheck(String metadataImpl) {
+    this.storeImpl = metadataImpl;
+  }
+
+  @Parameterized.Parameters public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {{OZONE_METADATA_STORE_IMPL_LEVELDB},
+        {OZONE_METADATA_STORE_IMPL_ROCKSDB}});
+  }
+
+  @Before public void setUp() throws Exception {
+    this.testRoot = GenericTestUtils.getRandomizedTestDir();
+    conf = new OzoneConfiguration();
+    conf.set(HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
+    conf.set(OZONE_METADATA_STORE_IMPL, storeImpl);
+    volumeSet = new VolumeSet(UUID.randomUUID().toString(), conf);
+  }
+
+  @After public void teardown() {
+    volumeSet.shutdown();
+    FileUtil.fullyDelete(testRoot);
+  }
+
+  /**
+   * Sanity test for the case where no corruption is induced.
+   * @throws Exception
+   */
+  @Test public void testKeyValueContainerCheckNoCorruption() throws Exception {
+    long containerID = 101;
+    int deletedBlocks = 1;
+    int normalBlocks = 3;
+    int chunksPerBlock = 4;
+    KeyValueContainerCheck.KvCheckError error;
+
+    // test Closed Container
+    createContainerWithBlocks(containerID, normalBlocks, deletedBlocks, 65536,
+        chunksPerBlock);
+    File chunksPath = new File(containerData.getChunksPath());
+    assertTrue(chunksPath.listFiles().length
+        == (deletedBlocks + normalBlocks) * chunksPerBlock);
+
+    KeyValueContainerCheck kvCheck =
+        new KeyValueContainerCheck(containerData.getMetadataPath(), conf,
+            containerID, containerData);
+
+    // first run checks on an Open Container
+    error = kvCheck.fastCheck();
+    assertTrue(error == KeyValueContainerCheck.KvCheckError.ERROR_NONE);
+
+    container.close();
+
+    // next run checks on a Closed Container
+    error = kvCheck.fullCheck();
+    assertTrue(error == KeyValueContainerCheck.KvCheckError.ERROR_NONE);
+  }
+
+  /**
+   * Creates a container with normal and deleted blocks.
+   * First it will insert normal blocks, and then it will insert
+   * deleted blocks.
+   * @param containerId
+   * @param normalBlocks
+   * @param deletedBlocks
+   * @throws Exception
+   */
+  private void createContainerWithBlocks(long containerId, int normalBlocks,
+      int deletedBlocks, long chunkLen, int chunksPerBlock) throws Exception {
+    long chunkCount;
+    String strBlock = "block";
+    String strChunk = "-chunkFile";
+    byte[] chunkData = new byte[(int) chunkLen];
+
+    containerData = new KeyValueContainerData(containerId,
+        (long) StorageUnit.MB.toBytes(chunksPerBlock * chunkLen),
+        UUID.randomUUID().toString(), UUID.randomUUID().toString());
+    container = new KeyValueContainer(containerData, conf);
+    container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
+        UUID.randomUUID().toString());
+    MetadataStore metadataStore = BlockUtils.getDB(containerData, conf);
+    chunkManager = new ChunkManagerImpl(true);
+
+    assertTrue(containerData.getChunksPath() != null);
+    File chunksPath = new File(containerData.getChunksPath());
+    assertTrue(chunksPath.exists());
+    // Initially chunks folder should be empty.
+    assertTrue(chunksPath.listFiles().length == 0);
+
+    List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
+    for (int i = 0; i < (normalBlocks + deletedBlocks); i++) {
+      BlockID blockID = new BlockID(containerId, i);
+      BlockData blockData = new BlockData(blockID);
+
+      chunkList.clear();
+      for (chunkCount = 0; chunkCount < chunksPerBlock; chunkCount++) {
+        String chunkName = strBlock + i + strChunk + chunkCount;
+        long offset = chunkCount * chunkLen;
+        ChunkInfo info = new ChunkInfo(chunkName, offset, chunkLen);
+        chunkList.add(info.getProtoBufMessage());
+        chunkManager
+            .writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData),
+                new DispatcherContext.Builder()
+                    .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
+                    .build());
+        chunkManager
+            .writeChunk(container, blockID, info, ByteBuffer.wrap(chunkData),
+                new DispatcherContext.Builder()
+                    .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
+                    .build());
+      }
+      blockData.setChunks(chunkList);
+
+      if (i >= normalBlocks) {
+        // deleted key
+        metadataStore.put(DFSUtil.string2Bytes(
+            OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID()),
+            blockData.getProtoBufMessage().toByteArray());
+      } else {
+        // normal key
+        metadataStore.put(Longs.toByteArray(blockID.getLocalID()),
+            blockData.getProtoBufMessage().toByteArray());
+      }
+    }
+  }
+}
\ No newline at end of file

