Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/12/29 06:08:45 UTC

hadoop git commit: HDFS-11274. Datanode should only check the failed volume upon IO errors. Contributed by Xiaoyu Yao.

Repository: hadoop
Updated Branches:
  refs/heads/trunk ce3613c96 -> 603f3ef13


HDFS-11274. Datanode should only check the failed volume upon IO errors. Contributed by Xiaoyu Yao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/603f3ef1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/603f3ef1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/603f3ef1

Branch: refs/heads/trunk
Commit: 603f3ef1386048111940b66f3a0750ab84d0588f
Parents: ce3613c
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Wed Dec 28 22:08:13 2016 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Wed Dec 28 22:08:13 2016 -0800

----------------------------------------------------------------------
 .../hdfs/server/datanode/BlockReceiver.java     |  12 +-
 .../server/datanode/CountingFileIoEvents.java   |   3 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  91 ++++++++-----
 .../server/datanode/DefaultFileIoEvents.java    |   2 +-
 .../hdfs/server/datanode/FileIoEvents.java      |  36 ++++--
 .../hdfs/server/datanode/FileIoProvider.java    |  89 +++++++------
 .../server/datanode/ProfilingFileIoEvents.java  |   2 +-
 .../hdfs/server/datanode/ReplicaInfo.java       |   2 +-
 .../server/datanode/checker/AsyncChecker.java   |   5 +-
 .../datanode/checker/DatasetVolumeChecker.java  |  71 ++++++----
 .../checker/StorageLocationChecker.java         |   8 +-
 .../datanode/checker/ThrottledAsyncChecker.java |  19 ++-
 .../datanode/fsdataset/impl/BlockPoolSlice.java |   5 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |   2 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |   3 +-
 .../fsdataset/impl/FsVolumeImplBuilder.java     |   4 +-
 .../server/datanode/SimulatedFSDataset.java     |   2 +-
 .../datanode/TestDataNodeHotSwapVolumes.java    |   2 +-
 .../TestDataNodeVolumeFailureReporting.java     |  15 ++-
 .../checker/TestDatasetVolumeChecker.java       |  49 ++++---
 .../TestDatasetVolumeCheckerFailures.java       |  23 ----
 .../checker/TestThrottledAsyncChecker.java      | 128 +++++++++++--------
 22 files changed, 338 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index b3aee11..567597d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -278,10 +278,9 @@ class BlockReceiver implements Closeable {
       IOException cause = DatanodeUtil.getCauseIfDiskError(ioe);
       DataNode.LOG.warn("IOException in BlockReceiver constructor"
           + (cause == null ? "" : ". Cause is "), cause);
-      
-      if (cause != null) { // possible disk error
+      if (cause != null) {
         ioe = cause;
-        datanode.checkDiskErrorAsync();
+        // Volume error check moved to FileIoProvider
       }
       
       throw ioe;
@@ -363,9 +362,8 @@ class BlockReceiver implements Closeable {
     if (measuredFlushTime) {
       datanode.metrics.addFlushNanos(flushTotalNanos);
     }
-    // disk check
     if(ioe != null) {
-      datanode.checkDiskErrorAsync();
+      // Volume error check moved to FileIoProvider
       throw ioe;
     }
   }
@@ -792,7 +790,7 @@ class BlockReceiver implements Closeable {
           manageWriterOsCache(offsetInBlock);
         }
       } catch (IOException iex) {
-        datanode.checkDiskErrorAsync();
+        // Volume error check moved to FileIoProvider
         throw iex;
       }
     }
@@ -1430,7 +1428,7 @@ class BlockReceiver implements Closeable {
         } catch (IOException e) {
           LOG.warn("IOException in BlockReceiver.run(): ", e);
           if (running) {
-            datanode.checkDiskErrorAsync();
+            // Volume error check moved to FileIoProvider
             LOG.info(myString, e);
             running = false;
             if (!Thread.interrupted()) { // failure not caused by interruption

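Note on the removed calls above: write paths that go through FileIoProvider no longer invoke datanode.checkDiskErrorAsync() themselves; the provider's failure hook schedules a check of the single affected volume. A hedged sketch of the resulting call-site pattern (flushOrSync() is a hypothetical wrapped I/O call, not part of the patch):

    try {
      flushOrSync();                        // hypothetical FileIoProvider-wrapped I/O
    } catch (IOException ioe) {
      // Unwrap a disk-error cause for logging only; the volume check has
      // already been scheduled inside FileIoProvider, so just rethrow.
      IOException cause = DatanodeUtil.getCauseIfDiskError(ioe);
      if (cause != null) {
        ioe = cause;                        // no datanode.checkDiskErrorAsync() here
      }
      throw ioe;
    }
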
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
index a70c151..7c6bfd6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class CountingFileIoEvents implements FileIoEvents {
+public class CountingFileIoEvents extends FileIoEvents {
   private final Map<OPERATION, Counts> counts;
 
   private static class Counts {
@@ -90,7 +90,6 @@ public class CountingFileIoEvents implements FileIoEvents {
   public void onFailure(
       @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
     counts.get(op).failures.incrementAndGet();
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 4436e58..e893c5e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -372,6 +372,7 @@ public class DataNode extends ReconfigurableBase
   SaslDataTransferClient saslClient;
   SaslDataTransferServer saslServer;
   private ObjectName dataNodeInfoBeanName;
+  // Test verification only
   private volatile long lastDiskErrorCheck;
   private String supergroup;
   private boolean isPermissionEnabled;
@@ -412,7 +413,7 @@ public class DataNode extends ReconfigurableBase
     this.tracer = createTracer(conf);
     this.tracerConfigurationManager =
         new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
-    this.fileIoProvider = new FileIoProvider(conf);
+    this.fileIoProvider = new FileIoProvider(conf, this);
     this.fileDescriptorPassingDisabledReason = null;
     this.maxNumberOfBlocksToLog = 0;
     this.confVersion = null;
@@ -438,7 +439,7 @@ public class DataNode extends ReconfigurableBase
     this.tracer = createTracer(conf);
     this.tracerConfigurationManager =
         new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
-    this.fileIoProvider = new FileIoProvider(conf);
+    this.fileIoProvider = new FileIoProvider(conf, this);
     this.blockScanner = new BlockScanner(this);
     this.lastDiskErrorCheck = 0;
     this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
@@ -786,7 +787,7 @@ public class DataNode extends ReconfigurableBase
 
   /**
    * Remove volumes from DataNode.
-   * See {@link #removeVolumes(Set, boolean)} for details.
+   * See {@link #removeVolumes(Collection, boolean)} for details.
    *
    * @param locations the StorageLocations of the volumes to be removed.
    * @throws IOException
@@ -809,7 +810,7 @@ public class DataNode extends ReconfigurableBase
    *   <ul>Reset configuration DATA_DIR and {@link #dataDirs} to represent
    *   active volumes.</ul>
    * </li>
-   * @param absoluteVolumePaths the absolute path of volumes.
+   * @param storageLocations the absolute path of volumes.
    * @param clearFailure if true, clears the failure information related to the
    *                     volumes.
    * @throws IOException
@@ -1293,7 +1294,7 @@ public class DataNode extends ReconfigurableBase
    * If conf's CONFIG_PROPERTY_SIMULATED property is set
    * then a simulated storage based data node is created.
    * 
-   * @param dataDirs - only for a non-simulated storage data node
+   * @param dataDirectories - only for a non-simulated storage data node
    * @throws IOException
    */
   void startDataNode(List<StorageLocation> dataDirectories,
@@ -2045,14 +2046,33 @@ public class DataNode extends ReconfigurableBase
     }
     tracer.close();
   }
-  
-  
+
   /**
-   * Check if there is a disk failure asynchronously and if so, handle the error
+   * Check if there is a disk failure asynchronously
+   * and if so, handle the error.
    */
+  @VisibleForTesting
   public void checkDiskErrorAsync() {
     volumeChecker.checkAllVolumesAsync(
         data, (healthyVolumes, failedVolumes) -> {
+        if (failedVolumes.size() > 0) {
+          LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}",
+              failedVolumes.size(), failedVolumes);
+        } else {
+          LOG.debug("checkDiskErrorAsync: no volume failures detected");
+        }
+        lastDiskErrorCheck = Time.monotonicNow();
+        handleVolumeFailures(failedVolumes);
+      });
+  }
+
+  /**
+   * Check if there is a disk failure asynchronously
+   * and if so, handle the error.
+   */
+  public void checkDiskErrorAsync(FsVolumeSpi volume) {
+    volumeChecker.checkVolume(
+        volume, (healthyVolumes, failedVolumes) -> {
           if (failedVolumes.size() > 0) {
             LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}",
                 failedVolumes.size(), failedVolumes);
@@ -2064,9 +2084,10 @@ public class DataNode extends ReconfigurableBase
         });
   }
 
-  private void handleDiskError(String errMsgr) {
+  private void handleDiskError(String failedVolumes) {
     final boolean hasEnoughResources = data.hasEnoughResource();
-    LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources);
+    LOG.warn("DataNode.handleDiskError on : [" + failedVolumes +
+        "] Keep Running: " + hasEnoughResources);
     
     // If we have enough active valid volumes then we do not want to 
     // shutdown the DN completely.
@@ -2076,7 +2097,7 @@ public class DataNode extends ReconfigurableBase
 
     //inform NameNodes
     for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
-      bpos.trySendErrorReport(dpError, errMsgr);
+      bpos.trySendErrorReport(dpError, failedVolumes);
     }
     
     if(hasEnoughResources) {
@@ -2084,7 +2105,8 @@ public class DataNode extends ReconfigurableBase
       return; // do not shutdown
     }
     
-    LOG.warn("DataNode is shutting down: " + errMsgr);
+    LOG.warn("DataNode is shutting down due to failed volumes: ["
+        + failedVolumes + "]");
     shouldRun = false;
   }
     
@@ -2447,8 +2469,11 @@ public class DataNode extends ReconfigurableBase
         }
         LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
             targets[0] + " got ", ie);
-        // check if there are any disk problem
-        checkDiskErrorAsync();
+        // disk check moved to FileIoProvider
+        IOException cause = DatanodeUtil.getCauseIfDiskError(ie);
+        if (cause != null) { // possible disk error
+          LOG.warn("IOException in DataTransfer#run(). Cause is ", cause);
+        }
       } finally {
         decrementXmitsInProgress();
         IOUtils.closeStream(blockSender);
@@ -3234,29 +3259,37 @@ public class DataNode extends ReconfigurableBase
   }
 
   private void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
+    if (unhealthyVolumes.isEmpty()) {
+      LOG.debug("handleVolumeFailures done with empty " +
+          "unhealthyVolumes");
+      return;
+    }
+
     data.handleVolumeFailures(unhealthyVolumes);
     Set<StorageLocation> unhealthyLocations = new HashSet<>(
         unhealthyVolumes.size());
 
-    if (!unhealthyVolumes.isEmpty()) {
-      StringBuilder sb = new StringBuilder("DataNode failed volumes:");
-      for (FsVolumeSpi vol : unhealthyVolumes) {
-        unhealthyLocations.add(vol.getStorageLocation());
-        sb.append(vol.getStorageLocation()).append(";");
-      }
+    StringBuilder sb = new StringBuilder("DataNode failed volumes:");
+    for (FsVolumeSpi vol : unhealthyVolumes) {
+      unhealthyLocations.add(vol.getStorageLocation());
+      sb.append(vol.getStorageLocation()).append(";");
+    }
 
-      try {
-        // Remove all unhealthy volumes from DataNode.
-        removeVolumes(unhealthyLocations, false);
-      } catch (IOException e) {
-        LOG.warn("Error occurred when removing unhealthy storage dirs: "
-            + e.getMessage(), e);
-      }
-      LOG.info(sb.toString());
-      handleDiskError(sb.toString());
+    try {
+      // Remove all unhealthy volumes from DataNode.
+      removeVolumes(unhealthyLocations, false);
+    } catch (IOException e) {
+      LOG.warn("Error occurred when removing unhealthy storage dirs: "
+          + e.getMessage(), e);
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(sb.toString());
+    }
+      // send blockreport regarding volume failure
+    handleDiskError(sb.toString());
   }
 
+  @VisibleForTesting
   public long getLastDiskErrorCheck() {
     return lastDiskErrorCheck;
   }

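The new single-volume overload above is the entry point used by FileIoProvider; the no-argument checkDiskErrorAsync() is kept for test verification only. A hedged usage sketch, mirroring how TestDataNodeHotSwapVolumes (further below) drives it:

    // Ask the datanode to check one suspect volume, then wait for the async
    // callback, which advances lastDiskErrorCheck once it has run.
    long lastCheck = dn.getLastDiskErrorCheck();
    dn.checkDiskErrorAsync(failedVolume);         // checks only this volume
    while (dn.getLastDiskErrorCheck() == lastCheck) {
      Thread.sleep(100);
    }
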
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
index bd4932b..6a12aae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
@@ -31,7 +31,7 @@ import javax.annotation.Nullable;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public final class DefaultFileIoEvents implements FileIoEvents {
+public final class DefaultFileIoEvents extends FileIoEvents {
   @Override
   public long beforeMetadataOp(
       @Nullable FsVolumeSpi volume, OPERATION op) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
index 48e703f..10f2a0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
@@ -32,7 +32,7 @@ import javax.annotation.Nullable;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public interface FileIoEvents {
+public abstract class FileIoEvents {
 
   /**
    * Invoked before a filesystem metadata operation.
@@ -42,7 +42,7 @@ public interface FileIoEvents {
    * @return  timestamp at which the operation was started. 0 if
    *          unavailable.
    */
-  long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op);
+  abstract long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op);
 
   /**
    * Invoked after a filesystem metadata operation has completed.
@@ -52,7 +52,8 @@ public interface FileIoEvents {
    * @param begin  timestamp at which the operation was started. 0
    *               if unavailable.
    */
-  void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op, long begin);
+  abstract void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op,
+                                long begin);
 
   /**
    * Invoked before a read/write/flush/channel transfer operation.
@@ -63,7 +64,8 @@ public interface FileIoEvents {
    * @return  timestamp at which the operation was started. 0 if
    *          unavailable.
    */
-  long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op, long len);
+  abstract long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
+                             long len);
 
 
   /**
@@ -76,22 +78,38 @@ public interface FileIoEvents {
    * @return  timestamp at which the operation was started. 0 if
    *          unavailable.
    */
-  void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
-                   long begin, long len);
+  abstract void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
+                            long begin, long len);
 
   /**
    * Invoked if an operation fails with an exception.
-   *  @param volume  target volume for the operation. Null if unavailable.
+   * @param volume  target volume for the operation. Null if unavailable.
    * @param op  type of operation.
    * @param e  Exception encountered during the operation.
    * @param begin  time at which the operation was started.
    */
-  void onFailure(
+  abstract void onFailure(
       @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin);
 
   /**
+   * Invoked by FileIoProvider if an operation fails with an exception.
+   * @param datanode datanode that runs volume check upon volume io failure
+   * @param volume  target volume for the operation. Null if unavailable.
+   * @param op  type of operation.
+   * @param e  Exception encountered during the operation.
+   * @param begin  time at which the operation was started.
+   */
+  void onFailure(DataNode datanode,
+      @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
+    onFailure(volume, op, e, begin);
+    if (datanode != null && volume != null) {
+      datanode.checkDiskErrorAsync(volume);
+    }
+  }
+
+  /**
    * Return statistics as a JSON string.
    * @return
    */
-  @Nullable String getStatistics();
+  @Nullable abstract String getStatistics();
 }

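Because FileIoEvents changes from an interface to an abstract class here, pluggable hooks must now extend it; the concrete onFailure(DataNode, ...) added above then takes care of scheduling the per-volume check. A minimal, illustrative subclass follows (class name, import paths and the empty bodies are assumptions, not part of the patch):

    package org.apache.hadoop.hdfs.server.datanode;

    import javax.annotation.Nullable;
    import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    /** No-op hook: extends FileIoEvents instead of implementing an interface. */
    public class NoOpFileIoEvents extends FileIoEvents {
      @Override
      public long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op) {
        return 0;
      }
      @Override
      public void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op,
                                  long begin) {
      }
      @Override
      public long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
                               long len) {
        return 0;
      }
      @Override
      public void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
                              long begin, long len) {
      }
      @Override
      public void onFailure(@Nullable FsVolumeSpi volume, OPERATION op,
                            Exception e, long begin) {
      }
      @Override
      public @Nullable String getStatistics() {
        return null;
      }
    }

Such a class is selected through DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY, as the FileIoProvider constructor in the next file shows.
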
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
index 2344114..f961049 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
@@ -79,12 +79,16 @@ public class FileIoProvider {
       FileIoProvider.class);
 
   private final FileIoEvents eventHooks;
+  private final DataNode datanode;
 
   /**
    * @param conf  Configuration object. May be null. When null,
    *              the event handlers are no-ops.
+   * @param datanode datanode that owns this FileIoProvider. Used for
+   *               IO error based volume checker callback
    */
-  public FileIoProvider(@Nullable Configuration conf) {
+  public FileIoProvider(@Nullable Configuration conf,
+                        final DataNode datanode) {
     if (conf != null) {
       final Class<? extends FileIoEvents> clazz = conf.getClass(
           DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
@@ -94,6 +98,7 @@ public class FileIoProvider {
     } else {
       eventHooks = new DefaultFileIoEvents();
     }
+    this.datanode = datanode;
   }
 
   /**
@@ -139,7 +144,7 @@ public class FileIoProvider {
       f.flush();
       eventHooks.afterFileIo(volume, FLUSH, begin, 0);
     } catch (Exception e) {
-      eventHooks.onFailure(volume, FLUSH, e, begin);
+      eventHooks.onFailure(datanode, volume, FLUSH, e, begin);
       throw e;
     }
   }
@@ -157,7 +162,7 @@ public class FileIoProvider {
       fos.getChannel().force(true);
       eventHooks.afterFileIo(volume, SYNC, begin, 0);
     } catch (Exception e) {
-      eventHooks.onFailure(volume, SYNC, e, begin);
+      eventHooks.onFailure(datanode, volume, SYNC, e, begin);
       throw e;
     }
   }
@@ -176,7 +181,7 @@ public class FileIoProvider {
       NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, numBytes, flags);
       eventHooks.afterFileIo(volume, SYNC, begin, 0);
     } catch (Exception e) {
-      eventHooks.onFailure(volume, SYNC, e, begin);
+      eventHooks.onFailure(datanode, volume, SYNC, e, begin);
       throw e;
     }
   }
@@ -196,7 +201,7 @@ public class FileIoProvider {
           identifier, outFd, offset, length, flags);
       eventHooks.afterMetadataOp(volume, FADVISE, begin);
     } catch (Exception e) {
-      eventHooks.onFailure(volume, FADVISE, e, begin);
+      eventHooks.onFailure(datanode, volume, FADVISE, e, begin);
       throw e;
     }
   }
@@ -214,7 +219,7 @@ public class FileIoProvider {
       eventHooks.afterMetadataOp(volume, DELETE, begin);
       return deleted;
     } catch (Exception e) {
-      eventHooks.onFailure(volume, DELETE, e, begin);
+      eventHooks.onFailure(datanode, volume, DELETE, e, begin);
       throw e;
     }
   }
@@ -236,7 +241,7 @@ public class FileIoProvider {
       }
       return deleted;
     } catch (Exception e) {
-      eventHooks.onFailure(volume, DELETE, e, begin);
+      eventHooks.onFailure(datanode, volume, DELETE, e, begin);
       throw e;
     }
   }
@@ -264,7 +269,7 @@ public class FileIoProvider {
           waitTime, transferTime);
       eventHooks.afterFileIo(volume, TRANSFER, begin, count);
     } catch (Exception e) {
-      eventHooks.onFailure(volume, TRANSFER, e, begin);
+      eventHooks.onFailure(datanode, volume, TRANSFER, e, begin);
       throw e;
     }
   }
@@ -285,7 +290,7 @@ public class FileIoProvider {
       eventHooks.afterMetadataOp(volume, OPEN, begin);
       return created;
     } catch (Exception e) {
-      eventHooks.onFailure(volume, OPEN, e, begin);
+      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
       throw e;
     }
   }
@@ -312,7 +317,7 @@ public class FileIoProvider {
       return fis;
     } catch(Exception e) {
       org.apache.commons.io.IOUtils.closeQuietly(fis);
-      eventHooks.onFailure(volume, OPEN, e, begin);
+      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
       throw e;
     }
   }
@@ -328,7 +333,7 @@ public class FileIoProvider {
    * @param f  File object.
    * @param append  if true, then bytes will be written to the end of the
    *                file rather than the beginning.
-   * @param  FileOutputStream to the given file object.
+   * @return  FileOutputStream to the given file object.
    * @throws FileNotFoundException
    */
   public FileOutputStream getFileOutputStream(
@@ -342,7 +347,7 @@ public class FileIoProvider {
       return fos;
     } catch(Exception e) {
       org.apache.commons.io.IOUtils.closeQuietly(fos);
-      eventHooks.onFailure(volume, OPEN, e, begin);
+      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
       throw e;
     }
   }
@@ -372,7 +377,7 @@ public class FileIoProvider {
    * before delegating to the wrapped stream.
    *
    * @param volume  target volume. null if unavailable.
-   * @param f  File object.
+   * @param fd  File descriptor object.
    * @return  FileOutputStream to the given file object.
    * @throws  FileNotFoundException
    */
@@ -407,7 +412,7 @@ public class FileIoProvider {
       return fis;
     } catch(Exception e) {
       org.apache.commons.io.IOUtils.closeQuietly(fis);
-      eventHooks.onFailure(volume, OPEN, e, begin);
+      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
       throw e;
     }
   }
@@ -438,7 +443,7 @@ public class FileIoProvider {
       return fis;
     } catch(Exception e) {
       org.apache.commons.io.IOUtils.closeQuietly(fis);
-      eventHooks.onFailure(volume, OPEN, e, begin);
+      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
       throw e;
     }
   }
@@ -468,7 +473,7 @@ public class FileIoProvider {
       return raf;
     } catch(Exception e) {
       org.apache.commons.io.IOUtils.closeQuietly(raf);
-      eventHooks.onFailure(volume, OPEN, e, begin);
+      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
       throw e;
     }
   }
@@ -487,7 +492,7 @@ public class FileIoProvider {
       eventHooks.afterMetadataOp(volume, DELETE, begin);
       return deleted;
     } catch(Exception e) {
-      eventHooks.onFailure(volume, DELETE, e, begin);
+      eventHooks.onFailure(datanode, volume, DELETE, e, begin);
       throw e;
     }
   }
@@ -508,7 +513,7 @@ public class FileIoProvider {
       FileUtil.replaceFile(src, target);
       eventHooks.afterMetadataOp(volume, MOVE, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(volume, MOVE, e, begin);
+      eventHooks.onFailure(datanode, volume, MOVE, e, begin);
       throw e;
     }
   }
@@ -530,7 +535,7 @@ public class FileIoProvider {
       Storage.rename(src, target);
       eventHooks.afterMetadataOp(volume, MOVE, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(volume, MOVE, e, begin);
+      eventHooks.onFailure(datanode, volume, MOVE, e, begin);
       throw e;
     }
   }
@@ -552,7 +557,7 @@ public class FileIoProvider {
       FileUtils.moveFile(src, target);
       eventHooks.afterMetadataOp(volume, MOVE, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(volume, MOVE, e, begin);
+      eventHooks.onFailure(datanode, volume, MOVE, e, begin);
       throw e;
     }
   }
@@ -576,7 +581,7 @@ public class FileIoProvider {
       Files.move(src, target, options);
       eventHooks.afterMetadataOp(volume, MOVE, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(volume, MOVE, e, begin);
+      eventHooks.onFailure(datanode, volume, MOVE, e, begin);
       throw e;
     }
   }
@@ -600,7 +605,7 @@ public class FileIoProvider {
       Storage.nativeCopyFileUnbuffered(src, target, preserveFileDate);
       eventHooks.afterFileIo(volume, NATIVE_COPY, begin, length);
     } catch(Exception e) {
-      eventHooks.onFailure(volume, NATIVE_COPY, e, begin);
+      eventHooks.onFailure(datanode, volume, NATIVE_COPY, e, begin);
       throw e;
     }
   }
@@ -625,7 +630,7 @@ public class FileIoProvider {
       isDirectory = !created && dir.isDirectory();
       eventHooks.afterMetadataOp(volume, MKDIRS, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(volume, MKDIRS, e, begin);
+      eventHooks.onFailure(datanode, volume, MKDIRS, e, begin);
       throw e;
     }
 
@@ -651,7 +656,7 @@ public class FileIoProvider {
       succeeded = dir.isDirectory() || dir.mkdirs();
       eventHooks.afterMetadataOp(volume, MKDIRS, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(volume, MKDIRS, e, begin);
+      eventHooks.onFailure(datanode, volume, MKDIRS, e, begin);
       throw e;
     }
 
@@ -677,7 +682,7 @@ public class FileIoProvider {
       eventHooks.afterMetadataOp(volume, LIST, begin);
       return children;
     } catch(Exception e) {
-      eventHooks.onFailure(volume, LIST, e, begin);
+      eventHooks.onFailure(datanode, volume, LIST, e, begin);
       throw e;
     }
   }
@@ -687,7 +692,7 @@ public class FileIoProvider {
    * {@link FileUtil#listFiles(File)}.
    *
    * @param volume  target volume. null if unavailable.
-   * @param   Driectory to be listed.
+   * @param   dir directory to be listed.
    * @return  array of strings representing the directory entries.
    * @throws IOException
    */
@@ -699,7 +704,7 @@ public class FileIoProvider {
       eventHooks.afterMetadataOp(volume, LIST, begin);
       return children;
     } catch(Exception e) {
-      eventHooks.onFailure(volume, LIST, e, begin);
+      eventHooks.onFailure(datanode, volume, LIST, e, begin);
       throw e;
     }
   }
@@ -722,7 +727,7 @@ public class FileIoProvider {
       eventHooks.afterMetadataOp(volume, LIST, begin);
       return children;
     } catch(Exception e) {
-      eventHooks.onFailure(volume, LIST, e, begin);
+      eventHooks.onFailure(datanode, volume, LIST, e, begin);
       throw e;
     }
   }
@@ -744,7 +749,7 @@ public class FileIoProvider {
       eventHooks.afterMetadataOp(volume, LIST, begin);
       return count;
     } catch(Exception e) {
-      eventHooks.onFailure(volume, LIST, e, begin);
+      eventHooks.onFailure(datanode, volume, LIST, e, begin);
       throw e;
     }
   }
@@ -763,7 +768,7 @@ public class FileIoProvider {
       eventHooks.afterMetadataOp(volume, EXISTS, begin);
       return exists;
     } catch(Exception e) {
-      eventHooks.onFailure(volume, EXISTS, e, begin);
+      eventHooks.onFailure(datanode, volume, EXISTS, e, begin);
       throw e;
     }
   }
@@ -804,7 +809,7 @@ public class FileIoProvider {
         eventHooks.afterFileIo(volume, READ, begin, 1);
         return b;
       } catch(Exception e) {
-        eventHooks.onFailure(volume, READ, e, begin);
+        eventHooks.onFailure(datanode, volume, READ, e, begin);
         throw e;
       }
     }
@@ -820,7 +825,7 @@ public class FileIoProvider {
         eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
         return numBytesRead;
       } catch(Exception e) {
-        eventHooks.onFailure(volume, READ, e, begin);
+        eventHooks.onFailure(datanode, volume, READ, e, begin);
         throw e;
       }
     }
@@ -836,7 +841,7 @@ public class FileIoProvider {
         eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
         return numBytesRead;
       } catch(Exception e) {
-        eventHooks.onFailure(volume, READ, e, begin);
+        eventHooks.onFailure(datanode, volume, READ, e, begin);
         throw e;
       }
     }
@@ -878,7 +883,7 @@ public class FileIoProvider {
         super.write(b);
         eventHooks.afterFileIo(volume, WRITE, begin, 1);
       } catch(Exception e) {
-        eventHooks.onFailure(volume, WRITE, e, begin);
+        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
         throw e;
       }
     }
@@ -893,7 +898,7 @@ public class FileIoProvider {
         super.write(b);
         eventHooks.afterFileIo(volume, WRITE, begin, b.length);
       } catch(Exception e) {
-        eventHooks.onFailure(volume, WRITE, e, begin);
+        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
         throw e;
       }
     }
@@ -908,7 +913,7 @@ public class FileIoProvider {
         super.write(b, off, len);
         eventHooks.afterFileIo(volume, WRITE, begin, len);
       } catch(Exception e) {
-        eventHooks.onFailure(volume, WRITE, e, begin);
+        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
         throw e;
       }
     }
@@ -936,7 +941,7 @@ public class FileIoProvider {
         eventHooks.afterFileIo(volume, READ, begin, 1);
         return b;
       } catch(Exception e) {
-        eventHooks.onFailure(volume, READ, e, begin);
+        eventHooks.onFailure(datanode, volume, READ, e, begin);
         throw e;
       }
     }
@@ -949,7 +954,7 @@ public class FileIoProvider {
         eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
         return numBytesRead;
       } catch(Exception e) {
-        eventHooks.onFailure(volume, READ, e, begin);
+        eventHooks.onFailure(datanode, volume, READ, e, begin);
         throw e;
       }
     }
@@ -962,7 +967,7 @@ public class FileIoProvider {
         eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
         return numBytesRead;
       } catch(Exception e) {
-        eventHooks.onFailure(volume, READ, e, begin);
+        eventHooks.onFailure(datanode, volume, READ, e, begin);
         throw e;
       }
     }
@@ -974,7 +979,7 @@ public class FileIoProvider {
         super.write(b);
         eventHooks.afterFileIo(volume, WRITE, begin, 1);
       } catch(Exception e) {
-        eventHooks.onFailure(volume, WRITE, e, begin);
+        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
         throw e;
       }
     }
@@ -986,7 +991,7 @@ public class FileIoProvider {
         super.write(b);
         eventHooks.afterFileIo(volume, WRITE, begin, b.length);
       } catch(Exception e) {
-        eventHooks.onFailure(volume, WRITE, e, begin);
+        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
         throw e;
       }
     }
@@ -998,7 +1003,7 @@ public class FileIoProvider {
         super.write(b, off, len);
         eventHooks.afterFileIo(volume, WRITE, begin, len);
       } catch(Exception e) {
-        eventHooks.onFailure(volume, WRITE, e, begin);
+        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
         throw e;
       }
     }

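Every onFailure call in this file now carries the owning DataNode, which the new constructor records. A hedged construction sketch (variable names are illustrative):

    // Production path: the DataNode passes itself so that I/O failures can
    // trigger checkDiskErrorAsync(volume) on the affected volume.
    FileIoProvider ioProvider = new FileIoProvider(conf, datanode);

    // Test/default path (see ReplicaInfo and FsVolumeImplBuilder below): a null
    // DataNode means failures are still counted or profiled, but no volume
    // check is scheduled.
    FileIoProvider bareIoProvider = new FileIoProvider(null, null);
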
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java
index 5835fe8..affd093 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java
@@ -30,7 +30,7 @@ import javax.annotation.Nullable;
  * related operations on datanode volumes.
  */
 @InterfaceAudience.Private
-class ProfilingFileIoEvents implements FileIoEvents {
+class ProfilingFileIoEvents extends FileIoEvents {
 
   @Override
   public long beforeMetadataOp(@Nullable FsVolumeSpi volume,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index d3006c8..65e9ba7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -47,7 +47,7 @@ abstract public class ReplicaInfo extends Block
 
   /** This is used by some tests and FsDatasetUtil#computeChecksum. */
   private static final FileIoProvider DEFAULT_FILE_IO_PROVIDER =
-      new FileIoProvider(null);
+      new FileIoProvider(null, null);
 
   /**
   * Constructor

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
index 1d534a3..997c0cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.datanode.checker;
 
+import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -43,10 +44,10 @@ public interface AsyncChecker<K, V> {
    * @param context the interpretation of the context depends on the
    *                target.
    *
-   * @return returns a {@link ListenableFuture} that can be used to
+   * @return returns a {@link Optional of ListenableFuture} that can be used to
    *         retrieve the result of the asynchronous check.
    */
-  ListenableFuture<V> schedule(Checkable<K, V> target, K context);
+  Optional<ListenableFuture<V>> schedule(Checkable<K, V> target, K context);
 
   /**
    * Cancel all executing checks and wait for them to complete.

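With schedule() now returning Guava's Optional, callers must handle the case where no new check is started. A hedged caller sketch using the same pattern the DatasetVolumeChecker and StorageLocationChecker hunks below adopt (variable names are illustrative):

    Optional<ListenableFuture<VolumeCheckResult>> olf =
        delegateChecker.schedule(volume, context);
    if (olf.isPresent()) {
      // A new check was started; attach the result handler as before.
      Futures.addCallback(olf.get(), resultHandler);
    } else {
      // No check was scheduled (already running, or too soon since the last
      // one); release the volume reference instead of waiting for a callback.
      IOUtils.cleanup(null, volumeReference);
    }
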
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
index ba09d23..cab6122 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.datanode.checker;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
@@ -191,18 +192,26 @@ public class DatasetVolumeChecker {
 
     for (int i = 0; i < references.size(); ++i) {
       final FsVolumeReference reference = references.getReference(i);
-      allVolumes.add(reference.getVolume());
-      ListenableFuture<VolumeCheckResult> future =
+      Optional<ListenableFuture<VolumeCheckResult>> olf =
           delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
       LOG.info("Scheduled health check for volume {}", reference.getVolume());
-      Futures.addCallback(future, new ResultHandler(
-          reference, healthyVolumes, failedVolumes, numVolumes, new Callback() {
-        @Override
-        public void call(Set<FsVolumeSpi> ignored1,
-                         Set<FsVolumeSpi> ignored2) {
+      if (olf.isPresent()) {
+        allVolumes.add(reference.getVolume());
+        Futures.addCallback(olf.get(),
+            new ResultHandler(reference, healthyVolumes, failedVolumes,
+                numVolumes, new Callback() {
+              @Override
+              public void call(Set<FsVolumeSpi> ignored1,
+                               Set<FsVolumeSpi> ignored2) {
+                latch.countDown();
+              }
+            }));
+      } else {
+        IOUtils.cleanup(null, reference);
+        if (numVolumes.decrementAndGet() == 0) {
           latch.countDown();
         }
-      }));
+      }
     }
 
     // Wait until our timeout elapses, after which we give up on
@@ -263,18 +272,26 @@ public class DatasetVolumeChecker {
     final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
     final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
     final AtomicLong numVolumes = new AtomicLong(references.size());
+    boolean added = false;
 
     LOG.info("Checking {} volumes", references.size());
     for (int i = 0; i < references.size(); ++i) {
       final FsVolumeReference reference = references.getReference(i);
       // The context parameter is currently ignored.
-      ListenableFuture<VolumeCheckResult> future =
+      Optional<ListenableFuture<VolumeCheckResult>> olf =
           delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
-      Futures.addCallback(future, new ResultHandler(
-          reference, healthyVolumes, failedVolumes, numVolumes, callback));
+      if (olf.isPresent()) {
+        added = true;
+        Futures.addCallback(olf.get(),
+            new ResultHandler(reference, healthyVolumes, failedVolumes,
+                numVolumes, callback));
+      } else {
+        IOUtils.cleanup(null, reference);
+        numVolumes.decrementAndGet();
+      }
     }
     numAsyncDatasetChecks.incrementAndGet();
-    return true;
+    return added;
   }
 
   /**
@@ -291,7 +308,7 @@ public class DatasetVolumeChecker {
   }
 
   /**
-   * Check a single volume, returning a {@link ListenableFuture}
+   * Check a single volume asynchronously, returning a {@link ListenableFuture}
    * that can be used to retrieve the final result.
    *
    * If the volume cannot be referenced then it is already closed and
@@ -305,21 +322,31 @@ public class DatasetVolumeChecker {
   public boolean checkVolume(
       final FsVolumeSpi volume,
       Callback callback) {
+    if (volume == null) {
+      LOG.debug("Cannot schedule check on null volume");
+      return false;
+    }
+
     FsVolumeReference volumeReference;
     try {
       volumeReference = volume.obtainReference();
     } catch (ClosedChannelException e) {
       // The volume has already been closed.
-      callback.call(new HashSet<>(), new HashSet<>());
       return false;
     }
-    ListenableFuture<VolumeCheckResult> future =
+
+    Optional<ListenableFuture<VolumeCheckResult>> olf =
         delegateChecker.schedule(volume, IGNORED_CONTEXT);
-    numVolumeChecks.incrementAndGet();
-    Futures.addCallback(future, new ResultHandler(
-        volumeReference, new HashSet<>(), new HashSet<>(),
-        new AtomicLong(1), callback));
-    return true;
+    if (olf.isPresent()) {
+      numVolumeChecks.incrementAndGet();
+      Futures.addCallback(olf.get(),
+          new ResultHandler(volumeReference, new HashSet<>(), new HashSet<>(),
+          new AtomicLong(1), callback));
+      return true;
+    } else {
+      IOUtils.cleanup(null, volumeReference);
+    }
+    return false;
   }
 
   /**
@@ -343,8 +370,8 @@ public class DatasetVolumeChecker {
      *                       successful, add the volume here.
      * @param failedVolumes set of failed volumes. If the disk check fails,
      *                      add the volume here.
-     * @param semaphore semaphore used to trigger callback invocation.
-     * @param callback invoked when the semaphore can be successfully acquired.
+     * @param volumeCounter volumeCounter used to trigger callback invocation.
+     * @param callback invoked when the volumeCounter reaches 0.
      */
     ResultHandler(FsVolumeReference reference,
                   Set<FsVolumeSpi> healthyVolumes,

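checkVolume() now guards against a null volume and reports through its boolean return whether a check was actually scheduled, so callers should not assume the callback will fire. A hedged sketch (handleVolumeFailures stands in for whatever the caller does with failed volumes):

    boolean scheduled = volumeChecker.checkVolume(volume,
        (healthyVolumes, failedVolumes) -> {
          // Runs only if a check was scheduled; failedVolumes holds the
          // volume when its check fails.
          handleVolumeFailures(failedVolumes);
        });
    if (!scheduled) {
      LOG.debug("No disk check scheduled for {}", volume);
    }
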
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
index 7337ad0..6e323e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.checker;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -158,8 +159,11 @@ public class StorageLocationChecker {
     // Start parallel disk check operations on all StorageLocations.
     for (StorageLocation location : dataDirs) {
       goodLocations.put(location, true);
-      futures.put(location,
-          delegateChecker.schedule(location, context));
+      Optional<ListenableFuture<VolumeCheckResult>> olf =
+          delegateChecker.schedule(location, context);
+      if (olf.isPresent()) {
+        futures.put(location, olf.get());
+      }
     }
 
     if (maxVolumeFailuresTolerated >= dataDirs.size()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
index d0ee3d2..83c554d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.datanode.checker;
 
+import com.google.common.base.Optional;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -101,13 +102,11 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
    * will receive the same Future.
    */
   @Override
-  public synchronized ListenableFuture<V> schedule(
-      final Checkable<K, V> target,
-      final K context) {
-    LOG.debug("Scheduling a check of {}", target);
-
+  public Optional<ListenableFuture<V>> schedule(Checkable<K, V> target,
+                                                K context) {
+    LOG.info("Scheduling a check for {}", target);
     if (checksInProgress.containsKey(target)) {
-      return checksInProgress.get(target);
+      return Optional.absent();
     }
 
     if (completedChecks.containsKey(target)) {
@@ -115,11 +114,9 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
       final long msSinceLastCheck = timer.monotonicNow() - result.completedAt;
       if (msSinceLastCheck < minMsBetweenChecks) {
         LOG.debug("Skipped checking {}. Time since last check {}ms " +
-            "is less than the min gap {}ms.",
+                "is less than the min gap {}ms.",
             target, msSinceLastCheck, minMsBetweenChecks);
-        return result.result != null ?
-            Futures.immediateFuture(result.result) :
-            Futures.immediateFailedFuture(result.exception);
+        return Optional.absent();
       }
     }
 
@@ -132,7 +129,7 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
         });
     checksInProgress.put(target, lf);
     addResultCachingCallback(target, lf);
-    return lf;
+    return Optional.of(lf);
   }
 
   /**

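The throttling behaviour is now expressed through the Optional return value rather than by replaying a cached future. An illustrative timeline (the interval value is an assumption, not from the patch):

    // Assume minMsBetweenChecks is configured to 10 minutes.
    checker.schedule(volume, context);  // Optional.of(future): new check started
    checker.schedule(volume, context);  // Optional.absent(): a check is in progress
    // ... the first check completes ...
    checker.schedule(volume, context);  // Optional.absent(): within the min gap,
                                        // the cached result is not replayed
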
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 8273ebb..c8df300 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -271,7 +271,10 @@ class BlockPoolSlice {
           new FileOutputStream(outFile), "UTF-8")) {
         // mtime is written last, so that truncated writes won't be valid.
         out.write(Long.toString(used) + " " + Long.toString(timer.now()));
-        fileIoProvider.flush(volume, out);
+        // This is only called as part of the volume shutdown.
+        // We explicitly avoid calling flush with fileIoProvider which triggers
+        // volume check upon io exception to avoid cyclic volume checks.
+        out.flush();
       }
     } catch (IOException ioe) {
       // If write failed, the volume might be bad. Since the cache file is

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 0d5a12c..d1f8f05 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1828,7 +1828,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         return r;
       }
       // if file is not null, but doesn't exist - possibly disk failed
-      datanode.checkDiskErrorAsync();
+      datanode.checkDiskErrorAsync(r.getVolume());
     }
 
     if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 753c083..042ef6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -1321,8 +1321,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
           this, dir, BlockDirFilter.INSTANCE);
     } catch (IOException ioe) {
       LOG.warn("Exception occured while compiling report: ", ioe);
-      // Initiate a check on disk failure.
-      dataset.datanode.checkDiskErrorAsync();
+      // Volume error check moved to FileIoProvider.
       // Ignore this directory and proceed.
       return report;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
index 5371eda..427f81b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
@@ -69,7 +69,7 @@ public class FsVolumeImplBuilder {
   FsVolumeImpl build() throws IOException {
     return new FsVolumeImpl(
         dataset, storageID, sd,
-        fileIoProvider != null ? fileIoProvider : new FileIoProvider(null),
-        conf);
+        fileIoProvider != null ? fileIoProvider :
+            new FileIoProvider(null, null), conf);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 8472eca..cd3befd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -624,7 +624,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     }
 
     registerMBean(datanodeUuid);
-    this.fileIoProvider = new FileIoProvider(conf);
+    this.fileIoProvider = new FileIoProvider(conf, datanode);
     this.storage = new SimulatedStorage(
         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
         conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index e31e783..96d1a28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -807,7 +807,7 @@ public class TestDataNodeHotSwapVolumes {
     DataNodeTestUtils.injectDataDirFailure(dirToFail);
     // Call and wait DataNode to detect disk failure.
     long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
-    dn.checkDiskErrorAsync();
+    dn.checkDiskErrorAsync(failedVolume);
     while (dn.getLastDiskErrorCheck() == lastDiskErrorCheck) {
       Thread.sleep(100);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
index 3d37b10..3015e61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
 import static org.junit.Assert.assertArrayEquals;
@@ -319,6 +319,12 @@ public class TestDataNodeVolumeFailureReporting {
     DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
     DFSTestUtil.waitReplication(fs, file1, (short)3);
 
+    // Create additional file to trigger failure based volume check on dn1Vol2
+    // and dn2Vol2.
+    Path file2 = new Path("/test2");
+    DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
+    DFSTestUtil.waitReplication(fs, file2, (short)3);
+
     ArrayList<DataNode> dns = cluster.getDataNodes();
     assertTrue("DN1 should be up", dns.get(0).isDatanodeUp());
     assertTrue("DN2 should be up", dns.get(1).isDatanodeUp());
@@ -538,8 +544,6 @@ public class TestDataNodeVolumeFailureReporting {
   private void checkFailuresAtDataNode(DataNode dn,
       long expectedVolumeFailuresCounter, boolean expectCapacityKnown,
       String... expectedFailedVolumes) throws Exception {
-    assertCounter("VolumeFailures", expectedVolumeFailuresCounter,
-        getMetrics(dn.getMetrics().name()));
     FsDatasetSpi<?> fsd = dn.getFSDataset();
     StringBuilder strBuilder = new StringBuilder();
     strBuilder.append("expectedFailedVolumes is ");
@@ -551,6 +555,11 @@ public class TestDataNodeVolumeFailureReporting {
       strBuilder.append(expected + ",");
     }
     LOG.info(strBuilder.toString());
+    final long actualVolumeFailures =
+        getLongCounter("VolumeFailures", getMetrics(dn.getMetrics().name()));
+    assertTrue("Actual async detected volume failures should be greater or " +
+        "equal than " + expectedFailedVolumes,
+        actualVolumeFailures >= expectedVolumeFailuresCounter);
     assertEquals(expectedFailedVolumes.length, fsd.getNumFailedVolumes());
     assertArrayEquals(expectedFailedVolumes,
         convertToAbsolutePaths(fsd.getFailedStorageLocations()));
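
The updated helper reads the VolumeFailures counter through the MetricsAsserts utilities and asserts a lower bound instead of an exact count. A compact sketch of that read, assuming it sits inside a test method where dn and expectedVolumeFailuresCounter are already in scope:

    import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
    import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
    import static org.junit.Assert.assertTrue;

    // Read the DataNode's VolumeFailures counter and require at least the
    // expected number of failures rather than an exact match.
    long volumeFailures =
        getLongCounter("VolumeFailures", getMetrics(dn.getMetrics().name()));
    assertTrue(volumeFailures >= expectedVolumeFailuresCounter);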

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
index 50096ba..f5bb807 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.datanode.checker;
 
+import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -104,24 +105,28 @@ public class TestDatasetVolumeChecker {
     /**
      * Request a check and ensure it triggered {@link FsVolumeSpi#check}.
      */
-    checker.checkVolume(volume, new DatasetVolumeChecker.Callback() {
-      @Override
-      public void call(Set<FsVolumeSpi> healthyVolumes,
-                       Set<FsVolumeSpi> failedVolumes) {
-        numCallbackInvocations.incrementAndGet();
-        if (expectedVolumeHealth != null && expectedVolumeHealth != FAILED) {
-          assertThat(healthyVolumes.size(), is(1));
-          assertThat(failedVolumes.size(), is(0));
-        } else {
-          assertThat(healthyVolumes.size(), is(0));
-          assertThat(failedVolumes.size(), is(1));
-        }
-      }
-    });
+    boolean result =
+        checker.checkVolume(volume, new DatasetVolumeChecker.Callback() {
+          @Override
+          public void call(Set<FsVolumeSpi> healthyVolumes,
+                           Set<FsVolumeSpi> failedVolumes) {
+            numCallbackInvocations.incrementAndGet();
+            if (expectedVolumeHealth != null &&
+                expectedVolumeHealth != FAILED) {
+              assertThat(healthyVolumes.size(), is(1));
+              assertThat(failedVolumes.size(), is(0));
+            } else {
+              assertThat(healthyVolumes.size(), is(0));
+              assertThat(failedVolumes.size(), is(1));
+            }
+          }
+        });
 
     // Ensure that the check was invoked at least once.
     verify(volume, times(1)).check(anyObject());
-    assertThat(numCallbackInvocations.get(), is(1L));
+    if (result) {
+      assertThat(numCallbackInvocations.get(), is(1L));
+    }
   }
 
   /**
@@ -173,7 +178,7 @@ public class TestDatasetVolumeChecker {
     checker.setDelegateChecker(new DummyChecker());
     final AtomicLong numCallbackInvocations = new AtomicLong(0);
 
-    checker.checkAllVolumesAsync(
+    boolean result = checker.checkAllVolumesAsync(
         dataset, new DatasetVolumeChecker.Callback() {
           @Override
           public void call(
@@ -193,7 +198,9 @@ public class TestDatasetVolumeChecker {
         });
 
     // The callback should be invoked exactly once.
-    assertThat(numCallbackInvocations.get(), is(1L));
+    if (result) {
+      assertThat(numCallbackInvocations.get(), is(1L));
+    }
 
     // Ensure each volume's check() method was called exactly once.
     for (FsVolumeSpi volume : volumes) {
@@ -207,15 +214,17 @@ public class TestDatasetVolumeChecker {
    */
   static class DummyChecker
       implements AsyncChecker<VolumeCheckContext, VolumeCheckResult> {
+
     @Override
-    public ListenableFuture<VolumeCheckResult> schedule(
+    public Optional<ListenableFuture<VolumeCheckResult>> schedule(
         Checkable<VolumeCheckContext, VolumeCheckResult> target,
         VolumeCheckContext context) {
       try {
-        return Futures.immediateFuture(target.check(context));
+        return Optional.of(
+            Futures.immediateFuture(target.check(context)));
       } catch (Exception e) {
         LOG.info("check routine threw exception " + e);
-        return Futures.immediateFailedFuture(e);
+        return Optional.of(Futures.immediateFailedFuture(e));
       }
     }
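
The boolean result of checkVolume() used above indicates whether a check was actually scheduled, and the callback is only expected to run in that case. A minimal sketch of consuming that contract, assuming it lives in the org.apache.hadoop.hdfs.server.datanode.checker package; the wrapper class and method names are illustrative.

    package org.apache.hadoop.hdfs.server.datanode.checker;

    import java.util.Set;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    final class VolumeCheckSketch {
      // Request a check of one volume. The return value reports whether a new
      // check was scheduled; only then should the callback be expected to fire.
      static boolean requestVolumeCheck(DatasetVolumeChecker checker,
          FsVolumeSpi volume) throws Exception {
        return checker.checkVolume(volume, new DatasetVolumeChecker.Callback() {
          @Override
          public void call(Set<FsVolumeSpi> healthyVolumes,
                           Set<FsVolumeSpi> failedVolumes) {
            // Inspect the healthy and failed volume sets here.
          }
        });
      }
    }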
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
index 16c333b..0fe892d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
@@ -132,29 +132,6 @@ public class TestDatasetVolumeCheckerFailures {
     assertThat(checker.getNumSkippedChecks(), is(1L));
   }
 
-  @Test(timeout=60000)
-  public void testMinGapIsEnforcedForASyncChecks() throws Exception {
-    final List<FsVolumeSpi> volumes =
-        TestDatasetVolumeChecker.makeVolumes(1, VolumeCheckResult.HEALTHY);
-    final FsDatasetSpi<FsVolumeSpi> dataset =
-        TestDatasetVolumeChecker.makeDataset(volumes);
-    final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
-
-    checker.checkAllVolumesAsync(dataset, null);
-    assertThat(checker.getNumAsyncDatasetChecks(), is(1L));
-
-    // Re-check without advancing the timer. Ensure the check is skipped.
-    checker.checkAllVolumesAsync(dataset, null);
-    assertThat(checker.getNumAsyncDatasetChecks(), is(1L));
-    assertThat(checker.getNumSkippedChecks(), is(1L));
-
-    // Re-check after advancing the timer. Ensure the check is performed.
-    timer.advance(MIN_DISK_CHECK_GAP_MS);
-    checker.checkAllVolumesAsync(dataset, null);
-    assertThat(checker.getNumAsyncDatasetChecks(), is(2L));
-    assertThat(checker.getNumSkippedChecks(), is(1L));
-  }
-
   /**
    * Create a mock FsVolumeSpi whose {@link FsVolumeSpi#check} routine
    * hangs forever.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
index 70795ca..c171c0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
@@ -18,15 +18,14 @@
 
 package org.apache.hadoop.hdfs.server.datanode.checker;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.FakeTimer;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,10 +37,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.core.Is.isA;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -53,9 +49,6 @@ public class TestThrottledAsyncChecker {
       LoggerFactory.getLogger(TestThrottledAsyncChecker.class);
   private static final long MIN_ERROR_CHECK_GAP = 1000;
 
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
   /**
    * Test various scheduling combinations to ensure scheduling and
    * throttling behave as expected.
@@ -70,34 +63,34 @@ public class TestThrottledAsyncChecker {
                                     getExecutorService());
 
     // check target1 and ensure we get back the expected result.
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
 
     // Check target1 again without advancing the timer. target1 should not
-    // be checked again and the cached result should be returned.
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
+    // be checked again.
+    assertFalse(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
 
     // Schedule target2 scheduled without advancing the timer.
     // target2 should be checked as it has never been checked before.
-    assertTrue(checker.schedule(target2, true).get());
-    assertThat(target2.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target2, true).isPresent());
+    waitTestCheckableCheckCount(target2, 1L);
 
     // Advance the timer but just short of the min gap.
     // Neither target1 nor target2 should be checked again.
     timer.advance(MIN_ERROR_CHECK_GAP - 1);
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
-    assertTrue(checker.schedule(target2, true).get());
-    assertThat(target2.numChecks.get(), is(1L));
+    assertFalse(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
+    assertFalse(checker.schedule(target2, true).isPresent());
+    waitTestCheckableCheckCount(target2, 1L);
 
     // Advance the timer again.
     // Both targets should be checked now.
     timer.advance(MIN_ERROR_CHECK_GAP);
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(2L));
-    assertTrue(checker.schedule(target2, true).get());
-    assertThat(target1.numChecks.get(), is(2L));
+    assertTrue(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 2L);
+    assertTrue(checker.schedule(target2, true).isPresent());
+    waitTestCheckableCheckCount(target2, 2L);
   }
 
   @Test (timeout=60000)
@@ -109,13 +102,16 @@ public class TestThrottledAsyncChecker {
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
                                     getExecutorService());
 
-    ListenableFuture<Boolean> lf = checker.schedule(target, true);
-    Futures.addCallback(lf, callback);
+    Optional<ListenableFuture<Boolean>> olf =
+        checker.schedule(target, true);
+    if (olf.isPresent()) {
+      Futures.addCallback(olf.get(), callback);
+    }
 
     // Request immediate cancellation.
     checker.shutdownAndWait(0, TimeUnit.MILLISECONDS);
     try {
-      assertFalse(lf.get());
+      assertFalse(olf.get().get());
       fail("Failed to get expected InterruptedException");
     } catch (ExecutionException ee) {
       assertTrue(ee.getCause() instanceof InterruptedException);
@@ -130,27 +126,33 @@ public class TestThrottledAsyncChecker {
     ThrottledAsyncChecker<Boolean, Boolean> checker =
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
                                     getExecutorService());
-    final ListenableFuture<Boolean> lf1 = checker.schedule(target, true);
-    final ListenableFuture<Boolean> lf2 = checker.schedule(target, true);
+    final Optional<ListenableFuture<Boolean>> olf1 =
+        checker.schedule(target, true);
 
-    // Ensure that concurrent requests return the same future object.
-    assertTrue(lf1 == lf2);
+    final Optional<ListenableFuture<Boolean>> olf2 =
+        checker.schedule(target, true);
+
+    // Ensure that concurrent requests return the future object
+    // for the first caller.
+    assertTrue(olf1.isPresent());
+    assertFalse(olf2.isPresent());
 
     // Unblock the latch and wait for it to finish execution.
     target.latch.countDown();
-    lf1.get();
+    olf1.get().get();
 
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        // We should not get back the same future as before.
+        // We should get an absent Optional.
         // This can take a short while until the internal callback in
         // ThrottledAsyncChecker is scheduled for execution.
         // Also this should not trigger a new check operation as the timer
         // was not advanced. If it does trigger a new check then the test
         // will fail with a timeout.
-        final ListenableFuture<Boolean> lf3 = checker.schedule(target, true);
-        return lf3 != lf2;
+        final Optional<ListenableFuture<Boolean>> olf3 =
+            checker.schedule(target, true);
+        return !olf3.isPresent();
       }
     }, 100, 10000);
   }
@@ -168,15 +170,29 @@ public class TestThrottledAsyncChecker {
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
             getExecutorService());
 
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
     timer.advance(MIN_ERROR_CHECK_GAP + 1);
-    assertFalse(checker.schedule(target1, false).get());
-    assertThat(target1.numChecks.get(), is(2L));
+    assertTrue(checker.schedule(target1, false).isPresent());
+    waitTestCheckableCheckCount(target1, 2L);
+
   }
 
+  private void waitTestCheckableCheckCount(TestCheckableBase target,
+      long expectedChecks) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        // This can take a short while until the internal callback in
+        // ThrottledAsyncChecker is scheduled for execution.
+        // If it does trigger a new check then the test
+        // will fail with a timeout.
+        return target.getTotalChecks() == expectedChecks;
+      }
+    }, 100, 10000);
+  }
   /**
-   * Ensure that the exeption from a failed check is cached
+   * Ensure that the exception from a failed check is cached
    * and returned without re-running the check when the minimum
    * gap has not elapsed.
    *
@@ -190,13 +206,11 @@ public class TestThrottledAsyncChecker {
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
             getExecutorService());
 
-    thrown.expectCause(isA(DummyException.class));
-    checker.schedule(target1, true).get();
-    assertThat(target1.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
 
-    thrown.expectCause(isA(DummyException.class));
-    checker.schedule(target1, true).get();
-    assertThat(target1.numChecks.get(), is(2L));
+    assertFalse(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
   }
 
   /**
@@ -206,28 +220,38 @@ public class TestThrottledAsyncChecker {
     return new ScheduledThreadPoolExecutor(1);
   }
 
+  private abstract static class TestCheckableBase
+      implements Checkable<Boolean, Boolean> {
+    protected final AtomicLong numChecks = new AtomicLong(0);
+
+    public long getTotalChecks() {
+      return numChecks.get();
+    }
+
+    public void incrTotalChecks() {
+      numChecks.incrementAndGet();
+    }
+  }
+
   /**
    * A Checkable that just returns its input.
    */
   private static class NoOpCheckable
-      implements Checkable<Boolean, Boolean> {
-    private final AtomicLong numChecks = new AtomicLong(0);
+      extends TestCheckableBase {
     @Override
     public Boolean check(Boolean context) {
-      numChecks.incrementAndGet();
+      incrTotalChecks();
       return context;
     }
   }
 
   private static class ThrowingCheckable
-      implements Checkable<Boolean, Boolean> {
-    private final AtomicLong numChecks = new AtomicLong(0);
+      extends TestCheckableBase {
     @Override
     public Boolean check(Boolean context) throws DummyException {
-      numChecks.incrementAndGet();
+      incrTotalChecks();
       throw new DummyException();
     }
-
   }
 
   private static class DummyException extends Exception {
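
The Optional-returning schedule() contract exercised throughout the updated test can be consumed roughly as shown below. This is a minimal sketch, assuming it sits alongside the checker classes in org.apache.hadoop.hdfs.server.datanode.checker; the wrapper class and the callback bodies are illustrative.

    package org.apache.hadoop.hdfs.server.datanode.checker;

    import com.google.common.base.Optional;
    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;

    final class ScheduleSketch {
      // An absent Optional from schedule() means no new check was started for
      // this target, for example because the minimum gap since its last check
      // has not elapsed or a check is already in flight; there is then no
      // future to wait on.
      static void scheduleAndHandle(ThrottledAsyncChecker<Boolean, Boolean> checker,
          Checkable<Boolean, Boolean> target) throws Exception {
        Optional<ListenableFuture<Boolean>> olf = checker.schedule(target, true);
        if (olf.isPresent()) {
          Futures.addCallback(olf.get(), new FutureCallback<Boolean>() {
            @Override
            public void onSuccess(Boolean result) {
              // The check completed; result is the value returned by check().
            }
            @Override
            public void onFailure(Throwable t) {
              // The check threw; its exception is delivered here instead.
            }
          });
        }
      }
    }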

