You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xg...@apache.org on 2016/12/05 18:47:20 UTC

[02/29] hadoop git commit: HDFS-11149. Support for parallel checking of FsVolumes.

HDFS-11149. Support for parallel checking of FsVolumes.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/eaaa3295
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/eaaa3295
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/eaaa3295

Branch: refs/heads/YARN-5734
Commit: eaaa32950cbae42a74e28e3db3f0cdb1ff158119
Parents: 8f6e143
Author: Arpit Agarwal <ar...@apache.org>
Authored: Tue Nov 29 20:31:02 2016 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Tue Nov 29 20:31:02 2016 -0800

----------------------------------------------------------------------
 .../datanode/checker/DatasetVolumeChecker.java  | 442 +++++++++++++++++++
 .../server/datanode/fsdataset/FsDatasetSpi.java |   7 +
 .../server/datanode/fsdataset/FsVolumeSpi.java  |  12 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |  15 +-
 .../src/main/resources/hdfs-default.xml         |  10 +-
 .../server/datanode/SimulatedFSDataset.java     |   7 +
 .../server/datanode/TestDirectoryScanner.java   |   7 +
 .../checker/TestDatasetVolumeChecker.java       | 261 +++++++++++
 .../TestDatasetVolumeCheckerFailures.java       | 193 ++++++++
 .../datanode/extdataset/ExternalVolumeImpl.java |   7 +
 10 files changed, 953 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
