Posted to common-commits@hadoop.apache.org by at...@apache.org on 2014/08/29 22:01:53 UTC

git commit: HDFS-6774. Make FsDataset and DataStorage support removing volumes. Contributed by Lei Xu. (cherry picked from commit 7eab2a29a5706ce10912c12fa225ef6b27a82cbe)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 27086f594 -> 135315b66


HDFS-6774. Make FsDataset and DataStorage support removing volumes. Contributed by Lei Xu.
(cherry picked from commit 7eab2a29a5706ce10912c12fa225ef6b27a82cbe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/135315b6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/135315b6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/135315b6

Branch: refs/heads/branch-2
Commit: 135315b66fba5d248a983ad5d05d7ab7da42b5fb
Parents: 27086f5
Author: Aaron T. Myers <at...@apache.org>
Authored: Fri Aug 29 12:59:23 2014 -0700
Committer: Aaron T. Myers <at...@apache.org>
Committed: Fri Aug 29 13:00:36 2014 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../server/datanode/BlockPoolSliceStorage.java  | 14 +++
 .../hdfs/server/datanode/DataStorage.java       | 27 ++++++
 .../server/datanode/fsdataset/FsDatasetSpi.java |  3 +
 .../datanode/fsdataset/impl/BlockPoolSlice.java |  2 +-
 .../impl/FsDatasetAsyncDiskService.java         | 18 ++++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 69 +++++++++++++++
 .../datanode/fsdataset/impl/FsVolumeList.java   | 19 ++++
 .../server/datanode/SimulatedFSDataset.java     |  5 ++
 .../fsdataset/impl/TestFsDatasetImpl.java       | 92 ++++++++++++++++++--
 10 files changed, 245 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
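For orientation, the two removal APIs added here are meant to be used together. The Javadoc added to FsDatasetImpl#removeVolumes below notes that the DataNode should call it before DataStorage#removeVolumes, so the volume list, storage map, async disk service and replica map are scrubbed before the storage directories themselves are dropped. A minimal sketch of that call order follows; the helper class and method are hypothetical and not part of this patch, and sit in the datanode package only because DataStorage#removeVolumes is package-private:

    package org.apache.hadoop.hdfs.server.datanode;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collection;

    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

    // Hypothetical helper, not part of this patch.
    class VolumeRemovalSketch {
      static void removeVolumes(FsDatasetSpi<?> dataset, DataStorage storage,
          Collection<String> dirsToRemove) throws IOException {
        Collection<StorageLocation> volumes = new ArrayList<StorageLocation>();
        for (String dir : dirsToRemove) {
          volumes.add(StorageLocation.parse(dir));  // e.g. "/data/1/dfs/dn"
        }
        // Scrub the dataset first: volume list, storage map, async disk service
        // and per-block-pool replica info (FsDatasetImpl#removeVolumes below).
        dataset.removeVolumes(volumes);
        // Then drop the matching storage directories from DataStorage.
        storage.removeVolumes(volumes);
      }
    }
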


http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 95feb33..5414aea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -168,6 +168,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6879. Adding tracing to Hadoop RPC (Masatake Iwasaki via Colin Patrick
     McCabe)
 
+    HDFS-6774. Make FsDataset and DataStorage support removing volumes. (Lei Xu
+    via atm)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index bcee1df..45ca0be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -202,6 +202,20 @@ public class BlockPoolSliceStorage extends Storage {
   }
 
   /**
+   * Remove storage directories.
+   * @param storageDirs a set of storage directories to be removed.
+   */
+  void removeVolumes(Set<File> storageDirs) {
+    for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
+         it.hasNext(); ) {
+      StorageDirectory sd = it.next();
+      if (storageDirs.contains(sd.getRoot())) {
+        it.remove();
+      }
+    }
+  }
+
+  /**
    * Set layoutVersion, namespaceID and blockpoolID into block pool storage
    * VERSION file
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 29616e7..9929199 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -313,6 +313,33 @@ public class DataStorage extends Storage {
   }
 
   /**
+   * Remove volumes from DataStorage.
+   * @param locations a collection of volumes.
+   */
+  synchronized void removeVolumes(Collection<StorageLocation> locations) {
+    if (locations.isEmpty()) {
+      return;
+    }
+
+    Set<File> dataDirs = new HashSet<File>();
+    for (StorageLocation sl : locations) {
+      dataDirs.add(sl.getFile());
+    }
+
+    for (BlockPoolSliceStorage bpsStorage : this.bpStorageMap.values()) {
+      bpsStorage.removeVolumes(dataDirs);
+    }
+
+    for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
+         it.hasNext(); ) {
+      StorageDirectory sd = it.next();
+      if (dataDirs.contains(sd.getRoot())) {
+        it.remove();
+      }
+    }
+  }
+
+  /**
    * Analyze storage directories.
    * Recover from previous transitions if required.
    * Perform fs state transition if necessary depending on the namespace info.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index a64f9c0..0fbfe19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -97,6 +97,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   public void addVolumes(Collection<StorageLocation> volumes)
       throws IOException;
 
+  /** Removes a collection of volumes from FsDataset. */
+  public void removeVolumes(Collection<StorageLocation> volumes);
+
   /** @return a storage with the given storage ID */
   public DatanodeStorage getStorage(final String storageUuid);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index e47a302..658fe27 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -302,7 +302,7 @@ class BlockPoolSlice {
             loadRwr = false;
           }
           sc.close();
-          if (restartMeta.delete()) {
+          if (!restartMeta.delete()) {
             FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
               restartMeta.getPath());
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 539e97b..bee7bf7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -118,6 +118,24 @@ class FsDatasetAsyncDiskService {
     }
     addExecutorForVolume(volume);
   }
+
+  /**
+   * Stops AsyncDiskService for a volume.
+   * @param volume the root of the volume.
+   */
+  synchronized void removeVolume(File volume) {
+    if (executors == null) {
+      throw new RuntimeException("AsyncDiskService is already shutdown");
+    }
+    ThreadPoolExecutor executor = executors.get(volume);
+    if (executor == null) {
+      throw new RuntimeException("Can not find volume " + volume
+          + " to remove.");
+    } else {
+      executor.shutdown();
+      executors.remove(volume);
+    }
+  }
   
   synchronized long countPendingDeletions() {
     long count = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 0c872a5..67ff9b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -30,9 +30,11 @@ import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executor;
 
 import javax.management.NotCompliantMBeanException;
@@ -314,6 +316,51 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
+  /**
+   * Removes a collection of volumes from FsDataset.
+   * DataNode should call this function before calling
+   * {@link DataStorage#removeVolumes(java.util.Collection)}.
+   *
+   * @param volumes the root directories of the volumes.
+   */
+  @Override
+  public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
+    Set<File> volumeSet = new HashSet<File>();
+    for (StorageLocation sl : volumes) {
+      volumeSet.add(sl.getFile());
+    }
+    for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
+      Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
+      if (volumeSet.contains(sd.getRoot())) {
+        String volume = sd.getRoot().toString();
+        LOG.info("Removing " + volume + " from FsDataset.");
+
+        this.volumes.removeVolume(volume);
+        storageMap.remove(sd.getStorageUuid());
+        asyncDiskService.removeVolume(sd.getCurrentDir());
+
+        // Remove all replica information for the blocks on the volume. Unlike
+        // updating the volumeMap in addVolume(), this operation does not scan
+        // disks.
+        for (String bpid : volumeMap.getBlockPoolList()) {
+          List<Block> blocks = new ArrayList<Block>();
+          for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
+              it.hasNext(); ) {
+            ReplicaInfo block = it.next();
+            if (block.getVolume().getBasePath().equals(volume)) {
+              invalidate(bpid, block.getBlockId());
+              blocks.add(block);
+              it.remove();
+            }
+          }
+          // Delete blocks from the block scanner in batch.
+          datanode.getBlockScanner().deleteBlocks(bpid,
+              blocks.toArray(new Block[blocks.size()]));
+        }
+      }
+    }
+  }
+
   private StorageType getStorageTypeFromLocations(
       Collection<StorageLocation> dataLocations, File dir) {
     for (StorageLocation dataLocation : dataLocations) {
@@ -1302,6 +1349,28 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   /**
+   * Invalidates a block without deleting the actual on-disk block file.
+   *
+   * It should only be used for decommissioning disks.
+   *
+   * @param bpid the block pool ID.
+   * @param blockId the ID of the block.
+   */
+  public void invalidate(String bpid, long blockId) {
+    // If a DFSClient has the replica in its cache of short-circuit file
+    // descriptors (and the client is using ShortCircuitShm), invalidate it.
+    // The short-circuit registry is null in the unit tests, because the
+    // datanode is a mock object.
+    if (datanode.getShortCircuitRegistry() != null) {
+      datanode.getShortCircuitRegistry().processBlockInvalidation(
+          new ExtendedBlockId(blockId, bpid));
+
+      // If the block is cached, start uncaching it.
+      cacheManager.uncacheBlock(bpid, blockId);
+    }
+  }
+
+  /**
    * Asynchronously attempts to cache a single block via {@link FsDatasetCache}.
    */
   private void cacheBlock(String bpid, long blockId) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index d4f8adc..90739c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -212,6 +212,25 @@ class FsVolumeList {
     FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
   }
 
+  /**
+   * Dynamically removes a volume from the list.
+   * @param volume the volume to be removed.
+   */
+  synchronized void removeVolume(String volume) {
+    // Make a copy of volumes to remove one volume.
+    final List<FsVolumeImpl> volumeList = new ArrayList<FsVolumeImpl>(volumes);
+    for (Iterator<FsVolumeImpl> it = volumeList.iterator(); it.hasNext(); ) {
+      FsVolumeImpl fsVolume = it.next();
+      if (fsVolume.getBasePath().equals(volume)) {
+        fsVolume.shutdown();
+        it.remove();
+        volumes = Collections.unmodifiableList(volumeList);
+        FsDatasetImpl.LOG.info("Removed volume: " + volume);
+        break;
+      }
+    }
+  }
+
   void addBlockPool(final String bpid, final Configuration conf) throws IOException {
     long totalStartTime = Time.monotonicNow();
     

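The removal above updates the volume list copy-on-write: the writer clones the published list, mutates the clone, and republishes it as an unmodifiable list, so concurrent readers keep iterating their old snapshot. A standalone sketch of that pattern is below; the class and field names are illustrative only, not the actual FsVolumeList members:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Illustrative copy-on-write list holder (hypothetical class).
    class CopyOnWriteVolumes {
      // Published snapshot; readers iterate it without holding a lock.
      private volatile List<String> volumes = Collections.emptyList();

      synchronized void add(String volume) {
        List<String> copy = new ArrayList<String>(volumes);
        copy.add(volume);
        volumes = Collections.unmodifiableList(copy);  // publish new snapshot
      }

      synchronized void remove(String volume) {
        List<String> copy = new ArrayList<String>(volumes);
        if (copy.remove(volume)) {
          volumes = Collections.unmodifiableList(copy);  // publish new snapshot
        }
      }

      List<String> snapshot() {
        return volumes;
      }
    }
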
http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 109a039..a51342e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1121,6 +1121,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
+  public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
       FileDescriptor fd, long offset, long nbytes, int flags) {
     throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index d9e9907..2c4c401 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -18,12 +18,20 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
+import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,25 +43,44 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestFsDatasetImpl {
   private static final String BASE_DIR =
-      System.getProperty("test.build.dir") + "/fsdatasetimpl";
+      new FileSystemTestHelper().getTestRootDir();
   private static final int NUM_INIT_VOLUMES = 2;
+  private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};
 
+  // Used to generate storageUuid
+  private static final DataStorage dsForStorageUuid = new DataStorage(
+      new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE));
+
+  private Configuration conf;
   private DataStorage storage;
+  private DataBlockScanner scanner;
   private FsDatasetImpl dataset;
 
+  private static Storage.StorageDirectory createStorageDirectory(File root) {
+    Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
+    dsForStorageUuid.createStorageID(sd);
+    return sd;
+  }
+
   private static void createStorageDirs(DataStorage storage, Configuration conf,
       int numDirs) throws IOException {
     List<Storage.StorageDirectory> dirs =
         new ArrayList<Storage.StorageDirectory>();
     List<String> dirStrings = new ArrayList<String>();
     for (int i = 0; i < numDirs; i++) {
-      String loc = BASE_DIR + "/data" + i;
-      dirStrings.add(loc);
-      dirs.add(new Storage.StorageDirectory(new File(loc)));
+      File loc = new File(BASE_DIR + "/data" + i);
+      dirStrings.add(loc.toString());
+      loc.mkdirs();
+      dirs.add(createStorageDirectory(loc));
       when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
     }
 
@@ -66,14 +93,19 @@ public class TestFsDatasetImpl {
   public void setUp() throws IOException {
     final DataNode datanode = Mockito.mock(DataNode.class);
     storage = Mockito.mock(DataStorage.class);
-    Configuration conf = new Configuration();
+    scanner = Mockito.mock(DataBlockScanner.class);
+    this.conf = new Configuration();
     final DNConf dnConf = new DNConf(conf);
 
     when(datanode.getConf()).thenReturn(conf);
     when(datanode.getDnConf()).thenReturn(dnConf);
+    when(datanode.getBlockScanner()).thenReturn(scanner);
 
     createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
     dataset = new FsDatasetImpl(datanode, storage, conf);
+    for (String bpid : BLOCK_POOL_IDS) {
+      dataset.addBlockPool(bpid, conf);
+    }
 
     assertEquals(NUM_INIT_VOLUMES, dataset.getVolumes().size());
     assertEquals(0, dataset.getNumFailedVolumes());
@@ -89,15 +121,63 @@ public class TestFsDatasetImpl {
       String path = BASE_DIR + "/newData" + i;
       newLocations.add(StorageLocation.parse(path));
       when(storage.getStorageDir(numExistingVolumes + i))
-          .thenReturn(new Storage.StorageDirectory(new File(path)));
+          .thenReturn(createStorageDirectory(new File(path)));
     }
     when(storage.getNumStorageDirs()).thenReturn(totalVolumes);
 
     dataset.addVolumes(newLocations);
     assertEquals(totalVolumes, dataset.getVolumes().size());
+    assertEquals(totalVolumes, dataset.storageMap.size());
     for (int i = 0; i < numNewVolumes; i++) {
       assertEquals(newLocations.get(i).getFile().getPath(),
           dataset.getVolumes().get(numExistingVolumes + i).getBasePath());
     }
   }
+
+  @Test
+  public void testRemoveVolumes() throws IOException {
+    // Feed FsDataset with block metadata.
+    final int NUM_BLOCKS = 100;
+    for (int i = 0; i < NUM_BLOCKS; i++) {
+      String bpid = BLOCK_POOL_IDS[i % BLOCK_POOL_IDS.length];
+      ExtendedBlock eb = new ExtendedBlock(bpid, i);
+      dataset.createRbw(StorageType.DEFAULT, eb);
+    }
+    final String[] dataDirs =
+        conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
+    final String volumePathToRemove = dataDirs[0];
+    List<StorageLocation> volumesToRemove = new ArrayList<StorageLocation>();
+    volumesToRemove.add(StorageLocation.parse(volumePathToRemove));
+
+    dataset.removeVolumes(volumesToRemove);
+    int expectedNumVolumes = dataDirs.length - 1;
+    assertEquals("The volume has been removed from the volumeList.",
+        expectedNumVolumes, dataset.getVolumes().size());
+    assertEquals("The volume has been removed from the storageMap.",
+        expectedNumVolumes, dataset.storageMap.size());
+
+    try {
+      dataset.asyncDiskService.execute(volumesToRemove.get(0).getFile(),
+          new Runnable() {
+            @Override
+            public void run() {}
+          });
+      fail("Expect RuntimeException: the volume has been removed from the "
+           + "AsyncDiskService.");
+    } catch (RuntimeException e) {
+      GenericTestUtils.assertExceptionContains("Cannot find root", e);
+    }
+
+    int totalNumReplicas = 0;
+    for (String bpid : dataset.volumeMap.getBlockPoolList()) {
+      totalNumReplicas += dataset.volumeMap.size(bpid);
+    }
+    assertEquals("The replica infos on this volume has been removed from the "
+                 + "volumeMap.", NUM_BLOCKS / NUM_INIT_VOLUMES,
+                 totalNumReplicas);
+
+    // Verify that every BlockPool deletes the removed blocks from the volume.
+    verify(scanner, times(BLOCK_POOL_IDS.length))
+        .deleteBlocks(anyString(), any(Block[].class));
+  }
 }