Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/06/01 23:58:03 UTC

hadoop git commit: HDFS-8392. DataNode support for multiple datasets. (Arpit Agarwal)

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 770ed9262 -> e721a39e2


HDFS-8392. DataNode support for multiple datasets. (Arpit Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e721a39e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e721a39e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e721a39e

Branch: refs/heads/HDFS-7240
Commit: e721a39e275ae4ee34af99b2c2450e1793b695ac
Parents: 770ed92
Author: Arpit Agarwal <ar...@apache.org>
Authored: Mon Jun 1 14:57:07 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Mon Jun 1 14:57:07 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-7240.txt           |   6 +
 .../hadoop/hdfs/server/common/StorageInfo.java  |   9 +
 .../hdfs/server/datanode/BPOfferService.java    |  23 +-
 .../hdfs/server/datanode/BPServiceActor.java    |  24 +-
 .../hdfs/server/datanode/BlockReceiver.java     |  35 +--
 .../hdfs/server/datanode/BlockSender.java       |  20 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   | 269 +++++++++++++------
 .../hdfs/server/datanode/DataXceiver.java       |  70 ++++-
 .../hdfs/server/datanode/VolumeScanner.java     |   5 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java |  10 +-
 .../server/datanode/fsdataset/FsVolumeSpi.java  |   4 +
 .../fsdataset/impl/FsDatasetFactory.java        |  25 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |   9 +-
 .../impl/RamDiskAsyncLazyPersistService.java    |   6 +-
 .../hdfs/TestWriteBlockGetsBlockLengthHint.java |   4 +-
 .../server/datanode/SimulatedFSDataset.java     |   4 +-
 .../server/datanode/TestBPOfferService.java     |  19 +-
 .../datanode/TestDataNodeInitStorage.java       |   3 +-
 18 files changed, 384 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
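
The core of this change replaces the DataNode's single global dataset field
("data") with a many-to-one mapping from block pool IDs to datasets, plus a
de-duplicated set of all datasets for operations that must touch every block
pool. The standalone sketch below illustrates the shape of that bookkeeping;
the Dataset and DatasetRegistry names are simplified stand-ins for
illustration only and are not part of the patch (the real types are
FsDatasetSpi and DataNode, shown in the diff that follows).

  import java.io.IOException;
  import java.util.Collections;
  import java.util.Map;
  import java.util.Set;
  import java.util.concurrent.ConcurrentHashMap;

  // Simplified stand-in for FsDatasetSpi, used only for this sketch.
  interface Dataset {
    void shutdown();
  }

  // Sketch of the block-pool-id -> dataset bookkeeping added to DataNode.
  class DatasetRegistry {
    // Many-to-one: several block pool IDs may share one dataset instance.
    private final Map<String, Dataset> datasetsMap = new ConcurrentHashMap<>();
    // De-duplicated view, so iterating all datasets never visits one twice.
    private final Set<Dataset> datasets =
        Collections.newSetFromMap(new ConcurrentHashMap<Dataset, Boolean>());

    // Mirrors allocateFsDataset(): record the dataset under its block pool ID.
    Dataset register(String bpid, Dataset dataset) {
      datasets.add(dataset);
      datasetsMap.put(bpid, dataset);
      return dataset;
    }

    // Mirrors getFSDataset(String bpid): per-block-pool lookup that callers
    // such as DataXceiver use instead of the old global field.
    Dataset get(String bpid) throws IOException {
      Dataset dataset = datasetsMap.get(bpid);
      if (dataset == null) {
        throw new IOException("Unknown or uninitialized blockpool " + bpid);
      }
      return dataset;
    }

    // Operations spanning every block pool (addVolume, shutdown, disk checks)
    // iterate the de-duplicated set rather than the map values.
    void shutdownAll() {
      for (Dataset dataset : datasets) {
        dataset.shutdown();
      }
    }
  }

In the patch itself, DataNode keeps both structures (datasetsMap and datasets)
so that per-block-pool callers get a direct lookup while whole-node operations
avoid de-duplicating the map values on every pass.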


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
new file mode 100644
index 0000000..a98d407
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
@@ -0,0 +1,6 @@
+  Breakdown of HDFS-7240 sub-tasks:
+
+    HDFS-8210. Ozone: Implement storage container manager. (Jitendra Pandey)
+
+    HDFS-8392. Ozone: DataNode support for multiple datasets. (Arpit Agarwal)
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
index 50363c9..5a19dae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
@@ -220,6 +220,15 @@ public class StorageInfo {
     this.layoutVersion = lv;
   }
 
+  /**
+   * Return the type of node serviced by this storage.
+   *
+   * @return type of node serviced by this storage.
+   */
+  public NodeType getNodeType() {
+    return storageType;
+  }
+
   public int getServiceLayoutVersion() {
     return storageType == NodeType.DATA_NODE ? HdfsServerConstants.DATANODE_LAYOUT_VERSION
         : HdfsServerConstants.NAMENODE_LAYOUT_VERSION;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 92323f1..092a8f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.protocol.*;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 
@@ -70,6 +71,8 @@ class BPOfferService {
   
   private final DataNode dn;
 
+  private FsDatasetSpi<?> dataset = null;
+
   /**
    * A reference to the BPServiceActor associated with the currently
    * ACTIVE NN. In the case that all NameNodes are in STANDBY mode,
@@ -303,7 +306,8 @@ class BPOfferService {
    * verifies that this namespace matches (eg to prevent a misconfiguration
    * where a StandbyNode from a different cluster is specified)
    */
-  void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
+  FsDatasetSpi<?> verifyAndSetNamespaceInfo(NamespaceInfo nsInfo)
+      throws IOException {
     writeLock();
     try {
       if (this.bpNSInfo == null) {
@@ -314,7 +318,7 @@ class BPOfferService {
         // The DN can now initialize its local storage if we are the
         // first BP to handshake, etc.
         try {
-          dn.initBlockPool(this);
+          dataset = dn.initBlockPool(this);
           success = true;
         } finally {
           if (!success) {
@@ -335,6 +339,7 @@ class BPOfferService {
     } finally {
       writeUnlock();
     }
+    return dataset;
   }
 
   /**
@@ -480,11 +485,11 @@ class BPOfferService {
     }
     String bpid = getBlockPoolId();
     if (!rollingUpgradeStatus.isFinalized()) {
-      dn.getFSDataset().enableTrash(bpid);
-      dn.getFSDataset().setRollingUpgradeMarker(bpid);
+      dataset.enableTrash(bpid);
+      dataset.setRollingUpgradeMarker(bpid);
     } else {
-      dn.getFSDataset().clearTrash(bpid);
-      dn.getFSDataset().clearRollingUpgradeMarker(bpid);
+      dataset.clearTrash(bpid);
+      dataset.clearRollingUpgradeMarker(bpid);
     }
   }
 
@@ -665,7 +670,7 @@ class BPOfferService {
       Block toDelete[] = bcmd.getBlocks();
       try {
         // using global fsdataset
-        dn.getFSDataset().invalidate(bcmd.getBlockPoolId(), toDelete);
+        dataset.invalidate(bcmd.getBlockPoolId(), toDelete);
       } catch(IOException e) {
         // Exceptions caught here are not expected to be disk-related.
         throw e;
@@ -676,13 +681,13 @@ class BPOfferService {
       LOG.info("DatanodeCommand action: DNA_CACHE for " +
         blockIdCmd.getBlockPoolId() + " of [" +
           blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
-      dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
+      dataset.cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
       break;
     case DatanodeProtocol.DNA_UNCACHE:
       LOG.info("DatanodeCommand action: DNA_UNCACHE for " +
         blockIdCmd.getBlockPoolId() + " of [" +
           blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
-      dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
+      dataset.uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
       break;
     case DatanodeProtocol.DNA_SHUTDOWN:
       // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 63a0bb6..1fe1c9e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -110,6 +111,7 @@ class BPServiceActor implements Runnable {
   private volatile boolean sendImmediateIBR = false;
   private volatile boolean shouldServiceRun = true;
   private final DataNode dn;
+  private FsDatasetSpi<?> dataset = null;
   private final DNConf dnConf;
   private long prevBlockReportId;
 
@@ -220,7 +222,7 @@ class BPServiceActor implements Runnable {
     // Verify that this matches the other NN in this HA pair.
     // This also initializes our block pool in the DN if we are
     // the first NN connection for this BP.
-    bpos.verifyAndSetNamespaceInfo(nsInfo);
+    dataset = bpos.verifyAndSetNamespaceInfo(nsInfo);
     
     // Second phase of the handshake with the NN.
     register(nsInfo);
@@ -330,7 +332,7 @@ class BPServiceActor implements Runnable {
       String storageUuid, boolean now) {
     synchronized (pendingIncrementalBRperStorage) {
       addPendingReplicationBlockInfo(
-          bInfo, dn.getFSDataset().getStorage(storageUuid));
+          bInfo, dataset.getStorage(storageUuid));
       sendImmediateIBR = true;
       // If now is true, the report is sent right away.
       // Otherwise, it will be sent out in the next heartbeat.
@@ -344,7 +346,7 @@ class BPServiceActor implements Runnable {
       ReceivedDeletedBlockInfo bInfo, String storageUuid) {
     synchronized (pendingIncrementalBRperStorage) {
       addPendingReplicationBlockInfo(
-          bInfo, dn.getFSDataset().getStorage(storageUuid));
+          bInfo, dataset.getStorage(storageUuid));
     }
   }
 
@@ -435,7 +437,7 @@ class BPServiceActor implements Runnable {
 
     long brCreateStartTime = monotonicNow();
     Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
-        dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
+        dataset.getBlockReports(bpos.getBlockPoolId());
 
     // Convert the reports to the format expected by the NN.
     int i = 0;
@@ -508,7 +510,7 @@ class BPServiceActor implements Runnable {
 
   DatanodeCommand cacheReport() throws IOException {
     // If caching is disabled, do not send a cache report
-    if (dn.getFSDataset().getCacheCapacity() == 0) {
+    if (dataset.getCacheCapacity() == 0) {
       return null;
     }
     // send cache report if timer has expired.
@@ -521,7 +523,7 @@ class BPServiceActor implements Runnable {
       lastCacheReport = startTime;
 
       String bpid = bpos.getBlockPoolId();
-      List<Long> blockIds = dn.getFSDataset().getCacheReport(bpid);
+      List<Long> blockIds = dataset.getCacheReport(bpid);
       long createTime = monotonicNow();
 
       cmd = bpNamenode.cacheReport(bpRegistration, bpid, blockIds);
@@ -540,20 +542,20 @@ class BPServiceActor implements Runnable {
   
   HeartbeatResponse sendHeartBeat() throws IOException {
     StorageReport[] reports =
-        dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
+        dataset.getStorageReports(bpos.getBlockPoolId());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Sending heartbeat with " + reports.length +
                 " storage reports from service actor: " + this);
     }
     
-    VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
-        .getVolumeFailureSummary();
+    VolumeFailureSummary volumeFailureSummary =
+        dataset.getVolumeFailureSummary();
     int numFailedVolumes = volumeFailureSummary != null ?
         volumeFailureSummary.getFailedStorageLocations().length : 0;
     return bpNamenode.sendHeartbeat(bpRegistration,
         reports,
-        dn.getFSDataset().getCacheCapacity(),
-        dn.getFSDataset().getCacheUsed(),
+        dataset.getCacheCapacity(),
+        dataset.getCacheUsed(),
         dn.getXmitsInProgress(),
         dn.getXceiverCount(),
         numFailedVolumes,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 2e11600..686541b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -99,6 +100,7 @@ class BlockReceiver implements Closeable {
   private ReplicaOutputStreams streams;
   private DatanodeInfo srcDataNode = null;
   private final DataNode datanode;
+  private final FsDatasetSpi<?> dataset;
   volatile private boolean mirrorError;
 
   // Cache management state
@@ -141,8 +143,8 @@ class BlockReceiver implements Closeable {
       final BlockConstructionStage stage, 
       final long newGs, final long minBytesRcvd, final long maxBytesRcvd, 
       final String clientname, final DatanodeInfo srcDataNode,
-      final DataNode datanode, DataChecksum requestedChecksum,
-      CachingStrategy cachingStrategy,
+      final DataNode datanode, final FsDatasetSpi<?> dataset,
+      DataChecksum requestedChecksum, CachingStrategy cachingStrategy,
       final boolean allowLazyPersist,
       final boolean pinning) throws IOException {
     try{
@@ -152,6 +154,7 @@ class BlockReceiver implements Closeable {
       this.myAddr = myAddr;
       this.srcDataNode = srcDataNode;
       this.datanode = datanode;
+      this.dataset = dataset;
 
       this.clientname = clientname;
       this.isDatanode = clientname.length() == 0;
@@ -183,27 +186,27 @@ class BlockReceiver implements Closeable {
       // Open local disk out
       //
       if (isDatanode) { //replication or move
-        replicaHandler = datanode.data.createTemporary(storageType, block);
+        replicaHandler = dataset.createTemporary(storageType, block);
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
-          replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist);
+          replicaHandler = dataset.createRbw(storageType, block, allowLazyPersist);
           datanode.notifyNamenodeReceivingBlock(
               block, replicaHandler.getReplica().getStorageUuid());
           break;
         case PIPELINE_SETUP_STREAMING_RECOVERY:
-          replicaHandler = datanode.data.recoverRbw(
+          replicaHandler = dataset.recoverRbw(
               block, newGs, minBytesRcvd, maxBytesRcvd);
           block.setGenerationStamp(newGs);
           break;
         case PIPELINE_SETUP_APPEND:
-          replicaHandler = datanode.data.append(block, newGs, minBytesRcvd);
+          replicaHandler = dataset.append(block, newGs, minBytesRcvd);
           block.setGenerationStamp(newGs);
           datanode.notifyNamenodeReceivingBlock(
               block, replicaHandler.getReplica().getStorageUuid());
           break;
         case PIPELINE_SETUP_APPEND_RECOVERY:
-          replicaHandler = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
+          replicaHandler = dataset.recoverAppend(block, newGs, minBytesRcvd);
           block.setGenerationStamp(newGs);
           datanode.notifyNamenodeReceivingBlock(
               block, replicaHandler.getReplica().getStorageUuid());
@@ -212,7 +215,7 @@ class BlockReceiver implements Closeable {
         case TRANSFER_FINALIZED:
           // this is a transfer destination
           replicaHandler =
-              datanode.data.createTemporary(storageType, block);
+              dataset.createTemporary(storageType, block);
           break;
         default: throw new IOException("Unsupported stage " + stage + 
               " while receiving block " + block + " from " + inAddr);
@@ -717,7 +720,7 @@ class BlockReceiver implements Closeable {
         //
         if (syncBehindWrites) {
           if (syncBehindWritesInBackground) {
-            this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest(
+            dataset.submitBackgroundSyncFileRangeRequest(
                 block, outFd, lastCacheManagementOffset,
                 offsetInBlock - lastCacheManagementOffset,
                 NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
@@ -807,11 +810,11 @@ class BlockReceiver implements Closeable {
 
           if (stage == BlockConstructionStage.TRANSFER_RBW) {
             // for TRANSFER_RBW, convert temporary to RBW
-            datanode.data.convertTemporaryToRbw(block);
+            dataset.convertTemporaryToRbw(block);
           } else {
             // for isDatnode or TRANSFER_FINALIZED
             // Finalize the block.
-            datanode.data.finalizeBlock(block);
+            dataset.finalizeBlock(block);
           }
         }
         datanode.metrics.incrBlocksWritten();
@@ -904,7 +907,7 @@ class BlockReceiver implements Closeable {
    */
   private void cleanupBlock() throws IOException {
     if (isDatanode) {
-      datanode.data.unfinalizeBlock(block);
+      dataset.unfinalizeBlock(block);
     }
   }
 
@@ -921,7 +924,7 @@ class BlockReceiver implements Closeable {
     }
 
     // rollback the position of the meta file
-    datanode.data.adjustCrcChannelPosition(block, streams, checksumSize);
+    dataset.adjustCrcChannelPosition(block, streams, checksumSize);
   }
 
   /**
@@ -959,7 +962,7 @@ class BlockReceiver implements Closeable {
     byte[] buf = new byte[sizePartialChunk];
     byte[] crcbuf = new byte[checksumSize];
     try (ReplicaInputStreams instr =
-        datanode.data.getTmpInputStreams(block, blkoff, ckoff)) {
+        dataset.getTmpInputStreams(block, blkoff, ckoff)) {
       IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk);
 
       // open meta file and read in crc value computer earlier
@@ -1298,11 +1301,11 @@ class BlockReceiver implements Closeable {
         BlockReceiver.this.close();
         endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
         block.setNumBytes(replicaInfo.getNumBytes());
-        datanode.data.finalizeBlock(block);
+        dataset.finalizeBlock(block);
       }
 
       if (pinning) {
-        datanode.data.setPinning(block);
+        dataset.setPinning(block);
       }
       
       datanode.closeBlock(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 79f4dd7..5f4ea10 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -147,6 +148,7 @@ class BlockSender implements java.io.Closeable {
   private final String clientTraceFmt;
   private volatile ChunkChecksum lastChunkChecksum = null;
   private DataNode datanode;
+  private final FsDatasetSpi<?> dataset;
   
   /** The file descriptor of the block being sent */
   private FileDescriptor blockInFd;
@@ -190,7 +192,8 @@ class BlockSender implements java.io.Closeable {
    */
   BlockSender(ExtendedBlock block, long startOffset, long length,
               boolean corruptChecksumOk, boolean verifyChecksum,
-              boolean sendChecksum, DataNode datanode, String clientTraceFmt,
+              boolean sendChecksum, DataNode datanode,
+              final FsDatasetSpi<?> dataset, String clientTraceFmt,
               CachingStrategy cachingStrategy)
       throws IOException {
     try {
@@ -227,6 +230,7 @@ class BlockSender implements java.io.Closeable {
         this.readaheadLength = cachingStrategy.getReadahead().longValue();
       }
       this.datanode = datanode;
+      this.dataset = dataset;
       
       if (verifyChecksum) {
         // To simplify implementation, callers may not specify verification
@@ -237,7 +241,7 @@ class BlockSender implements java.io.Closeable {
       
       final Replica replica;
       final long replicaVisibleLength;
-      synchronized(datanode.data) { 
+      synchronized(dataset) {
         replica = getReplica(block, datanode);
         replicaVisibleLength = replica.getVisibleLength();
       }
@@ -274,7 +278,7 @@ class BlockSender implements java.io.Closeable {
         (!is32Bit || length <= Integer.MAX_VALUE);
 
       // Obtain a reference before reading data
-      this.volumeRef = datanode.data.getVolume(block).obtainReference();
+      this.volumeRef = dataset.getVolume(block).obtainReference();
 
       /* 
        * (corruptChecksumOK, meta_file_exist): operation
@@ -288,7 +292,7 @@ class BlockSender implements java.io.Closeable {
         LengthInputStream metaIn = null;
         boolean keepMetaInOpen = false;
         try {
-          metaIn = datanode.data.getMetaDataInputStream(block);
+          metaIn = dataset.getMetaDataInputStream(block);
           if (!corruptChecksumOk || metaIn != null) {
             if (metaIn == null) {
               //need checksum but meta-data not found
@@ -387,7 +391,7 @@ class BlockSender implements java.io.Closeable {
       if (DataNode.LOG.isDebugEnabled()) {
         DataNode.LOG.debug("replica=" + replica);
       }
-      blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+      blockIn = dataset.getBlockInputStream(block, offset); // seek to offset
       if (blockIn instanceof FileInputStream) {
         blockInFd = ((FileInputStream)blockIn).getFD();
       } else {
@@ -451,8 +455,10 @@ class BlockSender implements java.io.Closeable {
   
   private static Replica getReplica(ExtendedBlock block, DataNode datanode)
       throws ReplicaNotFoundException {
-    Replica replica = datanode.data.getReplica(block.getBlockPoolId(),
-        block.getBlockId());
+    final FsDatasetSpi<?> dataset =
+        datanode.getFSDataset(block.getBlockPoolId());
+    Replica replica =
+        dataset.getReplica(block.getBlockPoolId(), block.getBlockId());
     if (replica == null) {
       throw new ReplicaNotFoundException(block);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index d2b2939..70109a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -79,6 +79,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -285,9 +286,33 @@ public class DataNode extends ReconfigurableBase
   volatile boolean shutdownForUpgrade = false;
   private boolean shutdownInProgress = false;
   private BlockPoolManager blockPoolManager;
-  volatile FsDatasetSpi<? extends FsVolumeSpi> data = null;
+
+  private final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> datasetFactory;
+
+  // This is an onto (many-one) mapping. Multiple block pool IDs may share
+  // the same dataset.
+  private volatile Map<String,
+      FsDatasetSpi<? extends FsVolumeSpi>> datasetsMap =
+      new ConcurrentHashMap<>();
+
+  // Hash set of datasets, used to avoid having to deduplicate the values of datasetsMap
+  // every time we need to iterate over all datasets.
+  private volatile Set<FsDatasetSpi<? extends FsVolumeSpi>> datasets =
+      Collections.newSetFromMap(
+          new ConcurrentHashMap<FsDatasetSpi<? extends FsVolumeSpi>,
+                                Boolean>());
+
   private String clusterId = null;
 
+  /**
+   * Do NOT reference this field outside of tests.
+   * It is retained to avoid breaking existing tests and subject to removal.
+   * In existing HDFS unit tests we are guaranteed not to have more than one
+   * dataset instance.
+   */
+  @VisibleForTesting
+  volatile FsDatasetSpi<? extends FsVolumeSpi> data = null;
+
   public final static String EMPTY_DEL_HINT = "";
   final AtomicInteger xmitsInProgress = new AtomicInteger();
   Daemon dataXceiverServer = null;
@@ -319,7 +344,8 @@ public class DataNode extends ReconfigurableBase
   private boolean hasAnyBlockPoolRegistered = false;
   
   private final BlockScanner blockScanner;
-  private DirectoryScanner directoryScanner = null;
+  private Map<FsDatasetSpi<?>, DirectoryScanner> directoryScannersMap =
+      new ConcurrentHashMap<>();
   
   /** Activated plug-ins. */
   private List<ServicePlugin> plugins;
@@ -373,6 +399,7 @@ public class DataNode extends ReconfigurableBase
     this.getHdfsBlockLocationsEnabled = false;
     this.blockScanner = new BlockScanner(this, conf);
     this.pipelineSupportECN = false;
+    this.datasetFactory = null;
   }
 
   /**
@@ -387,7 +414,7 @@ public class DataNode extends ReconfigurableBase
     this.lastDiskErrorCheck = 0;
     this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
         DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
-
+    datasetFactory = FsDatasetSpi.Factory.getFactory(conf);
     this.usersWithLocalPathAccess = Arrays.asList(
         conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY));
     this.connectToDnViaHostname = conf.getBoolean(
@@ -595,7 +622,9 @@ public class DataNode extends ReconfigurableBase
             @Override
             public IOException call() {
               try {
-                data.addVolume(location, nsInfos);
+                for (FsDatasetSpi<?> dataset : datasets) {
+                  dataset.addVolume(location, nsInfos);
+                }
               } catch (IOException e) {
                 return e;
               }
@@ -698,7 +727,9 @@ public class DataNode extends ReconfigurableBase
 
     IOException ioe = null;
     // Remove volumes and block infos from FsDataset.
-    data.removeVolumes(absoluteVolumePaths, clearFailure);
+    for (final FsDatasetSpi<?> dataset : datasets) {
+      dataset.removeVolumes(absoluteVolumePaths, clearFailure);
+    }
 
     // Remove volumes from DataStorage.
     try {
@@ -878,36 +909,42 @@ public class DataNode extends ReconfigurableBase
   }
 
   private void shutdownPeriodicScanners() {
-    shutdownDirectoryScanner();
+    shutdownDirectoryScanners();
     blockScanner.removeAllVolumeScanners();
   }
 
   /**
    * See {@link DirectoryScanner}
    */
-  private synchronized void initDirectoryScanner(Configuration conf) {
-    if (directoryScanner != null) {
-      return;
-    }
-    String reason = null;
-    if (conf.getInt(DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 
-                    DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT) < 0) {
-      reason = "verification is turned off by configuration";
-    } else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) {
-      reason = "verifcation is not supported by SimulatedFSDataset";
-    } 
-    if (reason == null) {
-      directoryScanner = new DirectoryScanner(this, data, conf);
-      directoryScanner.start();
-    } else {
-      LOG.info("Periodic Directory Tree Verification scan is disabled because " +
-                   reason);
+  private synchronized void initDirectoryScanners(Configuration conf) {
+    for (FsDatasetSpi<?> dataset : datasets) {
+      if (directoryScannersMap.get(dataset) != null) {
+        continue;
+      }
+
+      String reason = null;
+      if (conf.getInt(DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
+                      DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT) < 0) {
+        reason = "verification is turned off by configuration";
+      } else if ("SimulatedFSDataset".equals(
+          dataset.getClass().getSimpleName())) {
+        reason = "verifcation is not supported by SimulatedFSDataset";
+      }
+      if (reason == null) {
+        DirectoryScanner scanner = new DirectoryScanner(this, dataset, conf);
+        directoryScannersMap.put(dataset, scanner);
+        scanner.start();
+      } else {
+        LOG.info(
+            "Periodic Directory Tree Verification scan is disabled because " +
+            reason);
+      }
     }
   }
   
-  private synchronized void shutdownDirectoryScanner() {
-    if (directoryScanner != null) {
-      directoryScanner.shutdown();
+  private synchronized void shutdownDirectoryScanners() {
+    for (DirectoryScanner scanner : directoryScannersMap.values()) {
+      scanner.shutdown();
     }
   }
   
@@ -1013,7 +1050,7 @@ public class DataNode extends ReconfigurableBase
    */
   public void reportBadBlocks(ExtendedBlock block) throws IOException{
     BPOfferService bpos = getBPOSForBlock(block);
-    FsVolumeSpi volume = getFSDataset().getVolume(block);
+    FsVolumeSpi volume = getFSDataset(block.getBlockPoolId()).getVolume(block);
     bpos.reportBadBlocks(
         block, volume.getStorageID(), volume.getStorageType());
   }
@@ -1328,8 +1365,9 @@ public class DataNode extends ReconfigurableBase
 
       blockScanner.disableBlockPoolId(bpId);
 
-      if (data != null) {
-        data.shutdownBlockPool(bpId);
+      FsDatasetSpi<?> dataset = getFSDataset(bpId);
+      if (dataset != null) {
+        dataset.shutdownBlockPool(bpId);
       }
 
       if (storage != null) {
@@ -1350,7 +1388,7 @@ public class DataNode extends ReconfigurableBase
    * @param bpos Block pool offer service
    * @throws IOException if the NN is inconsistent with the local storage.
    */
-  void initBlockPool(BPOfferService bpos) throws IOException {
+  FsDatasetSpi<?> initBlockPool(BPOfferService bpos) throws IOException {
     NamespaceInfo nsInfo = bpos.getNamespaceInfo();
     if (nsInfo == null) {
       throw new IOException("NamespaceInfo not found: Block pool " + bpos
@@ -1364,15 +1402,16 @@ public class DataNode extends ReconfigurableBase
     
     // In the case that this is the first block pool to connect, initialize
     // the dataset, block scanners, etc.
-    initStorage(nsInfo);
+    FsDatasetSpi<?> dataset = initStorage(bpos.getBlockPoolId(), nsInfo);
 
     // Exclude failed disks before initializing the block pools to avoid startup
     // failures.
-    checkDiskError();
+    checkDiskError(getFSDataset(nsInfo.getBlockPoolID()));
 
-    initDirectoryScanner(conf);
-    data.addBlockPool(nsInfo.getBlockPoolID(), conf);
+    initDirectoryScanners(conf);
+    dataset.addBlockPool(nsInfo.getBlockPoolID(), conf);
     blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
+    return dataset;
   }
 
   List<BPOfferService> getAllBpOs() {
@@ -1387,11 +1426,9 @@ public class DataNode extends ReconfigurableBase
    * Initializes the {@link #data}. The initialization is done only once, when
    * handshake with the the first namenode is completed.
    */
-  private void initStorage(final NamespaceInfo nsInfo) throws IOException {
-    final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory
-        = FsDatasetSpi.Factory.getFactory(conf);
-    
-    if (!factory.isSimulated()) {
+  private FsDatasetSpi<?> initStorage(
+      final String blockPoolId, final NamespaceInfo nsInfo) throws IOException {
+    if (!datasetFactory.isSimulated()) {
       final StartupOption startOpt = getStartupOption(conf);
       if (startOpt == null) {
         throw new IOException("Startup option not set.");
@@ -1409,12 +1446,7 @@ public class DataNode extends ReconfigurableBase
 
     // If this is a newly formatted DataNode then assign a new DatanodeUuid.
     checkDatanodeUuid();
-
-    synchronized(this)  {
-      if (data == null) {
-        data = factory.newInstance(this, storage, conf);
-      }
-    }
+    return allocateFsDataset(blockPoolId, nsInfo.getNodeType());
   }
 
   /**
@@ -1556,8 +1588,9 @@ public class DataNode extends ReconfigurableBase
       Token<BlockTokenIdentifier> token) throws IOException {
     checkBlockLocalPathAccess();
     checkBlockToken(block, token, BlockTokenIdentifier.AccessMode.READ);
-    Preconditions.checkNotNull(data, "Storage not yet initialized");
-    BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
+    FsDatasetSpi<?> dataset = getFSDataset(block.getBlockPoolId());
+    Preconditions.checkNotNull(dataset, "Storage not yet initialized");
+    BlockLocalPathInfo info = dataset.getBlockLocalPathInfo(block);
     if (LOG.isDebugEnabled()) {
       if (info != null) {
         if (LOG.isTraceEnabled()) {
@@ -1612,8 +1645,10 @@ public class DataNode extends ReconfigurableBase
     FileInputStream fis[] = new FileInputStream[2];
     
     try {
-      fis[0] = (FileInputStream)data.getBlockInputStream(blk, 0);
-      fis[1] = DatanodeUtil.getMetaDataInputStream(blk, data);
+      final FsDatasetSpi<?> dataset = getFSDataset(blk.getBlockPoolId());
+      Preconditions.checkNotNull(dataset, "Storage not yet initialized");
+      fis[0] = (FileInputStream) dataset.getBlockInputStream(blk, 0);
+      fis[1] = DatanodeUtil.getMetaDataInputStream(blk, dataset);
     } catch (ClassCastException e) {
       LOG.debug("requestShortCircuitFdsForRead failed", e);
       throw new ShortCircuitFdsUnsupportedException("This DataNode's " +
@@ -1642,7 +1677,9 @@ public class DataNode extends ReconfigurableBase
 
     DataNodeFaultInjector.get().getHdfsBlocksMetadata();
 
-    return data.getHdfsBlocksMetadata(bpId, blockIds);
+    final FsDatasetSpi<?> dataset = getFSDataset(bpId);
+    Preconditions.checkNotNull(dataset, "Storage not yet initialized");
+    return dataset.getHdfsBlocksMetadata(bpId, blockIds);
   }
   
   private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> token,
@@ -1804,8 +1841,8 @@ public class DataNode extends ReconfigurableBase
         LOG.warn("Exception when unlocking storage: " + ie, ie);
       }
     }
-    if (data != null) {
-      data.shutdown();
+    for (FsDatasetSpi<?> dataset : datasets) {
+      dataset.shutdown();
     }
     if (metrics != null) {
       metrics.shutdown();
@@ -1842,8 +1879,9 @@ public class DataNode extends ReconfigurableBase
     }
   }
   
-  private void handleDiskError(String errMsgr) {
-    final boolean hasEnoughResources = data.hasEnoughResource();
+  private void handleDiskError(final FsDatasetSpi<?> dataset,
+                               final String errMsgr) {
+    final boolean hasEnoughResources = dataset.hasEnoughResource();
     LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources);
     
     // If we have enough active valid volumes then we do not want to 
@@ -1902,7 +1940,7 @@ public class DataNode extends ReconfigurableBase
 
   private void reportBadBlock(final BPOfferService bpos,
       final ExtendedBlock block, final String msg) {
-    FsVolumeSpi volume = getFSDataset().getVolume(block);
+    FsVolumeSpi volume = getFSDataset(block.getBlockPoolId()).getVolume(block);
     bpos.reportBadBlocks(
         block, volume.getStorageID(), volume.getStorageType());
     LOG.warn(msg);
@@ -1917,9 +1955,11 @@ public class DataNode extends ReconfigurableBase
     boolean replicaStateNotFinalized = false;
     boolean blockFileNotExist = false;
     boolean lengthTooShort = false;
+    final FsDatasetSpi<?> dataset = getFSDataset(block.getBlockPoolId());
+    Preconditions.checkNotNull(dataset, "Storage not yet initialized");
 
     try {
-      data.checkBlock(block, block.getNumBytes(), ReplicaState.FINALIZED);
+      dataset.checkBlock(block, block.getNumBytes(), ReplicaState.FINALIZED);
     } catch (ReplicaNotFoundException e) {
       replicaNotExist = true;
     } catch (UnexpectedReplicaStateException e) {
@@ -1949,10 +1989,13 @@ public class DataNode extends ReconfigurableBase
     }
     if (lengthTooShort) {
       // Check if NN recorded length matches on-disk length 
-      // Shorter on-disk len indicates corruption so report NN the corrupt block
+      // Shorter on-disk len indicates corruption so report NN
+      // the corrupt block
       reportBadBlock(bpos, block, "Can't replicate block " + block
-          + " because on-disk length " + data.getLength(block) 
-          + " is shorter than NameNode recorded length " + block.getNumBytes());
+          + " because on-disk length "
+          + getFSDataset(block.getBlockPoolId()).getLength(block)
+          + " is shorter than NameNode recorded length "
+          + block.getNumBytes());
       return;
     }
     
@@ -2159,7 +2202,8 @@ public class DataNode extends ReconfigurableBase
             DFSUtil.getSmallBufferSize(conf)));
         in = new DataInputStream(unbufIn);
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
-            false, false, true, DataNode.this, null, cachingStrategy);
+            false, false, true, DataNode.this, getFSDataset(b.getBlockPoolId()),
+            null, cachingStrategy);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
         new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
@@ -2447,8 +2491,10 @@ public class DataNode extends ReconfigurableBase
 
   @Override
   public String toString() {
-    return "DataNode{data=" + data + ", localName='" + getDisplayName()
-        + "', datanodeUuid='" + storage.getDatanodeUuid() + "', xmitsInProgress="
+    return "DataNode{datasets=" + datasets.toString()
+        + ", localName='" + getDisplayName()
+        + "', datanodeUuid='" + storage.getDatanodeUuid()
+        + "', xmitsInProgress="
         + xmitsInProgress.get() + "}";
   }
 
@@ -2506,14 +2552,61 @@ public class DataNode extends ReconfigurableBase
   }
 
   /**
+   * Allocate a new dataset for the given serviceType. This may return a
+   * previously allocated dataset.
+   *
+   * @param bpid block pool ID that the dataset will serve.
+   * @param serviceType type of node (e.g. DATA_NODE) served by the dataset.
+   * @return the dataset for the block pool; may be a previously allocated one.
+   * @throws IOException if a new dataset could not be instantiated.
+   */
+  private FsDatasetSpi<?> allocateFsDataset(
+      final String bpid, final NodeType serviceType) throws IOException {
+    FsDatasetSpi<?> dataset =
+        datasetFactory.newInstance(this, storage, conf, serviceType);
+    datasets.add(dataset);
+    datasetsMap.put(bpid, dataset);
+
+    if (serviceType == NodeType.NAME_NODE) {
+      // 'data' is retained for existing mock-based HDFS unit tests.
+      Preconditions.checkState(data == null || data == dataset);
+      data = dataset;
+    }
+
+    return dataset;
+  }
+
+  /**
    * Examples are adding and deleting blocks directly.
    * The most common usage will be when the data node's storage is simulated.
    * 
    * @return the fsdataset that stores the blocks
    */
   @VisibleForTesting
+  public FsDatasetSpi<?> getFSDataset(final String bpid) {
+    return datasetsMap.get(bpid);
+  }
+
+  @VisibleForTesting
+  public Set<FsDatasetSpi<?>> getFSDatasets() {
+    return datasets;
+  }
+
+  /**
+   * Do NOT use this method outside of tests.
+   * Retained for compatibility with existing tests and subject to removal.
+   *
+   * @return the fsdataset that stores the blocks
+   */
+  @VisibleForTesting
   public FsDatasetSpi<?> getFSDataset() {
-    return data;
+    Preconditions.checkState(datasets.size() <= 1,
+        "Did not expect more than one Dataset here.");
+
+    if (datasets.size() == 0) {
+      return null;
+    }
+    return (FsDatasetSpi<?>) datasets.iterator().next();
   }
 
   @VisibleForTesting
@@ -2522,9 +2615,15 @@ public class DataNode extends ReconfigurableBase
     return blockScanner;
   }
 
+  /**
+   * Do NOT use this method outside of tests.
+   * Retained for compatibility with existing tests and subject to removal.
+   *
+   * @return the directory scanner associated with the single test dataset.
+   */
   @VisibleForTesting
   DirectoryScanner getDirectoryScanner() {
-    return directoryScanner;
+    return directoryScannersMap.get(getFSDataset());
   }
 
   public static void secureMain(String args[], SecureResources resources) {
@@ -2584,7 +2683,12 @@ public class DataNode extends ReconfigurableBase
   @Override // InterDatanodeProtocol
   public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
   throws IOException {
-    return data.initReplicaRecovery(rBlock);
+    final FsDatasetSpi<?> dataset =
+        getFSDataset(rBlock.getBlock().getBlockPoolId());
+    if (dataset != null) {
+      return dataset.initReplicaRecovery(rBlock);
+    }
+    return null;
   }
 
   /**
@@ -2608,8 +2712,10 @@ public class DataNode extends ReconfigurableBase
   public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
       final long recoveryId, final long newBlockId, final long newLength)
       throws IOException {
-    final String storageID = data.updateReplicaUnderRecovery(oldBlock,
-        recoveryId, newBlockId, newLength);
+    final String storageID =
+        getFSDataset(oldBlock.getBlockPoolId())
+            .updateReplicaUnderRecovery(oldBlock, recoveryId,
+                                        newBlockId, newLength);
     // Notify the namenode of the updated block info. This is important
     // for HA, since otherwise the standby node may lose track of the
     // block locations until the next block report.
@@ -2849,7 +2955,7 @@ public class DataNode extends ReconfigurableBase
   @Override // ClientDataNodeProtocol
   public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException {
     checkReadAccess(block);
-    return data.getReplicaVisibleLength(block);
+    return getFSDataset(block.getBlockPoolId()).getReplicaVisibleLength(block);
   }
 
   private void checkReadAccess(final ExtendedBlock block) throws IOException {
@@ -2886,10 +2992,11 @@ public class DataNode extends ReconfigurableBase
     final long storedGS;
     final long visible;
     final BlockConstructionStage stage;
+    final FsDatasetSpi<?> dataset = getFSDataset(b.getBlockPoolId());
 
     //get replica information
-    synchronized(data) {
-      Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
+    synchronized(dataset) {
+      Block storedBlock = dataset.getStoredBlock(b.getBlockPoolId(),
           b.getBlockId());
       if (null == storedBlock) {
         throw new IOException(b + " not found in datanode.");
@@ -2901,15 +3008,16 @@ public class DataNode extends ReconfigurableBase
       }
       // Update the genstamp with storedGS
       b.setGenerationStamp(storedGS);
-      if (data.isValidRbw(b)) {
+      if (dataset.isValidRbw(b)) {
         stage = BlockConstructionStage.TRANSFER_RBW;
-      } else if (data.isValidBlock(b)) {
+      } else if (dataset.isValidBlock(b)) {
         stage = BlockConstructionStage.TRANSFER_FINALIZED;
       } else {
-        final String r = data.getReplicaString(b.getBlockPoolId(), b.getBlockId());
+        final String r = dataset.getReplicaString(
+            b.getBlockPoolId(), b.getBlockId());
         throw new IOException(b + " is neither a RBW nor a Finalized, r=" + r);
       }
-      visible = data.getReplicaVisibleLength(b);
+      visible = dataset.getReplicaVisibleLength(b);
     }
     //set visible length
     b.setNumBytes(visible);
@@ -2990,6 +3098,7 @@ public class DataNode extends ReconfigurableBase
    */
   @Override // DataNodeMXBean
   public String getVolumeInfo() {
+    // Default implementation for backwards compatibility.
     Preconditions.checkNotNull(data, "Storage not yet initialized");
     return JSON.toString(data.getVolumeInfoMap());
   }
@@ -3024,7 +3133,7 @@ public class DataNode extends ReconfigurableBase
           "shutdown the block pool service");
     }
    
-    data.deleteBlockPool(blockPoolId, force);
+    getFSDataset(blockPoolId).deleteBlockPool(blockPoolId, force);
   }
 
   @Override // ClientDatanodeProtocol
@@ -3179,8 +3288,8 @@ public class DataNode extends ReconfigurableBase
   /**
    * Check the disk error
    */
-  private void checkDiskError() {
-    Set<File> unhealthyDataDirs = data.checkDataDir();
+  private void checkDiskError(final FsDatasetSpi<?> dataset) {
+    Set<File> unhealthyDataDirs = dataset.checkDataDir();
     if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
       try {
         // Remove all unhealthy volumes from DataNode.
@@ -3193,7 +3302,7 @@ public class DataNode extends ReconfigurableBase
       for (File dataDir : unhealthyDataDirs) {
         sb.append(dataDir.getAbsolutePath() + ";");
       }
-      handleDiskError(sb.toString());
+      handleDiskError(dataset, sb.toString());
     }
   }
 
@@ -3213,7 +3322,9 @@ public class DataNode extends ReconfigurableBase
               }
               if(tempFlag) {
                 try {
-                  checkDiskError();
+                  for (final FsDatasetSpi<?> dataset : datasets) {
+                    checkDiskError(dataset);
+                  }
                 } catch (Exception e) {
                   LOG.warn("Unexpected exception occurred while checking disk error  " + e);
                   checkDiskErrorThread = null;

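The DataXceiver and VolumeScanner changes that follow, like the BlockSender
and BlockReceiver changes above, repeat one pattern: resolve the dataset for
the block's pool once, fail fast if that pool is unknown on this DataNode, and
pass the resolved dataset into the sender/receiver rather than letting it read
a global field. A minimal caller-side sketch of that pattern, with simplified
stand-in types that are not the actual Hadoop classes:

  import java.io.IOException;

  // Hypothetical helper illustrating the lookup-then-inject pattern used in
  // readBlock/writeBlock/copyBlock below; all types here are stand-ins.
  class BlockOpSketch {
    interface DatasetLike {}
    interface BlockLike { String getBlockPoolId(); }
    interface DataNodeLike { DatasetLike getFSDataset(String bpid); }

    static DatasetLike resolveDataset(DataNodeLike datanode, BlockLike block)
        throws IOException {
      DatasetLike dataset = datanode.getFSDataset(block.getBlockPoolId());
      if (dataset == null) {
        // Fail fast before any streams are opened for this operation.
        throw new IOException(
            "Unknown or uninitialized blockpool " + block.getBlockPoolId());
      }
      // The resolved dataset is then handed to BlockSender/BlockReceiver
      // constructors instead of being looked up inside them.
      return dataset;
    }
  }
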
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 26d669c..fbb8897 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
@@ -312,8 +313,9 @@ class DataXceiver extends Receiver implements Runnable {
               "anything but a UNIX domain socket.");
         }
         if (slotId != null) {
-          boolean isCached = datanode.data.
-              isCached(blk.getBlockPoolId(), blk.getBlockId());
+          final String bpid = blk.getBlockPoolId();
+          boolean isCached = datanode.getFSDataset(bpid).
+              isCached(bpid, blk.getBlockId());
           datanode.shortCircuitRegistry.registerSlot(
               ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
           registeredSlotId = slotId;
@@ -523,7 +525,14 @@ class DataXceiver extends Receiver implements Runnable {
         baseStream, smallBufferSize));
     checkAccess(out, true, block, blockToken,
         Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);
-  
+
+    final FsDatasetSpi<?> dataset =
+        datanode.getFSDataset(block.getBlockPoolId());
+    if (dataset == null) {
+      throw new IOException(
+          "Unknown or unitialized blockpool " + block.getBlockPoolId());
+    }
+
     // send the block
     BlockSender blockSender = null;
     DatanodeRegistration dnR = 
@@ -540,7 +549,7 @@ class DataXceiver extends Receiver implements Runnable {
     try {
       try {
         blockSender = new BlockSender(block, blockOffset, length,
-            true, false, sendChecksum, datanode, clientTraceFmt,
+            true, false, sendChecksum, datanode, dataset, clientTraceFmt,
             cachingStrategy);
       } catch(IOException e) {
         String msg = "opReadBlock " + block + " received exception " + e; 
@@ -630,6 +639,13 @@ class DataXceiver extends Receiver implements Runnable {
     final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
         || stage == BlockConstructionStage.TRANSFER_FINALIZED;
     long size = 0;
+    final FsDatasetSpi<?> dataset =
+        datanode.getFSDataset(block.getBlockPoolId());
+    if (dataset == null) {
+      throw new IOException(
+          "Unknown or unitialized blockpool " + block.getBlockPoolId());
+    }
+
     // check single target for transfer-RBW/Finalized 
     if (isTransfer && targets.length > 0) {
       throw new IOException(stage + " does not support multiple targets "
@@ -683,12 +699,12 @@ class DataXceiver extends Receiver implements Runnable {
             peer.getRemoteAddressString(),
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
-            clientname, srcDataNode, datanode, requestedChecksum,
+            clientname, srcDataNode, datanode, dataset, requestedChecksum,
             cachingStrategy, allowLazyPersist, pinning);
 
         storageUuid = blockReceiver.getStorageUuid();
       } else {
-        storageUuid = datanode.data.recoverClose(
+        storageUuid = dataset.recoverClose(
             block, latestGenerationStamp, minBytesRcvd);
       }
 
@@ -890,6 +906,12 @@ class DataXceiver extends Receiver implements Runnable {
     final int csize = checksum.getChecksumSize();
     final byte[] buffer = new byte[4*1024];
     MessageDigest digester = MD5Hash.getDigester();
+    final FsDatasetSpi<?> dataset =
+        datanode.getFSDataset(block.getBlockPoolId());
+    if (dataset == null) {
+      throw new IOException(
+          "Unknown or unitialized blockpool " + block.getBlockPoolId());
+    }
 
     long remaining = requestLength / bytesPerCRC * csize;
     for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
@@ -904,7 +926,7 @@ class DataXceiver extends Receiver implements Runnable {
     int partialLength = (int) (requestLength % bytesPerCRC);
     if (partialLength > 0) {
       byte[] buf = new byte[partialLength];
-      final InputStream blockIn = datanode.data.getBlockInputStream(block,
+      final InputStream blockIn = dataset.getBlockInputStream(block,
           requestLength - partialLength);
       try {
         // Get the CRC of the partialLength.
@@ -928,14 +950,20 @@ class DataXceiver extends Receiver implements Runnable {
     checkAccess(out, true, block, blockToken,
         Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
     // client side now can specify a range of the block for checksum
+    final FsDatasetSpi<?> dataset =
+        datanode.getFSDataset(block.getBlockPoolId());
+    if (dataset == null) {
+      throw new IOException(
+          "Unknown or unitialized blockpool " + block.getBlockPoolId());
+    }
+
     long requestLength = block.getNumBytes();
     Preconditions.checkArgument(requestLength >= 0);
-    long visibleLength = datanode.data.getReplicaVisibleLength(block);
+    long visibleLength = dataset.getReplicaVisibleLength(block);
     boolean partialBlk = requestLength < visibleLength;
 
     updateCurrentThreadName("Reading metadata for block " + block);
-    final LengthInputStream metadataIn = datanode.data
-        .getMetaDataInputStream(block);
+    final LengthInputStream metadataIn = dataset.getMetaDataInputStream(block);
     
     final DataInputStream checksumIn = new DataInputStream(
         new BufferedInputStream(metadataIn, ioFileBufferSize));
@@ -986,6 +1014,13 @@ class DataXceiver extends Receiver implements Runnable {
   @Override
   public void copyBlock(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
+    final FsDatasetSpi<?> dataset =
+        datanode.getFSDataset(block.getBlockPoolId());
+    if (dataset == null) {
+      throw new IOException(
+          "Unknown or unitialized blockpool " + block.getBlockPoolId());
+    }
+
     updateCurrentThreadName("Copying block " + block);
     // Read in the header
     if (datanode.isBlockTokenEnabled) {
@@ -1002,7 +1037,7 @@ class DataXceiver extends Receiver implements Runnable {
 
     }
     
-    if (datanode.data.getPinning(block)) {
+    if (dataset.getPinning(block)) {
       String msg = "Not able to copy block " + block.getBlockId() + " " +
           "to " + peer.getRemoteAddressString() + " because it's pinned ";
       LOG.info(msg);
@@ -1025,7 +1060,7 @@ class DataXceiver extends Receiver implements Runnable {
     try {
       // check if the block exists or not
       blockSender = new BlockSender(block, 0, -1, false, false, true, datanode, 
-          null, CachingStrategy.newDropBehind());
+          dataset, null, CachingStrategy.newDropBehind());
 
       // set up response stream
       OutputStream baseStream = getOutputStream();
@@ -1073,6 +1108,13 @@ class DataXceiver extends Receiver implements Runnable {
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,
       final DatanodeInfo proxySource) throws IOException {
+    final FsDatasetSpi<?> dataset =
+        datanode.getFSDataset(block.getBlockPoolId());
+    if (dataset == null) {
+      throw new IOException(
+          "Unknown or unitialized blockpool " + block.getBlockPoolId());
+    }
+
     updateCurrentThreadName("Replacing block " + block + " from " + delHint);
 
     /* read header */
@@ -1109,7 +1151,7 @@ class DataXceiver extends Receiver implements Runnable {
     try {
       // Move the block to different storage in the same datanode
       if (proxySource.equals(datanode.getDatanodeId())) {
-        ReplicaInfo oldReplica = datanode.data.moveBlockAcrossStorage(block,
+        ReplicaInfo oldReplica = dataset.moveBlockAcrossStorage(block,
             storageType);
         if (oldReplica != null) {
           LOG.info("Moved " + block + " from StorageType "
@@ -1164,7 +1206,7 @@ class DataXceiver extends Receiver implements Runnable {
         blockReceiver = new BlockReceiver(block, storageType,
             proxyReply, proxySock.getRemoteSocketAddress().toString(),
             proxySock.getLocalSocketAddress().toString(),
-            null, 0, 0, 0, "", null, datanode, remoteChecksum,
+            null, 0, 0, 0, "", null, datanode, dataset, remoteChecksum,
             CachingStrategy.newDropBehind(), false, false);
         
         // receive a block

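The DataXceiver hunks above all follow the same pattern: instead of reaching for
the former singleton datanode.data field, each operation first resolves the
dataset for the block's pool and then threads it through the BlockSender and
BlockReceiver constructors. A minimal sketch of that lookup, consolidated into
one place (the helper name resolveDataset is hypothetical; getFSDataset(String),
FsDatasetSpi, and the exception text come from the diff):

    // Hypothetical helper; each op in the diff performs this lookup inline.
    private FsDatasetSpi<?> resolveDataset(ExtendedBlock block)
        throws IOException {
      // A DataNode may now host multiple datasets; pick the one that
      // serves this block's pool.
      final FsDatasetSpi<?> dataset =
          datanode.getFSDataset(block.getBlockPoolId());
      if (dataset == null) {
        throw new IOException(
            "Unknown or uninitialized blockpool " + block.getBlockPoolId());
      }
      return dataset;
    }
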
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
index 615abe9..2cc3516 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -433,8 +433,9 @@ public class VolumeScanner extends Thread {
     BlockSender blockSender = null;
     try {
       blockSender = new BlockSender(block, 0, -1,
-          false, true, true, datanode, null,
-          CachingStrategy.newDropBehind());
+          false, true, true, datanode,
+          datanode.getFSDataset(block.getBlockPoolId()),
+          null, CachingStrategy.newDropBehind());
       throttler.setBandwidth(bytesPerSec);
       long bytesRead = blockSender.sendBlock(nullStream, null, throttler);
       resultHandler.handle(block, null);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 76c4f02..a4672b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
@@ -68,6 +70,7 @@ import org.apache.hadoop.util.ReflectionUtils;
  * The default implementation stores replicas on local drives. 
  */
 @InterfaceAudience.Private
+@InterfaceStability.Unstable
 public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   /**
    * A factory for creating {@link FsDatasetSpi} objects.
@@ -83,9 +86,10 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
       return ReflectionUtils.newInstance(clazz, conf);
     }
 
-    /** Create a new object. */
-    public abstract D newInstance(DataNode datanode, DataStorage storage,
-        Configuration conf) throws IOException;
+    /** Create a new dataset object for a specific service type. */
+    public abstract D newInstance(DataNode datanode,
+        DataStorage storage, Configuration conf,
+        NodeType serviceType) throws IOException;
 
     /** Does the factory create simulated objects? */
     public boolean isSimulated() {

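With the extra NodeType argument, a factory can hand out a different dataset for
each service type hosted by the same DataNode. A hedged sketch of a call site
against the new contract (the method name createHdfsDataset is illustrative;
getFactory, newInstance, and NodeType.NAME_NODE come from the code above and the
default factory below):

    // Sketch only: resolve the configured factory and ask it for the
    // dataset that backs the HDFS (NAME_NODE) service.
    static FsDatasetSpi<?> createHdfsDataset(DataNode datanode,
        DataStorage storage, Configuration conf) throws IOException {
      FsDatasetSpi.Factory<?> factory = FsDatasetSpi.Factory.getFactory(conf);
      return factory.newInstance(datanode, storage, conf, NodeType.NAME_NODE);
    }
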
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index 8d1bb2a..7c7b9a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -22,12 +22,16 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 
 /**
  * This is an interface for the underlying volume.
  */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
 public interface FsVolumeSpi {
   /**
    * Obtain a reference object that had increased 1 reference count of the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java
index 52e385b..01c3830 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java
@@ -18,8 +18,11 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -28,9 +31,25 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
  * A factory for creating {@link FsDatasetImpl} objects.
  */
 public class FsDatasetFactory extends FsDatasetSpi.Factory<FsDatasetImpl> {
+
+  private final Map<NodeType, FsDatasetImpl> datasetMap = new HashMap<>();
+
   @Override
-  public FsDatasetImpl newInstance(DataNode datanode,
-      DataStorage storage, Configuration conf) throws IOException {
-    return new FsDatasetImpl(datanode, storage, conf);
+  public synchronized FsDatasetImpl newInstance(DataNode datanode,
+      DataStorage storage, Configuration conf,
+      NodeType serviceType) throws IOException {
+    FsDatasetImpl dataset = datasetMap.get(serviceType);
+    if (dataset != null) {
+      return dataset;
+    }
+    switch (serviceType) {
+    case NAME_NODE:
+      dataset = new FsDatasetImpl(datanode, storage, conf);
+      break;
+    default:
+      throw new IllegalArgumentException("Unsupported node type " + serviceType);
+    }
+    datasetMap.put(serviceType, dataset);
+    return dataset;
   }
 }

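The default factory now memoizes one FsDatasetImpl per service type, so repeated
requests for the same NodeType share a dataset while unsupported types fail
fast. A hedged usage sketch (assumes code living in the fsdataset.impl package,
such as a test, since FsDatasetImpl is package-private; datanode, storage, and
conf are presumed already initialized):

    FsDatasetFactory factory = new FsDatasetFactory();
    FsDatasetImpl first =
        factory.newInstance(datanode, storage, conf, NodeType.NAME_NODE);
    FsDatasetImpl second =
        factory.newInstance(datanode, storage, conf, NodeType.NAME_NODE);
    assert first == second;  // same cached instance for the same NodeType
    // Any NodeType other than NAME_NODE currently throws
    // IllegalArgumentException, per the switch above.
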
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 8ebd214..999b827 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -308,7 +308,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
         blockChooserImpl);
     asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
-    asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
+    asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, this);
     deletingBlock = new HashMap<String, Set<Long>>();
 
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
@@ -347,9 +347,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   /**
    * Gets initial volume failure information for all volumes that failed
-   * immediately at startup.  The method works by determining the set difference
-   * between all configured storage locations and the actual storage locations in
-   * use after attempting to put all of them into service.
+   * immediately at startup.  The method works by determining the set
+   * difference between all configured storage locations and the actual
+   * storage locations in use after attempting to put all of them into
+   * service.
    *
    * @return each storage location that has failed
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index 884df2e..effbd4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -53,6 +53,7 @@ class RamDiskAsyncLazyPersistService {
   private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
 
   private final DataNode datanode;
+  private final FsDatasetImpl dataset;
   private final ThreadGroup threadGroup;
   private Map<File, ThreadPoolExecutor> executors
       = new HashMap<File, ThreadPoolExecutor>();
@@ -65,8 +66,10 @@ class RamDiskAsyncLazyPersistService {
    * The RamDiskAsyncLazyPersistService uses one ThreadPool per volume to do the async
    * disk operations.
    */
-  RamDiskAsyncLazyPersistService(DataNode datanode) {
+  RamDiskAsyncLazyPersistService(DataNode datanode,
+                                 final FsDatasetImpl dataset) {
     this.datanode = datanode;
+    this.dataset = dataset;
     this.threadGroup = new ThreadGroup(getClass().getSimpleName());
   }
 
@@ -234,7 +237,6 @@ class RamDiskAsyncLazyPersistService {
     @Override
     public void run() {
       boolean succeeded = false;
-      final FsDatasetImpl dataset = (FsDatasetImpl)datanode.getFSDataset();
       try (FsVolumeReference ref = this.targetVolume) {
         int smallBufferSize = DFSUtil.getSmallBufferSize(EMPTY_HDFS_CONF);
         // No FsDatasetImpl lock for the file copy

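The lazy-persist service is now bound to its owning dataset at construction
time rather than having the worker task fetch a global dataset from the
DataNode, which would be ambiguous once a DataNode can host several datasets.
A one-line sketch of the wiring, mirroring the FsDatasetImpl hunk earlier in
this commit (field and constructor names as in the diff):

    // Each FsDatasetImpl creates and owns its lazy-persist service, passing
    // itself in so worker tasks use the injected dataset field instead of
    // asking the DataNode for a single global dataset.
    asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, this);
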
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
index 5c1b38f..d925b93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.*;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -73,7 +74,8 @@ public class TestWriteBlockGetsBlockLengthHint {
     static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
       @Override
       public SimulatedFSDataset newInstance(DataNode datanode,
-          DataStorage storage, Configuration conf) throws IOException {
+          DataStorage storage, Configuration conf,
+          HdfsServerConstants.NodeType serviceType) throws IOException {
         return new FsDatasetChecker(storage, conf);
       }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 778dd28..d957767 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
@@ -83,7 +84,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
     @Override
     public SimulatedFSDataset newInstance(DataNode datanode,
-        DataStorage storage, Configuration conf) throws IOException {
+        DataStorage storage, Configuration conf,
+        HdfsServerConstants.NodeType serviceType) throws IOException {
       return new SimulatedFSDataset(datanode, storage, conf);
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 64cc78b..2059f1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
 
 import java.io.File;
 import java.io.IOException;
@@ -118,7 +119,8 @@ public class TestBPOfferService {
     mockFSDataset.addBlockPool(FAKE_BPID, conf);
 
     // Wire the dataset to the DN.
-    Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset();
+    Mockito.doReturn(mockFSDataset).when(mockDn).initBlockPool(Mockito.any(BPOfferService.class));
+    Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(anyString());
   }
 
   /**
@@ -325,15 +327,16 @@ public class TestBPOfferService {
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
       when(mockDn).getMetrics();
     final AtomicInteger count = new AtomicInteger();
-    Mockito.doAnswer(new Answer<Void>() {
+    Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(anyString());
+    Mockito.doAnswer(new Answer<FsDatasetSpi<?>>() {
       @Override
-      public Void answer(InvocationOnMock invocation) throws Throwable {
+      public FsDatasetSpi<?> answer(InvocationOnMock invocation) throws Throwable {
         if (count.getAndIncrement() == 0) {
           throw new IOException("faked initBlockPool exception");
         }
         // The initBlockPool is called again. Now mock init is done.
-        Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset();
-        return null;
+        Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(anyString());
+        return mockFSDataset;
       }
     }).when(mockDn).initBlockPool(Mockito.any(BPOfferService.class));
     BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
@@ -563,10 +566,10 @@ public class TestBPOfferService {
       assertSame(mockNN1, bpos.getActiveNN());
       Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
           .when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
-          Mockito.anyInt(), Mockito.anyString());
+          Mockito.anyInt(), anyString());
       Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
           .when(mockNN2).errorReport(Mockito.any(DatanodeRegistration.class),
-          Mockito.anyInt(), Mockito.anyString());
+          Mockito.anyInt(), anyString());
       String errorString = "Can't send invalid block " + FAKE_BLOCK;
       bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
       bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
@@ -614,7 +617,7 @@ public class TestBPOfferService {
           }
         }
       }).when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
-          Mockito.anyInt(), Mockito.anyString());
+          Mockito.anyInt(), anyString());
       String errorString = "Can't send invalid block " + FAKE_BLOCK;
       bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
       Thread.sleep(10000);

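For test doubles, the stubbing moves from the no-arg getter to the per-pool
getter, and initBlockPool now returns the dataset it set up. A minimal Mockito
sketch of the pattern used above (mockDn and mockFSDataset are the existing
mocks from the test):

    // Any block pool ID resolves to the same mock dataset.
    Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(anyString());
    // initBlockPool hands back the dataset it initialized.
    Mockito.doReturn(mockFSDataset).when(mockDn)
        .initBlockPool(Mockito.any(BPOfferService.class));
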
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java
index 07a26cc..ad4135b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.junit.Test;
 
@@ -45,7 +46,7 @@ public class TestDataNodeInitStorage {
       @Override
       public SimulatedFsDatasetVerifier newInstance(
           DataNode datanode, DataStorage storage,
-          Configuration conf) throws IOException {
+          Configuration conf, HdfsServerConstants.NodeType serviceType) throws IOException {
         return new SimulatedFsDatasetVerifier(storage, conf);
       }