new file mode 100644
index 0000000..8a57812
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.VolumeCheckContext;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.nio.channels.ClosedChannelException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY;
+
+/**
+ * A class that encapsulates running disk checks against each volume of an
+ * {@link FsDatasetSpi} and allows retrieving a list of failed volumes.
+ *
+ * This splits out behavior that was originally implemented across
+ * DataNode, FsDatasetImpl and FsVolumeList.
+ */
+public class DatasetVolumeChecker {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DatasetVolumeChecker.class);
+
+  private AsyncChecker<VolumeCheckContext, VolumeCheckResult> delegateChecker;
+
+  private final AtomicLong numVolumeChecks = new AtomicLong(0);
+  private final AtomicLong numSyncDatasetChecks = new AtomicLong(0);
+  private final AtomicLong numAsyncDatasetChecks = new AtomicLong(0);
+  private final AtomicLong numSkippedChecks = new AtomicLong(0);
+
+  /**
+   * Max allowed time for a disk check in milliseconds. If the check
+   * doesn't complete within this time we declare the disk as dead.
+   */
+  private final long maxAllowedTimeForCheckMs;
+
+  /**
+   * Maximum number of volume failures that can be tolerated without
+   * declaring a fatal error.
+   */
+  private final int maxVolumeFailuresTolerated;
+
+  /**
+   * Minimum time between two successive disk checks of a volume.
+   */
+  private final long minDiskCheckGapMs;
+
+  /**
+   * Timestamp of the last check of all volumes.
+   */
+  private long lastAllVolumesCheck;
+
+  private final Timer timer;
+
+  private static final VolumeCheckContext IGNORED_CONTEXT =
+      new VolumeCheckContext();
+
+  /**
+   * Create a checker configured from the given DataNode settings.
+   *
+   * @param conf Configuration object.
+   * @param timer {@link Timer} object used for throttling checks.
+   * @throws DiskErrorException if a configured value is out of range.
+   */
+  public DatasetVolumeChecker(Configuration conf, Timer timer)
+      throws DiskErrorException {
+    this.maxAllowedTimeForCheckMs = conf.getTimeDuration(
+        DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
+        DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+    if (this.maxAllowedTimeForCheckMs <= 0) {
+      throw new DiskErrorException("Invalid value configured for "
+          + DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
+          + this.maxAllowedTimeForCheckMs + " (should be > 0)");
+    }
+
+    this.timer = timer;
+    this.maxVolumeFailuresTolerated = conf.getInt(
+        DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
+        DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
+
+    this.minDiskCheckGapMs = conf.getTimeDuration(
+        DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    if (this.minDiskCheckGapMs < 0) {
+      throw new DiskErrorException("Invalid value configured for "
+          + DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY + " - "
+          + this.minDiskCheckGapMs + " (should be >= 0)");
+    }
+
+    // Back-dated so that the first dataset-wide check is not throttled.
+    this.lastAllVolumesCheck = timer.monotonicNow() - minDiskCheckGapMs;
+
+    if (this.maxVolumeFailuresTolerated < 0) {
+      throw new DiskErrorException("Invalid value configured for "
+          + DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - "
+          + this.maxVolumeFailuresTolerated + " (should be non-negative)");
+    }
+
+    this.delegateChecker = new ThrottledAsyncChecker<>(
+        timer, minDiskCheckGapMs, Executors.newCachedThreadPool(
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataNode DiskChecker thread %d")
+                .setDaemon(true)
+                .build()));
+  }
+
+  /**
+   * Run checks against all volumes of a dataset and wait for the results.
+   *
+   * This check may be performed at service startup and subsequently at
+   * regular intervals to detect and handle failed volumes.
+   *
+   * @param dataset - FsDatasetSpi to be checked.
+   * @return volumes not found healthy in time; empty if the check is skipped.
+   */
+  public Set<StorageLocation> checkAllVolumes(
+      final FsDatasetSpi<? extends FsVolumeSpi> dataset)
+      throws InterruptedException {
+
+    if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
+      numSkippedChecks.incrementAndGet();
+      return Collections.emptySet();
+    }
+
+    lastAllVolumesCheck = timer.monotonicNow();
+    final Set<StorageLocation> healthyVolumes = new HashSet<>();
+    final Set<StorageLocation> failedVolumes = new HashSet<>();
+    final Set<StorageLocation> allVolumes = new HashSet<>();
+
+    final FsDatasetSpi.FsVolumeReferences references =
+        dataset.getFsVolumeReferences();
+    final CountDownLatch resultsLatch = new CountDownLatch(references.size());
+
+    for (int i = 0; i < references.size(); ++i) {
+      final FsVolumeReference reference = references.getReference(i);
+      allVolumes.add(reference.getVolume().getStorageLocation());
+      ListenableFuture<VolumeCheckResult> future =
+          delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
+      LOG.info("Scheduled health check for volume {}", reference.getVolume());
+      Futures.addCallback(future, new ResultHandler(
+          reference, healthyVolumes, failedVolumes, resultsLatch, null));
+    }
+
+    // Wait until our timeout elapses, after which we give up on
+    // the remaining volumes.
+    if (!resultsLatch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
+      LOG.warn("checkAllVolumes timed out after {} ms",
+          maxAllowedTimeForCheckMs);
+    }
+
+    numSyncDatasetChecks.incrementAndGet();
+    synchronized (this) {
+      // All volumes that have not been detected as healthy should be
+      // considered failed. This is a superset of 'failedVolumes'.
+      //
+      // Make a copy under the mutex as Sets.difference() returns a view
+      // of a potentially changing set.
+      return new HashSet<>(Sets.difference(allVolumes, healthyVolumes));
+    }
+  }
+
+  /**
+   * Kick off a check of every volume of a dataset without waiting for
+   * the results.
+   *
+   * Nothing is scheduled if the previous dataset-wide check started less
+   * than the minimum gap ago. Otherwise one check per volume reference is
+   * scheduled, and the callback is handed to the per-volume result
+   * handlers, which are expected to invoke it once all checks completed.
+   *
+   * @param dataset - FsDatasetSpi to be checked.
+   * @param callback - Callback to be invoked when the checks are complete.
+   * @return true if the check was scheduled and the callback will be invoked.
+   *         false if the check was not scheduled and the callback will not be
+   *         invoked.
+   */
+  public boolean checkAllVolumesAsync(
+      final FsDatasetSpi<? extends FsVolumeSpi> dataset,
+      Callback callback) {
+    final long sinceLastCheckMs = timer.monotonicNow() - lastAllVolumesCheck;
+    if (sinceLastCheckMs < minDiskCheckGapMs) {
+      numSkippedChecks.incrementAndGet();
+      return false;
+    }
+    lastAllVolumesCheck = timer.monotonicNow();
+
+    final FsDatasetSpi.FsVolumeReferences volumeRefs =
+        dataset.getFsVolumeReferences();
+    final CountDownLatch pending = new CountDownLatch(volumeRefs.size());
+    final Set<StorageLocation> healthy = new HashSet<>();
+    final Set<StorageLocation> failed = new HashSet<>();
+
+    LOG.info("Checking {} volumes", volumeRefs.size());
+    for (int idx = 0; idx < volumeRefs.size(); ++idx) {
+      final FsVolumeReference ref = volumeRefs.getReference(idx);
+      // The context parameter is currently ignored.
+      final ListenableFuture<VolumeCheckResult> outcome =
+          delegateChecker.schedule(ref.getVolume(), IGNORED_CONTEXT);
+      Futures.addCallback(outcome,
+          new ResultHandler(ref, healthy, failed, pending, callback));
+    }
+    numAsyncDatasetChecks.incrementAndGet();
+    return true;
+  }
+
+  /**
+   * Receives the outcome of the asynchronous disk checks started by
+   * {@link #checkAllVolumesAsync} and {@link #checkVolume}.
+   */
+  public interface Callback {
+    /**
+     * @param healthyVolumes locations of volumes that passed disk checks.
+     * @param failedVolumes locations of volumes that failed disk checks.
+     */
+    void call(Set<StorageLocation> healthyVolumes,
+              Set<StorageLocation> failedVolumes);
+  }
+
+  /**
+   * Schedule an asynchronous check of a single volume; the outcome is
+   * delivered through the callback and nothing is returned to the caller.
+   *
+   * If the volume cannot be referenced then it is already closed and is
+   * not checked; the callback is invoked right away with two empty sets.
+   *
+   * @param volume the volume that is to be checked.
+   * @param callback callback to be invoked when the volume check completes.
+   */
+  public void checkVolume(
+      final FsVolumeSpi volume,
+      Callback callback) {
+    FsVolumeReference volumeReference;
+    try {
+      volumeReference = volume.obtainReference();
+    } catch (ClosedChannelException e) {
+      // The volume has already been closed.
+      callback.call(new HashSet<>(), new HashSet<>());
+      return;
+    }
+    ListenableFuture<VolumeCheckResult> future =
+        delegateChecker.schedule(volume, IGNORED_CONTEXT);
+    numVolumeChecks.incrementAndGet();
+    Futures.addCallback(future, new ResultHandler(
+        volumeReference, new HashSet<>(), new HashSet<>(),
+        new CountDownLatch(1), callback));
+  }
+
+  /**
+   * A callback to process the result of checking one volume. Handlers of
+   * the same check share a latch, which also identifies the last result.
+   */
+  private class ResultHandler implements FutureCallback<VolumeCheckResult> {
+    private final FsVolumeReference reference;
+    private final Set<StorageLocation> failedVolumes;
+    private final Set<StorageLocation> healthyVolumes;
+    private final CountDownLatch latch;
+    @Nullable
+    private final Callback callback;
+
+    ResultHandler(FsVolumeReference reference,
+                  Set<StorageLocation> healthyVolumes,
+                  Set<StorageLocation> failedVolumes,
+                  CountDownLatch latch, @Nullable Callback callback) {
+      Preconditions.checkState(reference != null);
+      this.reference = reference;
+      this.healthyVolumes = healthyVolumes;
+      this.failedVolumes = failedVolumes;
+      this.latch = latch;
+      this.callback = callback;
+    }
+
+    @Override
+    public void onSuccess(@Nonnull VolumeCheckResult result) {
+      switch(result) {
+      case HEALTHY:
+      case DEGRADED:
+        LOG.debug("Volume {} is {}.", reference.getVolume(), result);
+        markHealthy();
+        break;
+      case FAILED:
+        LOG.warn("Volume {} detected as being unhealthy",
+            reference.getVolume());
+        markFailed();
+        break;
+      default:
+        LOG.error("Unexpected health check result {} for volume {}",
+            result, reference.getVolume());
+        markHealthy();
+        break;
+      }
+      cleanup();
+    }
+
+    @Override
+    public void onFailure(@Nonnull Throwable t) {
+      Throwable exception = (t instanceof ExecutionException) ?
+          t.getCause() : t;
+      LOG.warn("Exception running disk checks against volume " +
+          reference.getVolume(), exception);
+      markFailed();
+      cleanup();
+    }
+
+    private void markHealthy() {
+      synchronized (DatasetVolumeChecker.this) {
+        healthyVolumes.add(reference.getVolume().getStorageLocation());
+      }
+    }
+
+    private void markFailed() {
+      synchronized (DatasetVolumeChecker.this) {
+        failedVolumes.add(reference.getVolume().getStorageLocation());
+      }
+    }
+
+    private void cleanup() {
+      IOUtils.cleanup(null, reference);
+      invokeCallback();
+    }
+
+    private void invokeCallback() {
+      // The latch is shared by every handler of one check, so whichever
+      // handler drives it to zero is the last one and fires the callback.
+      final boolean lastResult;
+      synchronized (DatasetVolumeChecker.this) {
+        latch.countDown();
+        lastResult = latch.getCount() == 0;
+      }
+      try {
+        if (lastResult && callback != null) {
+          callback.call(healthyVolumes, failedVolumes);
+        }
+      } catch(Exception e) {
+        // Propagating this exception is unlikely to be helpful.
+        LOG.warn("Unexpected exception", e);
+      }
+    }
+  }
+
+  /**
+   * Shutdown the checker and its associated ExecutorService. For the
+   * interpretation of the parameters see
+   * {@link java.util.concurrent.ExecutorService#awaitTermination}; an
+   * interrupt while waiting is logged and the interrupt status restored.
+   */
+  public void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) {
+    try {
+      delegateChecker.shutdownAndWait(gracePeriod, timeUnit);
+    } catch (InterruptedException e) {
+      LOG.warn("DatasetVolumeChecker interrupted during shutdown.");
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * This method is for testing only.
+   *
+   * @param testDelegate checker that replaces the current delegate checker.
+   */
+  @VisibleForTesting
+  void setDelegateChecker(
+      AsyncChecker<VolumeCheckContext, VolumeCheckResult> testDelegate) {
+    delegateChecker = testDelegate;
+  }
+
+  /**
+   * Return the number of volume checks scheduled by {@link #checkVolume}.
+   */
+  public long getNumVolumeChecks() {
+    return numVolumeChecks.get();
+  }
+
+  /**
+   * Return the number of {@link #checkAllVolumes} runs that were not skipped.
+   */
+  public long getNumSyncDatasetChecks() {
+    return numSyncDatasetChecks.get();
+  }
+
+  /**
+   * Return the number of checks scheduled by {@link #checkAllVolumesAsync}.
+   */
+  public long getNumAsyncDatasetChecks() {
+    return numAsyncDatasetChecks.get();
+  }
+
+  /**
+   * Return the number of dataset-wide checks (sync or async) skipped because
+   * the minimum gap since the last such check had not elapsed.
+   */
+  public long getNumSkippedChecks() {
+    return numSkippedChecks.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 547392f..57ec2b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -159,6 +159,13 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
       return references.get(index).getVolume();
     }
 
+    /**
+     * Get the reference for a given index.
+     */
+    public FsVolumeReference getReference(int index) {
+      return references.get(index);
+    }
+
     @Override
     public void close() throws IOException {
       IOException ioe = null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index dbba31d..a11a207 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -34,11 +34,15 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 
 /**
  * This is an interface for the underlying volume.
  */
-public interface FsVolumeSpi {
+public interface FsVolumeSpi
+    extends Checkable<FsVolumeSpi.VolumeCheckContext, VolumeCheckResult> {
+
   /**
    * Obtain a reference object that had increased 1 reference count of the
    * volume.
@@ -408,4 +412,10 @@ public interface FsVolumeSpi {
   LinkedList<ScanInfo> compileReport(String bpid,
       LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
       throws InterruptedException, IOException;
+
+  /**
+   * Context for the {@link #check} call.
+   */
+  class VolumeCheckContext {
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 5880b3e..a231e03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
@@ -69,7 +71,6 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.BlockDirFilter;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
@@ -914,7 +915,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }
 
   @Override
-  public FsDatasetSpi getDataset() {
+  public FsDatasetSpi<? extends FsVolumeSpi> getDataset() {
     return dataset;
   }
 
@@ -962,6 +963,16 @@ public class FsVolumeImpl implements FsVolumeSpi {
       s.checkDirs();
     }
   }
+
+  @Override
+  public VolumeCheckResult check(VolumeCheckContext ignored)
+      throws DiskErrorException {
+    // TODO:FEDERATION valid synchronization
+    for(BlockPoolSlice s : bpSlices.values()) {
+      s.checkDirs();
+    }
+    return VolumeCheckResult.HEALTHY;
+  }
     
   void getVolumeMap(ReplicaMap volumeMap,
                     final RamDiskReplicaTracker ramDiskReplicaMap)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index c9d74bb..671c98c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4376,11 +4376,11 @@
     <name>dfs.datanode.disk.check.timeout</name>
     <value>10m</value>
     <description>
-      Maximum allowed time for a disk check to complete. If the check does
-      not complete within this time interval then the disk is declared as
-      failed. This setting supports multiple time unit suffixes as described
-      in dfs.heartbeat.interval. If no suffix is specified then milliseconds
-      is assumed.
+      Maximum allowed time for a disk check to complete during DataNode
+      startup. If the check does not complete within this time interval
+      then the disk is declared as failed. This setting supports
+      multiple time unit suffixes as described in dfs.heartbeat.interval.
+      If no suffix is specified then milliseconds is assumed.
     </description>
   </property>
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 8e6191a..5d63d07 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -42,6 +42,7 @@ import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -557,6 +558,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
         throws InterruptedException, IOException {
       return null;
     }
+
+    @Override
+    public VolumeCheckResult check(VolumeCheckContext context)
+        throws Exception {
+      return VolumeCheckResult.HEALTHY;
+    }
   }
 
   private final Map<String, Map<Block, BInfo>> blockMap

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index d05e2a7..f08b579 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -897,6 +898,12 @@ public class TestDirectoryScanner {
       return null;
     }
 
+
+    @Override
+    public VolumeCheckResult check(VolumeCheckContext context)
+        throws Exception {
+      return VolumeCheckResult.HEALTHY;
+    }
   }
 
   private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
new file mode 100644
index 0000000..fa809d1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.VolumeCheckContext;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult.*;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests for {@link DatasetVolumeChecker} when the {@link FsVolumeSpi#check}
+ * method returns different values of {@link VolumeCheckResult}.
+ */
+@RunWith(Parameterized.class)
+public class TestDatasetVolumeChecker {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestDatasetVolumeChecker.class);
+
+  @Rule
+  public TestName testName = new TestName();
+
+  /**
+   * Run each test case for each possible value of {@link VolumeCheckResult},
+   * plus {@code null}, which makes the mocked check throw an exception.
+   * @return the parameter sets, one single-element array per test run.
+   */
+  @Parameters(name="{0}")
+  public static Collection<Object[]> data() {
+    List<Object[]> values = new ArrayList<>();
+    for (VolumeCheckResult result : VolumeCheckResult.values()) {
+      values.add(new Object[] {result});
+    }
+    values.add(new Object[] {null});
+    return values;
+  }
+
+  /**
+   * When null, the check call should throw an exception.
+   */
+  private final VolumeCheckResult expectedVolumeHealth;
+  private static final int NUM_VOLUMES = 2;
+
+
+  public TestDatasetVolumeChecker(VolumeCheckResult expectedVolumeHealth) {
+    this.expectedVolumeHealth = expectedVolumeHealth;
+  }
+
+  /**
+   * Test {@link DatasetVolumeChecker#checkVolume} propagates the
+   * check to the delegate checker.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 10000)
+  public void testCheckOneVolume() throws Exception {
+    LOG.info("Executing {}", testName.getMethodName());
+    final FsVolumeSpi volume = makeVolumes(1, expectedVolumeHealth).get(0);
+    final DatasetVolumeChecker checker =
+        new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
+    checker.setDelegateChecker(new DummyChecker());
+    final AtomicLong numCallbackInvocations = new AtomicLong(0);
+
+    /*
+     * Request a check; null (exception) or FAILED must be reported as failed.
+     */
+    checker.checkVolume(volume, new DatasetVolumeChecker.Callback() {
+      @Override
+      public void call(Set<StorageLocation> healthyVolumes,
+                       Set<StorageLocation> failedVolumes) {
+        numCallbackInvocations.incrementAndGet();
+        if (expectedVolumeHealth != null && expectedVolumeHealth != FAILED) {
+          assertThat(healthyVolumes.size(), is(1));
+          assertThat(failedVolumes.size(), is(0));
+        } else {
+          assertThat(healthyVolumes.size(), is(0));
+          assertThat(failedVolumes.size(), is(1));
+        }
+      }
+    });
+
+    // Ensure the check and the callback were each invoked exactly once.
+    verify(volume, times(1)).check(anyObject());
+    assertThat(numCallbackInvocations.get(), is(1L));
+  }
+
+  /**
+   * Test {@link DatasetVolumeChecker#checkAllVolumes} propagates
+   * checks for all volumes to the delegate checker.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 10000)
+  public void testCheckAllVolumes() throws Exception {
+    LOG.info("Executing {}", testName.getMethodName());
+
+    final List<FsVolumeSpi> volumes = makeVolumes(
+        NUM_VOLUMES, expectedVolumeHealth);
+    final FsDatasetSpi<FsVolumeSpi> dataset = makeDataset(volumes);
+    final DatasetVolumeChecker checker =
+        new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
+    checker.setDelegateChecker(new DummyChecker());
+
+    Set<StorageLocation> failedVolumes = checker.checkAllVolumes(dataset);
+    LOG.info("Got back {} failed volumes", failedVolumes.size());
+
+    if (expectedVolumeHealth == null || expectedVolumeHealth == FAILED) {
+      assertThat(failedVolumes.size(), is(NUM_VOLUMES));
+    } else {
+      assertTrue(failedVolumes.isEmpty());
+    }
+
+    // Ensure each volume's check() method was called exactly once.
+    for (FsVolumeSpi volume : volumes) {
+      verify(volume, times(1)).check(anyObject());
+    }
+  }
+
+  /**
+   * Unit test for {@link DatasetVolumeChecker#checkAllVolumesAsync}.
+   *
+   * @throws Exception
+   */
+  @Test(timeout=10000)
+  public void testCheckAllVolumesAsync() throws Exception {
+    LOG.info("Executing {}", testName.getMethodName());
+
+    final List<FsVolumeSpi> volumes = makeVolumes(
+        NUM_VOLUMES, expectedVolumeHealth);
+    final FsDatasetSpi<FsVolumeSpi> dataset = makeDataset(volumes);
+    final DatasetVolumeChecker checker =
+        new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
+    checker.setDelegateChecker(new DummyChecker());
+    final AtomicLong numCallbackInvocations = new AtomicLong(0);
+
+    checker.checkAllVolumesAsync(
+        dataset, new DatasetVolumeChecker.Callback() {
+          @Override
+          public void call(
+              Set<StorageLocation> healthyVolumes,
+              Set<StorageLocation> failedVolumes) {
+            LOG.info("Got back {} failed volumes", failedVolumes.size());
+            if (expectedVolumeHealth == null ||
+                expectedVolumeHealth == FAILED) {
+              assertThat(healthyVolumes.size(), is(0));
+              assertThat(failedVolumes.size(), is(NUM_VOLUMES));
+            } else {
+              assertThat(healthyVolumes.size(), is(NUM_VOLUMES));
+              assertThat(failedVolumes.size(), is(0));
+            }
+            numCallbackInvocations.incrementAndGet();
+          }
+        });
+
+    // The callback should be invoked exactly once.
+    assertThat(numCallbackInvocations.get(), is(1L));
+
+    // Ensure each volume's check() method was called exactly once.
+    for (FsVolumeSpi volume : volumes) {
+      verify(volume, times(1)).check(anyObject());
+    }
+  }
+
+  /**
+   * A checker that wraps the result of {@link FsVolumeSpi#check} in
+   * an immediate (already completed or already failed) future.
+   */
+  static class DummyChecker
+      implements AsyncChecker<VolumeCheckContext, VolumeCheckResult> {
+    @Override
+    public ListenableFuture<VolumeCheckResult> schedule(
+        Checkable<VolumeCheckContext, VolumeCheckResult> target,
+        VolumeCheckContext context) {
+      try {
+        return Futures.immediateFuture(target.check(context));
+      } catch (Exception e) {
+        LOG.info("check routine threw exception " + e);
+        return Futures.immediateFailedFuture(e);
+      }
+    }
+
+    @Override
+    public void shutdownAndWait(long timeout, TimeUnit timeUnit)
+        throws InterruptedException {
+      // Nothing to cancel.
+    }
+  }
+
+  /**
+   * Create a dataset with the given volumes.
+   */
+  static FsDatasetSpi<FsVolumeSpi> makeDataset(List<FsVolumeSpi> volumes)
+      throws Exception {
+    // Mock the dataset so that it hands out references to the given volumes.
+    final FsDatasetSpi<FsVolumeSpi> dataset = mock(FsDatasetSpi.class);
+    final FsDatasetSpi.FsVolumeReferences references = new
+        FsDatasetSpi.FsVolumeReferences(volumes);
+    when(dataset.getFsVolumeReferences()).thenReturn(references);
+    return dataset;
+  }
+
+  private static List<FsVolumeSpi> makeVolumes(
+      int numVolumes, VolumeCheckResult health) throws Exception {
+    final List<FsVolumeSpi> volumes = new ArrayList<>(numVolumes);
+    for (int i = 0; i < numVolumes; ++i) {
+      final FsVolumeSpi volume = mock(FsVolumeSpi.class);
+      final FsVolumeReference reference = mock(FsVolumeReference.class);
+      final StorageLocation location = mock(StorageLocation.class);
+
+      when(reference.getVolume()).thenReturn(volume);
+      when(volume.obtainReference()).thenReturn(reference);
+      when(volume.getStorageLocation()).thenReturn(location);
+
+      if (health != null) {
+        when(volume.check(anyObject())).thenReturn(health);
+      } else {
+        final DiskErrorException de = new DiskErrorException("Fake Exception");
+        when(volume.check(anyObject())).thenThrow(de);
+      }
+      volumes.add(volume);
+    }
+    return volumes;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
new file mode 100644
index 0000000..b57d84f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.TimeUnit;
+import java.util.*;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Test a few more conditions not covered by TestDatasetVolumeChecker.
+ */
+public class TestDatasetVolumeCheckerFailures {
+  public static final Logger LOG =LoggerFactory.getLogger(
+      TestDatasetVolumeCheckerFailures.class);
+
+  /**
+   * Test timeout in {@link DatasetVolumeChecker#checkAllVolumes}.
+   * @throws Exception
+   */
+  @Test(timeout=60000)
+  public void testTimeout() throws Exception {
+    // Add a volume whose check routine hangs forever.
+    final List<FsVolumeSpi> volumes =
+        Collections.singletonList(makeHungVolume());
+
+    final FsDatasetSpi<FsVolumeSpi> dataset =
+        TestDatasetVolumeChecker.makeDataset(volumes);
+
+    // Create a disk checker with a very low timeout.
+    final HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
+        1, TimeUnit.SECONDS);
+    final DatasetVolumeChecker checker =
+        new DatasetVolumeChecker(conf, new FakeTimer());
+
+    // Ensure that the hung volume is detected as failed.
+    Set<StorageLocation> failedVolumes = checker.checkAllVolumes(dataset);
+    assertThat(failedVolumes.size(), is(1));
+  }
+
+  /**
+   * Test checking a closed volume i.e. one which cannot be referenced.
+   *
+   * @throws Exception
+   */
+  @Test(timeout=60000)
+  public void testCheckingClosedVolume() throws Exception {
+    // Add a volume that cannot be referenced.
+    final List<FsVolumeSpi> volumes =
+        Collections.singletonList(makeClosedVolume());
+
+    final FsDatasetSpi<FsVolumeSpi> dataset =
+        TestDatasetVolumeChecker.makeDataset(volumes);
+
+    DatasetVolumeChecker checker =
+        new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
+    Set<StorageLocation> failedVolumes = checker.checkAllVolumes(dataset);
+    assertThat(failedVolumes.size(), is(0));
+
+    // The closed volume should not have been checked as it cannot
+    // be referenced.
+    verify(volumes.get(0), times(0)).check(anyObject());
+  }
+
+  @Test(timeout=60000)
+  public void testMinGapIsEnforcedForSyncChecks() throws Exception {
+    final FsDatasetSpi<FsVolumeSpi> dataset =
+        TestDatasetVolumeChecker.makeDataset(Collections.emptyList());
+    final FakeTimer timer = new FakeTimer();
+    final Configuration conf = new HdfsConfiguration();
+    final long minGapMs = 100;
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        minGapMs, TimeUnit.MILLISECONDS);
+    final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
+
+    checker.checkAllVolumes(dataset);
+    assertThat(checker.getNumSyncDatasetChecks(), is(1L));
+
+    // Re-check without advancing the timer. Ensure the check is skipped.
+    checker.checkAllVolumes(dataset);
+    assertThat(checker.getNumSyncDatasetChecks(), is(1L));
+    assertThat(checker.getNumSkippedChecks(), is(1L));
+
+    // Re-check after advancing the timer. Ensure the check is performed.
+    timer.advance(minGapMs);
+    checker.checkAllVolumes(dataset);
+    assertThat(checker.getNumSyncDatasetChecks(), is(2L));
+    assertThat(checker.getNumSkippedChecks(), is(1L));
+  }
+
+  @Test(timeout=60000)
+  public void testMinGapIsEnforcedForASyncChecks() throws Exception {
+    final FsDatasetSpi<FsVolumeSpi> dataset =
+        TestDatasetVolumeChecker.makeDataset(Collections.emptyList());
+    final FakeTimer timer = new FakeTimer();
+    final Configuration conf = new HdfsConfiguration();
+    final long minGapMs = 100;
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        minGapMs, TimeUnit.MILLISECONDS);
+    final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
+
+    checker.checkAllVolumesAsync(dataset, null);
+    assertThat(checker.getNumAsyncDatasetChecks(), is(1L));
+
+    // Re-check without advancing the timer. Ensure the check is skipped.
+    checker.checkAllVolumesAsync(dataset, null);
+    assertThat(checker.getNumAsyncDatasetChecks(), is(1L));
+    assertThat(checker.getNumSkippedChecks(), is(1L));
+
+    // Re-check after advancing the timer. Ensure the check is performed.
+    timer.advance(minGapMs);
+    checker.checkAllVolumesAsync(dataset, null);
+    assertThat(checker.getNumAsyncDatasetChecks(), is(2L));
+    assertThat(checker.getNumSkippedChecks(), is(1L));
+  }
+
+  /**
+   * Create a mock FsVolumeSpi whose {@link FsVolumeSpi#check} routine
+   * hangs forever.
+   *
+   * @return volume
+   * @throws Exception
+   */
+  private static FsVolumeSpi makeHungVolume() throws Exception {
+    final FsVolumeSpi volume = mock(FsVolumeSpi.class);
+    final FsVolumeReference reference = mock(FsVolumeReference.class);
+    final StorageLocation location = mock(StorageLocation.class);
+
+    when(reference.getVolume()).thenReturn(volume);
+    when(volume.obtainReference()).thenReturn(reference);
+    when(volume.getStorageLocation()).thenReturn(location);
+    when(volume.check(anyObject())).thenAnswer(
+        new Answer<VolumeCheckResult>() {
+        @Override
+        public VolumeCheckResult answer(InvocationOnMock invocation)
+            throws Throwable {
+          Thread.sleep(Long.MAX_VALUE);     // Sleep forever.
+          return VolumeCheckResult.HEALTHY; // unreachable.
+        }
+      });
+    return volume;
+  }
+
+  /**
+   * Create a mock FsVolumeSpi which is closed and hence cannot
+   * be referenced.
+   *
+   * @return volume
+   * @throws Exception
+   */
+  private static FsVolumeSpi makeClosedVolume() throws Exception {
+    final FsVolumeSpi volume = mock(FsVolumeSpi.class);
+    final StorageLocation location = mock(StorageLocation.class);
+
+    when(volume.obtainReference()).thenThrow(new ClosedChannelException());
+    when(volume.getStorageLocation()).thenReturn(location);
+    return volume;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eaaa3295/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
index 83d6c4c..2753a61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -112,4 +113,10 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
       throws InterruptedException, IOException {
     return null;
   }
+
+  @Override
+  public VolumeCheckResult check(VolumeCheckContext context)
+      throws Exception {
+    return VolumeCheckResult.HEALTHY;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org