You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/06/20 23:14:25 UTC

hadoop git commit: HDFS-8457. Ozone: Refactor FsDatasetSpi to pull up HDFS-agnostic functionality into parent interface. (Contributed by Arpit Agarwal)

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 a229f69ac -> 197b8fb0f


HDFS-8457. Ozone: Refactor FsDatasetSpi to pull up HDFS-agnostic functionality into parent interface. (Contributed by Arpit Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/197b8fb0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/197b8fb0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/197b8fb0

Branch: refs/heads/HDFS-7240
Commit: 197b8fb0fb2688269cf17b7db4628f00e02091a0
Parents: a229f69
Author: Arpit Agarwal <ar...@apache.org>
Authored: Sat Jun 20 14:11:09 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Sat Jun 20 14:11:09 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-7240.txt           |   3 +
 .../hdfs/server/datanode/BPOfferService.java    |   6 +-
 .../hdfs/server/datanode/BPServiceActor.java    |  19 +-
 .../hdfs/server/datanode/BlockSender.java       |   2 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   | 151 +++++++-----
 .../hdfs/server/datanode/DataXceiver.java       |  16 +-
 .../hdfs/server/datanode/DirectoryScanner.java  |  10 +-
 .../hdfs/server/datanode/VolumeScanner.java     |   3 +-
 .../server/datanode/fsdataset/DatasetSpi.java   | 232 +++++++++++++++++++
 .../server/datanode/fsdataset/FsDatasetSpi.java | 219 ++---------------
 .../server/datanode/fsdataset/FsVolumeSpi.java  |  29 +--
 .../server/datanode/fsdataset/VolumeSpi.java    |  72 ++++++
 .../fsdataset/impl/FsDatasetFactory.java        |  24 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |   5 +
 .../hdfs/TestWriteBlockGetsBlockLengthHint.java |   2 +-
 .../server/datanode/SimulatedFSDataset.java     |  12 +-
 .../server/datanode/TestBPOfferService.java     |   6 +-
 .../datanode/TestDataNodeInitStorage.java       |   2 +-
 .../server/datanode/TestSimulatedFSDataset.java |   4 +-
 .../extdataset/ExternalDatasetImpl.java         |   5 +
 20 files changed, 488 insertions(+), 334 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
index a259170..edd7637 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
@@ -10,3 +10,6 @@
     HDFS-8641. OzoneHandler : Add Quota Support. (Anu Engineer via
     Arpit Agarwal)
 
+    HDFS-8457. Ozone: Refactor FsDatasetSpi to pull up HDFS-agnostic
+    functionality into parent interface. (Arpit Agarwal)
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 092a8f8..0392a2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.DatasetSpi;
 import org.apache.hadoop.hdfs.server.protocol.*;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 
@@ -71,7 +71,7 @@ class BPOfferService {
   
   private final DataNode dn;
 
-  private FsDatasetSpi<?> dataset = null;
+  private DatasetSpi<?> dataset = null;
 
   /**
    * A reference to the BPServiceActor associated with the currently
@@ -306,7 +306,7 @@ class BPOfferService {
    * verifies that this namespace matches (eg to prevent a misconfiguration
    * where a StandbyNode from a different cluster is specified)
    */
-  FsDatasetSpi<?> verifyAndSetNamespaceInfo(NamespaceInfo nsInfo)
+  DatasetSpi<?> verifyAndSetNamespaceInfo(NamespaceInfo nsInfo)
       throws IOException {
     writeLock();
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index c906636..361ba76 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -45,7 +45,9 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.DatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -113,7 +115,7 @@ class BPServiceActor implements Runnable {
   private volatile boolean sendImmediateIBR = false;
   private volatile boolean shouldServiceRun = true;
   private final DataNode dn;
-  private FsDatasetSpi<?> dataset = null;
+  private DatasetSpi<? extends VolumeSpi> dataset = null;
   private final DNConf dnConf;
   private long prevBlockReportId;
 
@@ -225,7 +227,7 @@ class BPServiceActor implements Runnable {
     // This also initializes our block pool in the DN if we are
     // the first NN connection for this BP.
     dataset = bpos.verifyAndSetNamespaceInfo(nsInfo);
-    
+
     // Second phase of the handshake with the NN.
     register(nsInfo);
   }
@@ -508,8 +510,7 @@ class BPServiceActor implements Runnable {
   }
 
   DatanodeCommand cacheReport() throws IOException {
-    // If caching is disabled, do not send a cache report
-    if (dataset.getCacheCapacity() == 0) {
+    if (!dataset.isCachingSupported()) {
       return null;
     }
     // send cache report if timer has expired.
@@ -552,10 +553,14 @@ class BPServiceActor implements Runnable {
         dataset.getVolumeFailureSummary();
     int numFailedVolumes = volumeFailureSummary != null ?
         volumeFailureSummary.getFailedStorageLocations().length : 0;
+
+    FSDatasetMBean mbean = (dataset instanceof FSDatasetMBean) ?
+        ((FSDatasetMBean) dataset) : null;
+
     return bpNamenode.sendHeartbeat(bpRegistration,
         reports,
-        dataset.getCacheCapacity(),
-        dataset.getCacheUsed(),
+        mbean != null ? mbean.getCacheCapacity() : 0,
+        mbean != null ? mbean.getCacheUsed() : 0,
         dn.getXmitsInProgress(),
         dn.getXceiverCount(),
         numFailedVolumes,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 5f4ea10..2c8f5d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -456,7 +456,7 @@ class BlockSender implements java.io.Closeable {
   private static Replica getReplica(ExtendedBlock block, DataNode datanode)
       throws ReplicaNotFoundException {
     final FsDatasetSpi<?> dataset =
-        datanode.getFSDataset(block.getBlockPoolId());
+        (FsDatasetSpi<?>) datanode.getDataset(block.getBlockPoolId());
     Replica replica =
         dataset.getReplica(block.getBlockPoolId(), block.getBlockId());
     if (replica == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 7dba2af..35b97af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -152,8 +152,10 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.DatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -284,20 +286,18 @@ public class DataNode extends ReconfigurableBase
   private boolean shutdownInProgress = false;
   private BlockPoolManager blockPoolManager;
 
-  private final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> datasetFactory;
+  private final DatasetSpi.Factory datasetFactory;
 
   // This is an onto (many-one) mapping. Multiple block pool IDs may share
   // the same dataset.
   private volatile Map<String,
-      FsDatasetSpi<? extends FsVolumeSpi>> datasetsMap =
+      DatasetSpi<? extends VolumeSpi>> datasetsMap =
       new ConcurrentHashMap<>();
 
-  // Hash set of datasets, used to avoid having to deduplicate the values of datasetsMap
-  // every time we need to iterate over all datasets.
-  private volatile Set<FsDatasetSpi<? extends FsVolumeSpi>> datasets =
-      Collections.newSetFromMap(
-          new ConcurrentHashMap<FsDatasetSpi<? extends FsVolumeSpi>,
-                                Boolean>());
+  // Hash set of datasets, used to avoid having to deduplicate the
+  // values of datasetsMap every time we need to iterate over all datasets.
+  private volatile Map<NodeType, DatasetSpi<? extends VolumeSpi>> datasets =
+          new ConcurrentHashMap<>();
 
   private String clusterId = null;
 
@@ -340,7 +340,7 @@ public class DataNode extends ReconfigurableBase
   private boolean hasAnyBlockPoolRegistered = false;
   
   private final BlockScanner blockScanner;
-  private Map<FsDatasetSpi<?>, DirectoryScanner> directoryScannersMap =
+  private Map<DatasetSpi<?>, DirectoryScanner> directoryScannersMap =
       new ConcurrentHashMap<>();
   
   /** Activated plug-ins. */
@@ -618,7 +618,7 @@ public class DataNode extends ReconfigurableBase
             @Override
             public IOException call() {
               try {
-                for (FsDatasetSpi<?> dataset : datasets) {
+                for (DatasetSpi<?> dataset : datasets.values()) {
                   dataset.addVolume(location, nsInfos);
                 }
               } catch (IOException e) {
@@ -723,7 +723,7 @@ public class DataNode extends ReconfigurableBase
 
     IOException ioe = null;
     // Remove volumes and block infos from FsDataset.
-    for (final FsDatasetSpi<?> dataset : datasets) {
+    for (final DatasetSpi<?> dataset : datasets.values()) {
       dataset.removeVolumes(absoluteVolumePaths, clearFailure);
     }
 
@@ -896,7 +896,7 @@ public class DataNode extends ReconfigurableBase
    * See {@link DirectoryScanner}
    */
   private synchronized void initDirectoryScanners(Configuration conf) {
-    for (FsDatasetSpi<?> dataset : datasets) {
+    for (DatasetSpi<?> dataset : datasets.values()) {
       if (directoryScannersMap.get(dataset) != null) {
         continue;
       }
@@ -1029,7 +1029,7 @@ public class DataNode extends ReconfigurableBase
    */
   public void reportBadBlocks(ExtendedBlock block) throws IOException{
     BPOfferService bpos = getBPOSForBlock(block);
-    FsVolumeSpi volume = getFSDataset(block.getBlockPoolId()).getVolume(block);
+    VolumeSpi volume = getDataset(block.getBlockPoolId()).getVolume(block);
     bpos.reportBadBlocks(
         block, volume.getStorageID(), volume.getStorageType());
   }
@@ -1344,7 +1344,7 @@ public class DataNode extends ReconfigurableBase
 
       blockScanner.disableBlockPoolId(bpId);
 
-      FsDatasetSpi<?> dataset = getFSDataset(bpId);
+      DatasetSpi<?> dataset = getDataset(bpId);
       if (dataset != null) {
         dataset.shutdownBlockPool(bpId);
       }
@@ -1367,7 +1367,7 @@ public class DataNode extends ReconfigurableBase
    * @param bpos Block pool offer service
    * @throws IOException if the NN is inconsistent with the local storage.
    */
-  FsDatasetSpi<?> initBlockPool(BPOfferService bpos) throws IOException {
+  DatasetSpi<?> initBlockPool(BPOfferService bpos) throws IOException {
     NamespaceInfo nsInfo = bpos.getNamespaceInfo();
     if (nsInfo == null) {
       throw new IOException("NamespaceInfo not found: Block pool " + bpos
@@ -1381,14 +1381,15 @@ public class DataNode extends ReconfigurableBase
     
     // In the case that this is the first block pool to connect, initialize
     // the dataset, block scanners, etc.
-    FsDatasetSpi<?> dataset = initStorage(bpos.getBlockPoolId(), nsInfo);
+    DatasetSpi<?> dataset = initStorage(bpos.getBlockPoolId(), nsInfo);
 
     // Exclude failed disks before initializing the block pools to avoid startup
     // failures.
-    checkDiskError(getFSDataset(nsInfo.getBlockPoolID()));
+    checkDiskError(getDataset(nsInfo.getBlockPoolID()));
 
     initDirectoryScanners(conf);
-    dataset.addBlockPool(nsInfo.getBlockPoolID(), conf);
+    getDataset(nsInfo.getBlockPoolID()).addBlockPool(
+        nsInfo.getBlockPoolID(), conf);
     blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
     return dataset;
   }
@@ -1405,7 +1406,7 @@ public class DataNode extends ReconfigurableBase
    * Initializes the {@link #data}. The initialization is done only once, when
    * handshake with the the first namenode is completed.
    */
-  private FsDatasetSpi<?> initStorage(
+  private DatasetSpi<?> initStorage(
       final String blockPoolId, final NamespaceInfo nsInfo) throws IOException {
     if (!datasetFactory.isSimulated()) {
       final StartupOption startOpt = getStartupOption(conf);
@@ -1562,14 +1563,18 @@ public class DataNode extends ReconfigurableBase
     return maxNumberOfBlocksToLog;
   }
 
+  /**
+   * Only valid for blocks stored by FsDatasetSpi instances.
+   */
   @Override
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
       Token<BlockTokenIdentifier> token) throws IOException {
     checkBlockLocalPathAccess();
     checkBlockToken(block, token, BlockTokenIdentifier.AccessMode.READ);
-    FsDatasetSpi<?> dataset = getFSDataset(block.getBlockPoolId());
+    DatasetSpi<?> dataset = getDataset(block.getBlockPoolId());
     Preconditions.checkNotNull(dataset, "Storage not yet initialized");
-    BlockLocalPathInfo info = dataset.getBlockLocalPathInfo(block);
+    BlockLocalPathInfo info =
+        ((FsDatasetSpi<?>) dataset).getBlockLocalPathInfo(block);
     if (LOG.isDebugEnabled()) {
       if (info != null) {
         if (LOG.isTraceEnabled()) {
@@ -1604,6 +1609,9 @@ public class DataNode extends ReconfigurableBase
     }
   }
 
+  /**
+   * Only valid for blocks stored by FsDatasetSpi instances.
+   */
   FileInputStream[] requestShortCircuitFdsForRead(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> token, int maxVersion) 
           throws ShortCircuitFdsUnsupportedException,
@@ -1624,7 +1632,8 @@ public class DataNode extends ReconfigurableBase
     FileInputStream fis[] = new FileInputStream[2];
     
     try {
-      final FsDatasetSpi<?> dataset = getFSDataset(blk.getBlockPoolId());
+      final FsDatasetSpi<?> dataset =
+          (FsDatasetSpi<?>) getDataset(blk.getBlockPoolId());
       Preconditions.checkNotNull(dataset, "Storage not yet initialized");
       fis[0] = (FileInputStream) dataset.getBlockInputStream(blk, 0);
       fis[1] = DatanodeUtil.getMetaDataInputStream(blk, dataset);
@@ -1636,6 +1645,9 @@ public class DataNode extends ReconfigurableBase
     return fis;
   }
 
+  /**
+   * Only valid for blocks stored by FsDatasetSpi instances.
+   */
   @Override
   public HdfsBlocksMetadata getHdfsBlocksMetadata(
       String bpId, long[] blockIds,
@@ -1656,7 +1668,7 @@ public class DataNode extends ReconfigurableBase
 
     DataNodeFaultInjector.get().getHdfsBlocksMetadata();
 
-    final FsDatasetSpi<?> dataset = getFSDataset(bpId);
+    final FsDatasetSpi<?> dataset = (FsDatasetSpi<?>) getDataset(bpId);
     Preconditions.checkNotNull(dataset, "Storage not yet initialized");
     return dataset.getHdfsBlocksMetadata(bpId, blockIds);
   }
@@ -1813,7 +1825,7 @@ public class DataNode extends ReconfigurableBase
         LOG.warn("Exception when unlocking storage: " + ie, ie);
       }
     }
-    for (FsDatasetSpi<?> dataset : datasets) {
+    for (DatasetSpi<?> dataset : datasets.values()) {
       dataset.shutdown();
     }
     if (metrics != null) {
@@ -1851,7 +1863,7 @@ public class DataNode extends ReconfigurableBase
     }
   }
   
-  private void handleDiskError(final FsDatasetSpi<?> dataset,
+  private void handleDiskError(final DatasetSpi<?> dataset,
                                final String errMsgr) {
     final boolean hasEnoughResources = dataset.hasEnoughResource();
     LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources);
@@ -1912,12 +1924,15 @@ public class DataNode extends ReconfigurableBase
 
   private void reportBadBlock(final BPOfferService bpos,
       final ExtendedBlock block, final String msg) {
-    FsVolumeSpi volume = getFSDataset(block.getBlockPoolId()).getVolume(block);
+    VolumeSpi volume = getDataset(block.getBlockPoolId()).getVolume(block);
     bpos.reportBadBlocks(
         block, volume.getStorageID(), volume.getStorageType());
     LOG.warn(msg);
   }
 
+  /**
+   * Only valid for blocks stored by FsDatasetSpi instances.
+   */
   private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
       StorageType[] xferTargetStorageTypes) throws IOException {
     BPOfferService bpos = getBPOSForBlock(block);
@@ -1927,7 +1942,9 @@ public class DataNode extends ReconfigurableBase
     boolean replicaStateNotFinalized = false;
     boolean blockFileNotExist = false;
     boolean lengthTooShort = false;
-    final FsDatasetSpi<?> dataset = getFSDataset(block.getBlockPoolId());
+
+    final FsDatasetSpi<?> dataset =
+        (FsDatasetSpi<?>) getDataset(block.getBlockPoolId());
     Preconditions.checkNotNull(dataset, "Storage not yet initialized");
 
     try {
@@ -1964,10 +1981,8 @@ public class DataNode extends ReconfigurableBase
       // Shorter on-disk len indicates corruption so report NN
       // the corrupt block
       reportBadBlock(bpos, block, "Can't replicate block " + block
-          + " because on-disk length "
-          + getFSDataset(block.getBlockPoolId()).getLength(block)
-          + " is shorter than NameNode recorded length "
-          + block.getNumBytes());
+          + " because on-disk length " + dataset.getLength(block)
+          + " is shorter than NameNode recorded length " + block.getNumBytes());
       return;
     }
     
@@ -2174,7 +2189,7 @@ public class DataNode extends ReconfigurableBase
             DFSUtil.getSmallBufferSize(conf)));
         in = new DataInputStream(unbufIn);
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
-            false, false, true, DataNode.this, getFSDataset(b.getBlockPoolId()),
+            false, false, true, DataNode.this, data,
             null, cachingStrategy);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
@@ -2532,17 +2547,22 @@ public class DataNode extends ReconfigurableBase
    * @return
    * @throws IOException
    */
-  private FsDatasetSpi<?> allocateFsDataset(
+  private synchronized DatasetSpi<?> allocateFsDataset(
       final String bpid, final NodeType serviceType) throws IOException {
-    FsDatasetSpi<?> dataset =
-        datasetFactory.newInstance(this, storage, conf, serviceType);
-    datasets.add(dataset);
+
+    DatasetSpi<?> dataset = datasets.get(serviceType);
+    if (dataset != null) {
+      datasetsMap.put(bpid, dataset);
+      return dataset;
+    }
+
+    dataset = datasetFactory.newInstance(this, storage, conf, serviceType);
+    datasets.put(serviceType, dataset);
     datasetsMap.put(bpid, dataset);
 
     if (serviceType == NodeType.NAME_NODE) {
       // 'data' is retained for existing mock-based HDFS unit tests.
-      Preconditions.checkState(data == null || data == dataset);
-      data = dataset;
+      data = (FsDatasetSpi<? extends FsVolumeSpi>) dataset;
     }
 
     return dataset;
@@ -2555,22 +2575,17 @@ public class DataNode extends ReconfigurableBase
    * @return the fsdataset that stores the blocks
    */
   @VisibleForTesting
-  public FsDatasetSpi<?> getFSDataset(final String bpid) {
+  public DatasetSpi<?> getDataset(final String bpid) {
     return datasetsMap.get(bpid);
   }
 
-  @VisibleForTesting
-  public Set<FsDatasetSpi<?>> getFSDatasets() {
-    return datasets;
-  }
-
   /**
    * Do NOT use this method outside of tests.
    * Retained for compatibility with existing tests and subject to removal.
    *
    * @return the fsdataset that stores the blocks
    */
-  @VisibleForTesting
+  @Deprecated
   public FsDatasetSpi<?> getFSDataset() {
     Preconditions.checkState(datasets.size() <= 1,
         "Did not expect more than one Dataset here.");
@@ -2578,7 +2593,7 @@ public class DataNode extends ReconfigurableBase
     if (datasets.size() == 0) {
       return null;
     }
-    return (FsDatasetSpi<?>) datasets.iterator().next();
+    return (FsDatasetSpi<?>) datasets.values().iterator().next();
   }
 
   @VisibleForTesting
@@ -2593,7 +2608,7 @@ public class DataNode extends ReconfigurableBase
    *
    * @return
    */
-  @VisibleForTesting
+  @Deprecated
   DirectoryScanner getDirectoryScanner() {
     return directoryScannersMap.get(getFSDataset());
   }
@@ -2651,12 +2666,16 @@ public class DataNode extends ReconfigurableBase
     return d;
   }
 
-  // InterDataNodeProtocol implementation
-  @Override // InterDatanodeProtocol
+  /**
+   * InterDatanodeProtocol implementation.
+   *
+   * Only valid for blocks stored by FsDatasetSpi instances.
+   */
+  @Override
   public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
   throws IOException {
     final FsDatasetSpi<?> dataset =
-        getFSDataset(rBlock.getBlock().getBlockPoolId());
+        (FsDatasetSpi<?>) getDataset(rBlock.getBlock().getBlockPoolId());
     if (dataset != null) {
       return dataset.initReplicaRecovery(rBlock);
     }
@@ -2684,10 +2703,10 @@ public class DataNode extends ReconfigurableBase
   public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
       final long recoveryId, final long newBlockId, final long newLength)
       throws IOException {
-    final String storageID =
-        getFSDataset(oldBlock.getBlockPoolId())
-            .updateReplicaUnderRecovery(oldBlock, recoveryId,
-                                        newBlockId, newLength);
+    final FsDatasetSpi<?> dataset =
+        (FsDatasetSpi<?>) getDataset(oldBlock.getBlockPoolId());
+    final String storageID = dataset.updateReplicaUnderRecovery(
+        oldBlock, recoveryId, newBlockId, newLength);
     // Notify the namenode of the updated block info. This is important
     // for HA, since otherwise the standby node may lose track of the
     // block locations until the next block report.
@@ -2924,10 +2943,14 @@ public class DataNode extends ReconfigurableBase
         + ")");
   }
 
+  /**
+   * Only valid for blocks stored by FsDatasetSpi instances.
+   */
   @Override // ClientDataNodeProtocol
   public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException {
     checkReadAccess(block);
-    return getFSDataset(block.getBlockPoolId()).getReplicaVisibleLength(block);
+    return ((FsDatasetSpi<?>) getDataset(block.getBlockPoolId()))
+        .getReplicaVisibleLength(block);
   }
 
   private void checkReadAccess(final ExtendedBlock block) throws IOException {
@@ -2951,6 +2974,7 @@ public class DataNode extends ReconfigurableBase
 
   /**
    * Transfer a replica to the datanode targets.
+   * Only valid for blocks stored by FsDatasetSpi instances.
    * @param b the block to transfer.
    *          The corresponding replica must be an RBW or a Finalized.
    *          Its GS and numBytes will be set to
@@ -2964,7 +2988,8 @@ public class DataNode extends ReconfigurableBase
     final long storedGS;
     final long visible;
     final BlockConstructionStage stage;
-    final FsDatasetSpi<?> dataset = getFSDataset(b.getBlockPoolId());
+    final FsDatasetSpi<?> dataset =
+        (FsDatasetSpi<?>) getDataset(b.getBlockPoolId());
 
     //get replica information
     synchronized(dataset) {
@@ -3090,7 +3115,10 @@ public class DataNode extends ReconfigurableBase
     conf = new Configuration();
     refreshNamenodes(conf);
   }
-  
+
+  /**
+   * Only valid for blocks stored by FsDatasetSpi instances.
+   */
   @Override // ClientDatanodeProtocol
   public void deleteBlockPool(String blockPoolId, boolean force)
       throws IOException {
@@ -3104,8 +3132,9 @@ public class DataNode extends ReconfigurableBase
           "The block pool is still running. First do a refreshNamenodes to " +
           "shutdown the block pool service");
     }
-   
-    getFSDataset(blockPoolId).deleteBlockPool(blockPoolId, force);
+
+    ((FsDatasetSpi<?>) getDataset(blockPoolId))
+        .deleteBlockPool(blockPoolId, force);
   }
 
   @Override // ClientDatanodeProtocol
@@ -3260,7 +3289,7 @@ public class DataNode extends ReconfigurableBase
   /**
    * Check the disk error
    */
-  private void checkDiskError(final FsDatasetSpi<?> dataset) {
+  private void checkDiskError(final DatasetSpi<?> dataset) {
     Set<File> unhealthyDataDirs = dataset.checkDataDir();
     if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
       try {
@@ -3294,7 +3323,7 @@ public class DataNode extends ReconfigurableBase
               }
               if(tempFlag) {
                 try {
-                  for (final FsDatasetSpi<?> dataset : datasets) {
+                  for (final DatasetSpi<?> dataset : datasets.values()) {
                     checkDiskError(dataset);
                   }
                 } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index fbb8897..9702691 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -314,8 +314,8 @@ class DataXceiver extends Receiver implements Runnable {
         }
         if (slotId != null) {
           final String bpid = blk.getBlockPoolId();
-          boolean isCached = datanode.getFSDataset(bpid).
-              isCached(bpid, blk.getBlockId());
+          FsDatasetSpi<?> dataset = (FsDatasetSpi<?>) datanode.getDataset(bpid);
+          boolean isCached = dataset.isCached(bpid, blk.getBlockId());
           datanode.shortCircuitRegistry.registerSlot(
               ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
           registeredSlotId = slotId;
@@ -527,7 +527,7 @@ class DataXceiver extends Receiver implements Runnable {
         Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);
 
     final FsDatasetSpi<?> dataset =
-        datanode.getFSDataset(block.getBlockPoolId());
+        (FsDatasetSpi<?>) datanode.getDataset(block.getBlockPoolId());
     if (dataset == null) {
       throw new IOException(
           "Unknown or unitialized blockpool " + block.getBlockPoolId());
@@ -640,7 +640,7 @@ class DataXceiver extends Receiver implements Runnable {
         || stage == BlockConstructionStage.TRANSFER_FINALIZED;
     long size = 0;
     final FsDatasetSpi<?> dataset =
-        datanode.getFSDataset(block.getBlockPoolId());
+        (FsDatasetSpi<?>) datanode.getDataset(block.getBlockPoolId());
     if (dataset == null) {
       throw new IOException(
           "Unknown or unitialized blockpool " + block.getBlockPoolId());
@@ -907,7 +907,7 @@ class DataXceiver extends Receiver implements Runnable {
     final byte[] buffer = new byte[4*1024];
     MessageDigest digester = MD5Hash.getDigester();
     final FsDatasetSpi<?> dataset =
-        datanode.getFSDataset(block.getBlockPoolId());
+        (FsDatasetSpi<?>) datanode.getDataset(block.getBlockPoolId());
     if (dataset == null) {
       throw new IOException(
           "Unknown or unitialized blockpool " + block.getBlockPoolId());
@@ -951,7 +951,7 @@ class DataXceiver extends Receiver implements Runnable {
         Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
     // client side now can specify a range of the block for checksum
     final FsDatasetSpi<?> dataset =
-        datanode.getFSDataset(block.getBlockPoolId());
+        (FsDatasetSpi<?>) datanode.getDataset(block.getBlockPoolId());
     if (dataset == null) {
       throw new IOException(
           "Unknown or unitialized blockpool " + block.getBlockPoolId());
@@ -1015,7 +1015,7 @@ class DataXceiver extends Receiver implements Runnable {
   public void copyBlock(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
     final FsDatasetSpi<?> dataset =
-        datanode.getFSDataset(block.getBlockPoolId());
+        (FsDatasetSpi<?>) datanode.getDataset(block.getBlockPoolId());
     if (dataset == null) {
       throw new IOException(
           "Unknown or unitialized blockpool " + block.getBlockPoolId());
@@ -1109,7 +1109,7 @@ class DataXceiver extends Receiver implements Runnable {
       final String delHint,
       final DatanodeInfo proxySource) throws IOException {
     final FsDatasetSpi<?> dataset =
-        datanode.getFSDataset(block.getBlockPoolId());
+        (FsDatasetSpi<?>) datanode.getDataset(block.getBlockPoolId());
     if (dataset == null) {
       throw new IOException(
           "Unknown or unitialized blockpool " + block.getBlockPoolId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 3383d0e..cb0a6ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.DatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.util.Daemon;
@@ -309,9 +310,14 @@ public class DirectoryScanner implements Runnable {
     }
   }
 
-  DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) {
+  DirectoryScanner(DataNode datanode, DatasetSpi<?> dataset,
+                   Configuration conf) {
+    if (!(dataset instanceof FsDatasetSpi)) {
+      throw new IllegalArgumentException(
+          "DirectoryScanner not implemented for " + dataset.getClass());
+    }
     this.datanode = datanode;
-    this.dataset = dataset;
+    this.dataset = (FsDatasetSpi<?>) dataset;
     int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
         DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
     scanPeriodMsecs = interval * 1000L; //msec

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
index 2cc3516..e7c6150 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -34,6 +34,7 @@ import com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -434,7 +435,7 @@ public class VolumeScanner extends Thread {
     try {
       blockSender = new BlockSender(block, 0, -1,
           false, true, true, datanode,
-          datanode.getFSDataset(block.getBlockPoolId()),
+          (FsDatasetSpi<?>) datanode.getDataset(block.getBlockPoolId()),
           null, CachingStrategy.newDropBehind());
       throttler.setBandwidth(bytesPerSec);
       long bytesRead = blockSender.sendBlock(nullStream, null, throttler);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/DatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/DatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/DatasetSpi.java
new file mode 100644
index 0000000..5cd52db
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/DatasetSpi.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
+import org.apache.hadoop.hdfs.server.protocol.*;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This is a service provider interface for the underlying storage that
+ * stores replicas for a data node.
+ * The default implementation stores replicas on local drives.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface DatasetSpi<V extends VolumeSpi> {
+  /**
+   * A factory for creating {@link FsDatasetSpi} objects.
+   */
+  abstract class Factory {
+    /**
+     * @return the configured factory.
+     */
+    public static Factory getFactory(Configuration conf) {
+      @SuppressWarnings("rawtypes")
+      final Class<? extends Factory> clazz = conf.getClass(
+          DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+          FsDatasetFactory.class,
+          Factory.class);
+      return ReflectionUtils.newInstance(clazz, conf);
+    }
+
+    /**
+     * Create a new dataset object for a specific service type
+     */
+    public abstract DatasetSpi<? extends VolumeSpi> newInstance(
+        DataNode datanode, DataStorage storage, Configuration conf,
+        HdfsServerConstants.NodeType serviceType) throws IOException;
+
+    /** Does the factory create simulated objects? */
+    public boolean isSimulated() {
+      return false;
+    }
+  }
+
+  /**
+   * @return the volume that contains a replica of the block.
+   */
+  V getVolume(ExtendedBlock b);
+
+  /**
+   * Does the dataset contain the block?
+   */
+  boolean contains(ExtendedBlock block);
+
+
+  /**
+   * Add a new volume to the FsDataset.<p/>
+   *
+   * If the FSDataset supports block scanning, this function registers
+   * the new volume with the block scanner.
+   *
+   * @param location      The storage location for the new volume.
+   * @param nsInfos       Namespace information for the new volume.
+   */
+  void addVolume(
+      final StorageLocation location,
+      final List<NamespaceInfo> nsInfos) throws IOException;
+
+  /**
+   * Removes a collection of volumes from FsDataset.
+   *
+   * If the FSDataset supports block scanning, this function removes
+   * the volumes from the block scanner.
+   *
+   * @param volumes  The paths of the volumes to be removed.
+   * @param clearFailure set true to clear the failure information about the
+   *                     volumes.
+   */
+  void removeVolumes(Set<File> volumes, boolean clearFailure);
+
+  /** @return a storage with the given storage ID */
+  DatanodeStorage getStorage(final String storageUuid);
+
+  /** @return one or more storage reports for attached volumes. */
+  StorageReport[] getStorageReports(String bpid)
+      throws IOException;
+
+  /**
+   * Returns one block report per volume.
+   * @param bpid Block Pool Id
+   * @return - a map of DatanodeStorage to block report for the volume.
+   */
+  Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid);
+
+  /**
+   * Invalidates the specified blocks
+   * @param bpid Block pool Id
+   * @param invalidBlks - the blocks to be invalidated
+   * @throws IOException
+   */
+  void invalidate(String bpid, Block[] invalidBlks) throws IOException;
+
+  /**
+   * Returns info about volume failures.
+   *
+   * @return info about volume failures, possibly null
+   */
+  VolumeFailureSummary getVolumeFailureSummary();
+
+  /**
+   * Check if all the data directories are healthy
+   * @return A set of unhealthy data directories.
+   */
+  Set<File> checkDataDir();
+
+  /**
+   * Shutdown the FSDataset
+   */
+  void shutdown();
+
+  /**
+   * add new block pool ID
+   * @param bpid Block pool Id
+   * @param conf Configuration
+   */
+  void addBlockPool(String bpid, Configuration conf) throws IOException;
+
+  /**
+   * Shutdown and remove the block pool from underlying storage.
+   * @param bpid Block pool Id to be removed
+   */
+  void shutdownBlockPool(String bpid);
+
+  /**
+   * Checks how many valid storage volumes there are in the DataNode.
+   * @return true if more than the minimum number of valid volumes are left
+   * in the FSDataSet.
+   */
+  boolean hasEnoughResource();
+
+  /**
+   * Does the dataset support caching blocks?
+   *
+   * @return
+   */
+  boolean isCachingSupported();
+
+  /**
+   * Caches the specified blocks
+   * @param bpid Block pool id
+   * @param blockIds - block ids to cache
+   */
+  void cache(String bpid, long[] blockIds);
+
+  /**
+   * Uncaches the specified blocks
+   * @param bpid Block pool id
+   * @param blockIds - blocks ids to uncache
+   */
+  void uncache(String bpid, long[] blockIds);
+
+
+  /**
+   * Returns the cache report - the full list of cached block IDs of a
+   * block pool.
+   * @param   bpid Block Pool Id
+   * @return  the cache report - the full list of cached block IDs.
+   */
+  List<Long> getCacheReport(String bpid);
+
+  /**
+   * Enable 'trash' for the given dataset. When trash is enabled, files are
+   * moved to a separate trash directory instead of being deleted immediately.
+   * This can be useful for example during rolling upgrades.
+   */
+  void enableTrash(String bpid);
+
+  /**
+   * Restore trash
+   */
+  void clearTrash(String bpid);
+
+  /**
+   * @return true when trash is enabled
+   */
+  boolean trashEnabled(String bpid);
+
+  /**
+   * Create a marker file indicating that a rolling upgrade is in progress.
+   */
+  void setRollingUpgradeMarker(String bpid) throws IOException;
+
+  /**
+   * Delete the rolling upgrade marker file if it exists.
+   * @param bpid
+   */
+  void clearRollingUpgradeMarker(String bpid) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index a4672b7..7052f54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -30,72 +30,33 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.hdfs.server.protocol.StorageReport;
-import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
-import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * This is a service provider interface for the underlying storage that
  * stores replicas for a data node.
- * The default implementation stores replicas on local drives. 
+ * The default implementation stores replicas on local drives.
  */
 @InterfaceAudience.Private
-@InterfaceStability.Unstable
-public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
-  /**
-   * A factory for creating {@link FsDatasetSpi} objects.
-   */
-  public static abstract class Factory<D extends FsDatasetSpi<?>> {
-    /** @return the configured factory. */
-    public static Factory<?> getFactory(Configuration conf) {
-      @SuppressWarnings("rawtypes")
-      final Class<? extends Factory> clazz = conf.getClass(
-          DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
-          FsDatasetFactory.class,
-          Factory.class);
-      return ReflectionUtils.newInstance(clazz, conf);
-    }
-
-    /** Create a new dataset object for a specific service type. */
-    public abstract D newInstance(DataNode datanode,
-        DataStorage storage, Configuration conf,
-        NodeType serviceType) throws IOException;
-
-    /** Does the factory create simulated objects? */
-    public boolean isSimulated() {
-      return false;
-    }
-  }
+public interface FsDatasetSpi<V extends FsVolumeSpi>
+    extends FSDatasetMBean, DatasetSpi<FsVolumeSpi> {
 
   /**
    * It behaviors as an unmodifiable list of FsVolume. Individual FsVolume can
@@ -188,51 +149,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   public FsVolumeReferences getFsVolumeReferences();
 
-  /**
-   * Add a new volume to the FsDataset.<p/>
-   *
-   * If the FSDataset supports block scanning, this function registers
-   * the new volume with the block scanner.
-   *
-   * @param location      The storage location for the new volume.
-   * @param nsInfos       Namespace information for the new volume.
-   */
-  public void addVolume(
-      final StorageLocation location,
-      final List<NamespaceInfo> nsInfos) throws IOException;
-
-  /**
-   * Removes a collection of volumes from FsDataset.
-   *
-   * If the FSDataset supports block scanning, this function removes
-   * the volumes from the block scanner.
-   *
-   * @param volumes  The paths of the volumes to be removed.
-   * @param clearFailure set true to clear the failure information about the
-   *                     volumes.
-   */
-  public void removeVolumes(Set<File> volumes, boolean clearFailure);
-
-  /** @return a storage with the given storage ID */
-  public DatanodeStorage getStorage(final String storageUuid);
-
-  /** @return one or more storage reports for attached volumes. */
-  public StorageReport[] getStorageReports(String bpid)
-      throws IOException;
-
-  /** @return the volume that contains a replica of the block. */
-  public V getVolume(ExtendedBlock b);
-
   /** @return a volume information map (name => info). */
   public Map<String, Object> getVolumeInfoMap();
 
-  /**
-   * Returns info about volume failures.
-   *
-   * @return info about volume failures, possibly null
-   */
-  VolumeFailureSummary getVolumeFailureSummary();
-
   /** @return a list of finalized blocks for the given block pool. */
   public List<FinalizedReplica> getFinalizedBlocks(String bpid);
 
@@ -400,24 +319,6 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   public void unfinalizeBlock(ExtendedBlock b) throws IOException;
 
   /**
-   * Returns one block report per volume.
-   * @param bpid Block Pool Id
-   * @return - a map of DatanodeStorage to block report for the volume.
-   */
-  public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid);
-
-  /**
-   * Returns the cache report - the full list of cached block IDs of a
-   * block pool.
-   * @param   bpid Block Pool Id
-   * @return  the cache report - the full list of cached block IDs.
-   */
-  public List<Long> getCacheReport(String bpid);
-
-  /** Does the dataset contain the block? */
-  public boolean contains(ExtendedBlock block);
-
-  /**
    * Check if a block is valid.
    *
    * @param b           The block to check.
@@ -453,28 +354,6 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   public boolean isValidRbw(ExtendedBlock b);
 
   /**
-   * Invalidates the specified blocks
-   * @param bpid Block pool Id
-   * @param invalidBlks - the blocks to be invalidated
-   * @throws IOException
-   */
-  public void invalidate(String bpid, Block invalidBlks[]) throws IOException;
-
-  /**
-   * Caches the specified blocks
-   * @param bpid Block pool id
-   * @param blockIds - block ids to cache
-   */
-  public void cache(String bpid, long[] blockIds);
-
-  /**
-   * Uncaches the specified blocks
-   * @param bpid Block pool id
-   * @param blockIds - blocks ids to uncache
-   */
-  public void uncache(String bpid, long[] blockIds);
-
-  /**
    * Determine if the specified block is cached.
    * @param bpid Block pool id
    * @param blockIds - block id
@@ -482,17 +361,6 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   public boolean isCached(String bpid, long blockId);
 
-    /**
-     * Check if all the data directories are healthy
-     * @return A set of unhealthy data directories.
-     */
-  public Set<File> checkDataDir();
-
-  /**
-   * Shutdown the FSDataset
-   */
-  public void shutdown();
-
   /**
    * Sets the file pointer of the checksum stream so that the last checksum
    * will be overwritten
@@ -505,46 +373,11 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
       ReplicaOutputStreams outs, int checksumSize) throws IOException;
 
   /**
-   * Checks how many valid storage volumes there are in the DataNode.
-   * @return true if more than the minimum number of valid volumes are left 
-   * in the FSDataSet.
-   */
-  public boolean hasEnoughResource();
-
-  /**
    * Get visible length of the specified replica.
    */
   long getReplicaVisibleLength(final ExtendedBlock block) throws IOException;
 
   /**
-   * Initialize a replica recovery.
-   * @return actual state of the replica on this data-node or 
-   * null if data-node does not have the replica.
-   */
-  public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock
-      ) throws IOException;
-
-  /**
-   * Update replica's generation stamp and length and finalize it.
-   * @return the ID of storage that stores the block
-   */
-  public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
-      long recoveryId, long newBlockId, long newLength) throws IOException;
-
-  /**
-   * add new block pool ID
-   * @param bpid Block pool Id
-   * @param conf Configuration
-   */
-  public void addBlockPool(String bpid, Configuration conf) throws IOException;
-  
-  /**
-   * Shutdown and remove the block pool from underlying storage.
-   * @param bpid Block pool Id to be removed
-   */
-  public void shutdownBlockPool(String bpid) ;
-  
-  /**
    * Deletes the block pool directories. If force is false, directories are 
    * deleted only if no block files exist for the block pool. If force 
    * is true entire directory for the blockpool is deleted along with its
@@ -576,34 +409,6 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
       long[] blockIds) throws IOException;
 
   /**
-   * Enable 'trash' for the given dataset. When trash is enabled, files are
-   * moved to a separate trash directory instead of being deleted immediately.
-   * This can be useful for example during rolling upgrades.
-   */
-  public void enableTrash(String bpid);
-
-  /**
-   * Clear trash
-   */
-  public void clearTrash(String bpid);
-
-  /**
-   * @return true when trash is enabled
-   */
-  public boolean trashEnabled(String bpid);
-
-  /**
-   * Create a marker file indicating that a rolling upgrade is in progress.
-   */
-  public void setRollingUpgradeMarker(String bpid) throws IOException;
-
-  /**
-   * Delete the rolling upgrade marker file if it exists.
-   * @param bpid
-   */
-  public void clearRollingUpgradeMarker(String bpid) throws IOException;
-
-  /**
    * submit a sync_file_range request to AsyncDiskService
    */
   public void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block,
@@ -644,4 +449,20 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * Confirm whether the block is deleting
    */
   public boolean isDeletingBlock(String bpid, long blockId);
+
+  /**
+   * Initialize a replica recovery.
+   * @return actual state of the replica on this data-node or
+   * null if data-node does not have the replica.
+   */
+  ReplicaRecoveryInfo initReplicaRecovery(
+      BlockRecoveryCommand.RecoveringBlock rBlock) throws IOException;
+
+  /**
+   * Update replica's generation stamp and length and finalize it.
+   * @return the ID of storage that stores the block
+   */
+  String updateReplicaUnderRecovery(
+      ExtendedBlock oldBlock, long recoveryId,
+      long newBlockId, long newLength) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index 7c7b9a7..c083a95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -24,15 +24,14 @@ import java.nio.channels.ClosedChannelException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 
 /**
- * This is an interface for the underlying volume.
+ * This is an interface for the underlying volume used by DFS.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public interface FsVolumeSpi {
+public interface FsVolumeSpi extends VolumeSpi {
   /**
    * Obtain a reference object that had increased 1 reference count of the
    * volume.
@@ -42,29 +41,9 @@ public interface FsVolumeSpi {
    */
   FsVolumeReference obtainReference() throws ClosedChannelException;
 
-  /** @return the StorageUuid of the volume */
-  public String getStorageID();
-
-  /** @return a list of block pools. */
-  public String[] getBlockPoolList();
-
-  /** @return the available storage space in bytes. */
-  public long getAvailable() throws IOException;
-
-  /** @return the base path to the volume */
-  public String getBasePath();
-
-  /** @return the path to the volume */
-  public String getPath(String bpid) throws IOException;
-
   /** @return the directory for the finalized blocks in the block pool. */
   public File getFinalizedDir(String bpid) throws IOException;
   
-  public StorageType getStorageType();
-
-  /** Returns true if the volume is NOT backed by persistent storage. */
-  public boolean isTransientStorage();
-
   /**
    * Reserve disk space for an RBW block so a writer does not run out of
    * space before the block is full.
@@ -187,8 +166,6 @@ public interface FsVolumeSpi {
   public BlockIterator loadBlockIterator(String bpid, String name)
       throws IOException;
 
-  /**
-   * Get the FSDatasetSpi which this volume is a part of.
-   */
+  @Override
   public FsDatasetSpi getDataset();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeSpi.java
new file mode 100644
index 0000000..e7fd741
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeSpi.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
+
+/**
+ * This is an interface for the underlying volume.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface VolumeSpi {
+  /**
+   * @return the available storage space in bytes.
+   */
+  long getAvailable() throws IOException;
+
+  /**
+   * @return the base path to the volume
+   */
+  String getBasePath();
+
+  /**
+   * @return the StorageUuid of the volume
+   */
+  String getStorageID();
+
+  /**
+   * Returns true if the volume is NOT backed by persistent storage.
+   */
+  boolean isTransientStorage();
+
+  /**
+   * @return a list of block pools.
+   */
+  String[] getBlockPoolList();
+
+  /**
+   * @return the path to the volume
+   */
+  String getPath(String bpid) throws IOException;
+
+  /**
+   * Return the StorageType i.e. media type of this volume.
+   * @return
+   */
+  StorageType getStorageType();
+
+  /**
+   * Get the DatasetSpi which this volume is a part of.
+   */
+  DatasetSpi getDataset();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java
index 01c3830..4efc0b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java
@@ -18,38 +18,30 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.DatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeSpi;
 
 /**
  * A factory for creating {@link FsDatasetImpl} objects.
  */
-public class FsDatasetFactory extends FsDatasetSpi.Factory<FsDatasetImpl> {
+public class FsDatasetFactory extends DatasetSpi.Factory {
 
-  private final Map<NodeType, FsDatasetImpl> datasetMap = new HashMap<>();
 
   @Override
-  public synchronized FsDatasetImpl newInstance(DataNode datanode,
-      DataStorage storage, Configuration conf,
+  public synchronized DatasetSpi<? extends VolumeSpi> newInstance(
+      DataNode datanode, DataStorage storage, Configuration conf,
       NodeType serviceType) throws IOException {
-    FsDatasetImpl dataset = datasetMap.get(serviceType);
-    if (dataset != null) {
-      return dataset;
-    }
     switch (serviceType) {
     case NAME_NODE:
-      dataset = new FsDatasetImpl(datanode, storage, conf);
-      break;
+      return new FsDatasetImpl(datanode, storage, conf);
     default:
-      throw new IllegalArgumentException("Unsupported node type " + serviceType);
+      throw new IllegalArgumentException(
+          "Unsupported node type " + serviceType);
     }
-    datasetMap.put(serviceType, dataset);
-    return dataset;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 999b827..20d1c06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2000,6 +2000,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         blockFileName, length, genstamp, volumeExecutor);
   }
 
+  @Override
+  public boolean isCachingSupported() {
+    return true;
+  }
+
   @Override // FsDatasetSpi
   public void cache(String bpid, long[] blockIds) {
     for (int i=0; i < blockIds.length; i++) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
index d925b93..12e6741 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
@@ -71,7 +71,7 @@ public class TestWriteBlockGetsBlockLengthHint {
   }
 
   static class FsDatasetChecker extends SimulatedFSDataset {
-    static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
+    static class Factory extends FsDatasetSpi.Factory {
       @Override
       public SimulatedFSDataset newInstance(DataNode datanode,
           DataStorage storage, Configuration conf,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index a48582b..9404d74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -81,7 +82,7 @@ import org.apache.hadoop.util.DataChecksum;
  */
 public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public final static int BYTE_MASK = 0xff;
-  static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
+  static class Factory extends DatasetSpi.Factory {
     @Override
     public SimulatedFSDataset newInstance(DataNode datanode,
         DataStorage storage, Configuration conf,
@@ -788,13 +789,18 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     }
   }
 
-  @Override // FSDatasetSpi
+  @Override // DatasetSpi
+  public boolean isCachingSupported() {
+    return false;
+  }
+
+  @Override // DatasetSpi
   public void cache(String bpid, long[] cacheBlks) {
     throw new UnsupportedOperationException(
         "SimulatedFSDataset does not support cache operation!");
   }
 
-  @Override // FSDatasetSpi
+  @Override // DatasetSpi
   public void uncache(String bpid, long[] uncacheBlks) {
     throw new UnsupportedOperationException(
         "SimulatedFSDataset does not support uncache operation!");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 6ba044d..aa148a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -121,7 +121,7 @@ public class TestBPOfferService {
 
     // Wire the dataset to the DN.
     Mockito.doReturn(mockFSDataset).when(mockDn).initBlockPool(Mockito.any(BPOfferService.class));
-    Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(anyString());
+    Mockito.doReturn(mockFSDataset).when(mockDn).getDataset(anyString());
   }
 
   /**
@@ -330,7 +330,7 @@ public class TestBPOfferService {
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
       when(mockDn).getMetrics();
     final AtomicInteger count = new AtomicInteger();
-    Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(anyString());
+    Mockito.doReturn(mockFSDataset).when(mockDn).getDataset(anyString());
     Mockito.doAnswer(new Answer<FsDatasetSpi<?>>() {
       @Override
       public FsDatasetSpi<?> answer(InvocationOnMock invocation) throws Throwable {
@@ -338,7 +338,7 @@ public class TestBPOfferService {
           throw new IOException("faked initBlockPool exception");
         }
         // The initBlockPool is called again. Now mock init is done.
-        Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(anyString());
+        Mockito.doReturn(mockFSDataset).when(mockDn).getDataset(anyString());
         return mockFSDataset;
       }
     }).when(mockDn).initBlockPool(Mockito.any(BPOfferService.class));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java
index ad4135b..017223c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java
@@ -42,7 +42,7 @@ public class TestDataNodeInitStorage {
   public static final Log LOG = LogFactory.getLog(TestDataNodeInitStorage.class);
 
   static private class SimulatedFsDatasetVerifier extends SimulatedFSDataset {
-    static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
+    static class Factory extends FsDatasetSpi.Factory {
       @Override
       public SimulatedFsDatasetVerifier newInstance(
           DataNode datanode, DataStorage storage,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index 8dc80d5..461d5ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -114,12 +114,12 @@ public class TestSimulatedFSDataset {
   @Test
   public void testFSDatasetFactory() {
     final Configuration conf = new Configuration();
-    FsDatasetSpi.Factory<?> f = FsDatasetSpi.Factory.getFactory(conf);
+    FsDatasetSpi.Factory f = FsDatasetSpi.Factory.getFactory(conf);
     assertEquals(FsDatasetFactory.class, f.getClass());
     assertFalse(f.isSimulated());
 
     SimulatedFSDataset.setFactory(conf);
-    FsDatasetSpi.Factory<?> s = FsDatasetSpi.Factory.getFactory(conf);
+    FsDatasetSpi.Factory s = FsDatasetSpi.Factory.getFactory(conf);
     assertEquals(SimulatedFSDataset.Factory.class, s.getClass());
     assertTrue(s.isSimulated());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 28666a0..0c2c610 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -229,6 +229,11 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
+  public boolean isCachingSupported() {
+    return false;
+  }
+
+  @Override
   public void cache(String bpid, long[] blockIds) {
   }