You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by at...@apache.org on 2014/08/29 22:01:53 UTC
git commit: HDFS-6774. Make FsDataset and DataStore support removing
volumes. Contributed by Lei Xu. (cherry picked from commit
7eab2a29a5706ce10912c12fa225ef6b27a82cbe)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 27086f594 -> 135315b66
HDFS-6774. Make FsDataset and DataStore support removing volumes. Contributed by Lei Xu.
(cherry picked from commit 7eab2a29a5706ce10912c12fa225ef6b27a82cbe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/135315b6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/135315b6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/135315b6
Branch: refs/heads/branch-2
Commit: 135315b66fba5d248a983ad5d05d7ab7da42b5fb
Parents: 27086f5
Author: Aaron T. Myers <at...@apache.org>
Authored: Fri Aug 29 12:59:23 2014 -0700
Committer: Aaron T. Myers <at...@apache.org>
Committed: Fri Aug 29 13:00:36 2014 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../server/datanode/BlockPoolSliceStorage.java | 14 +++
.../hdfs/server/datanode/DataStorage.java | 27 ++++++
.../server/datanode/fsdataset/FsDatasetSpi.java | 3 +
.../datanode/fsdataset/impl/BlockPoolSlice.java | 2 +-
.../impl/FsDatasetAsyncDiskService.java | 18 ++++
.../datanode/fsdataset/impl/FsDatasetImpl.java | 69 +++++++++++++++
.../datanode/fsdataset/impl/FsVolumeList.java | 19 ++++
.../server/datanode/SimulatedFSDataset.java | 5 ++
.../fsdataset/impl/TestFsDatasetImpl.java | 92 ++++++++++++++++++--
10 files changed, 245 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 95feb33..5414aea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -168,6 +168,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6879. Adding tracing to Hadoop RPC (Masatake Iwasaki via Colin Patrick
McCabe)
+ HDFS-6774. Make FsDataset and DataStore support removing volumes. (Lei Xu
+ via atm)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index bcee1df..45ca0be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -202,6 +202,20 @@ public class BlockPoolSliceStorage extends Storage {
}
/**
+ * Remove storage directories.
+ * @param storageDirs a set of storage directories to be removed.
+ */
+ void removeVolumes(Set<File> storageDirs) {
+ for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
+ it.hasNext(); ) {
+ StorageDirectory sd = it.next();
+ if (storageDirs.contains(sd.getRoot())) {
+ it.remove();
+ }
+ }
+ }
+
+ /**
* Set layoutVersion, namespaceID and blockpoolID into block pool storage
* VERSION file
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 29616e7..9929199 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -313,6 +313,33 @@ public class DataStorage extends Storage {
}
/**
+ * Remove volumes from DataStorage.
+ * @param locations a collection of volumes.
+ */
+ synchronized void removeVolumes(Collection<StorageLocation> locations) {
+ if (locations.isEmpty()) {
+ return;
+ }
+
+ Set<File> dataDirs = new HashSet<File>();
+ for (StorageLocation sl : locations) {
+ dataDirs.add(sl.getFile());
+ }
+
+ for (BlockPoolSliceStorage bpsStorage : this.bpStorageMap.values()) {
+ bpsStorage.removeVolumes(dataDirs);
+ }
+
+ for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
+ it.hasNext(); ) {
+ StorageDirectory sd = it.next();
+ if (dataDirs.contains(sd.getRoot())) {
+ it.remove();
+ }
+ }
+ }
+
+ /**
* Analyze storage directories.
* Recover from previous transitions if required.
* Perform fs state transition if necessary depending on the namespace info.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index a64f9c0..0fbfe19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -97,6 +97,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
public void addVolumes(Collection<StorageLocation> volumes)
throws IOException;
+ /** Removes a collection of volumes from FsDataset. */
+ public void removeVolumes(Collection<StorageLocation> volumes);
+
/** @return a storage with the given storage ID */
public DatanodeStorage getStorage(final String storageUuid);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index e47a302..658fe27 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -302,7 +302,7 @@ class BlockPoolSlice {
loadRwr = false;
}
sc.close();
- if (restartMeta.delete()) {
+ if (!restartMeta.delete()) {
FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
restartMeta.getPath());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 539e97b..bee7bf7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -118,6 +118,24 @@ class FsDatasetAsyncDiskService {
}
addExecutorForVolume(volume);
}
+
+ /**
+ * Stops AsyncDiskService for a volume.
+ * @param volume the root of the volume.
+ */
+ synchronized void removeVolume(File volume) {
+ if (executors == null) {
+ throw new RuntimeException("AsyncDiskService is already shutdown");
+ }
+ ThreadPoolExecutor executor = executors.get(volume);
+ if (executor == null) {
+ throw new RuntimeException("Can not find volume " + volume
+ + " to remove.");
+ } else {
+ executor.shutdown();
+ executors.remove(volume);
+ }
+ }
synchronized long countPendingDeletions() {
long count = 0;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 0c872a5..67ff9b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -30,9 +30,11 @@ import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import javax.management.NotCompliantMBeanException;
@@ -314,6 +316,51 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
+ /**
+ * Removes a collection of volumes from FsDataset.
+ * @param volumes the root directories of the volumes.
+ *
+ * DataNode should call this function before calling
+ * {@link DataStorage#removeVolumes(java.util.Collection)}.
+ */
+ @Override
+ public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
+ Set<File> volumeSet = new HashSet<File>();
+ for (StorageLocation sl : volumes) {
+ volumeSet.add(sl.getFile());
+ }
+ for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
+ Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
+ if (volumeSet.contains(sd.getRoot())) {
+ String volume = sd.getRoot().toString();
+ LOG.info("Removing " + volume + " from FsDataset.");
+
+ this.volumes.removeVolume(volume);
+ storageMap.remove(sd.getStorageUuid());
+ asyncDiskService.removeVolume(sd.getCurrentDir());
+
+ // Remove all replica information for the blocks on the volume. Unlike
+ // updating the volumeMap in addVolume(), this operation does not scan
+ // disks.
+ for (String bpid : volumeMap.getBlockPoolList()) {
+ List<Block> blocks = new ArrayList<Block>();
+ for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
+ it.hasNext(); ) {
+ ReplicaInfo block = it.next();
+ if (block.getVolume().getBasePath().equals(volume)) {
+ invalidate(bpid, block.getBlockId());
+ blocks.add(block);
+ it.remove();
+ }
+ }
+ // Delete blocks from the block scanner in batch.
+ datanode.getBlockScanner().deleteBlocks(bpid,
+ blocks.toArray(new Block[blocks.size()]));
+ }
+ }
+ }
+ }
+
private StorageType getStorageTypeFromLocations(
Collection<StorageLocation> dataLocations, File dir) {
for (StorageLocation dataLocation : dataLocations) {
@@ -1302,6 +1349,28 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
/**
+ * Invalidates a block without deleting the actual on-disk block file.
+ *
+ * It should only be used for decommissioning disks.
+ *
+ * @param bpid the block pool ID.
+ * @param blockId the ID of the block.
+ */
+ public void invalidate(String bpid, long blockId) {
+ // If a DFSClient has the replica in its cache of short-circuit file
+ // descriptors (and the client is using ShortCircuitShm), invalidate it.
+ // The short-circuit registry is null in the unit tests, because the
+ // datanode is a mock object.
+ if (datanode.getShortCircuitRegistry() != null) {
+ datanode.getShortCircuitRegistry().processBlockInvalidation(
+ new ExtendedBlockId(blockId, bpid));
+
+ // If the block is cached, start uncaching it.
+ cacheManager.uncacheBlock(bpid, blockId);
+ }
+ }
+
+ /**
* Asynchronously attempts to cache a single block via {@link FsDatasetCache}.
*/
private void cacheBlock(String bpid, long blockId) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index d4f8adc..90739c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -212,6 +212,25 @@ class FsVolumeList {
FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
}
+ /**
+ * Dynamically remove a volume from the list.
+ * @param volume the volume to be removed.
+ */
+ synchronized void removeVolume(String volume) {
+ // Make a copy of volumes to remove one volume.
+ final List<FsVolumeImpl> volumeList = new ArrayList<FsVolumeImpl>(volumes);
+ for (Iterator<FsVolumeImpl> it = volumeList.iterator(); it.hasNext(); ) {
+ FsVolumeImpl fsVolume = it.next();
+ if (fsVolume.getBasePath().equals(volume)) {
+ fsVolume.shutdown();
+ it.remove();
+ volumes = Collections.unmodifiableList(volumeList);
+ FsDatasetImpl.LOG.info("Removed volume: " + volume);
+ break;
+ }
+ }
+ }
+
void addBlockPool(final String bpid, final Configuration conf) throws IOException {
long totalStartTime = Time.monotonicNow();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 109a039..a51342e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1121,6 +1121,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
+ public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
FileDescriptor fd, long offset, long nbytes, int flags) {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/135315b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index d9e9907..2c4c401 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -18,12 +18,20 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
+import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Test;
@@ -35,25 +43,44 @@ import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestFsDatasetImpl {
private static final String BASE_DIR =
- System.getProperty("test.build.dir") + "/fsdatasetimpl";
+ new FileSystemTestHelper().getTestRootDir();
private static final int NUM_INIT_VOLUMES = 2;
+ private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};
+ // Used to generate storageUuid
+ private static final DataStorage dsForStorageUuid = new DataStorage(
+ new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE));
+
+ private Configuration conf;
private DataStorage storage;
+ private DataBlockScanner scanner;
private FsDatasetImpl dataset;
+ private static Storage.StorageDirectory createStorageDirectory(File root) {
+ Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
+ dsForStorageUuid.createStorageID(sd);
+ return sd;
+ }
+
private static void createStorageDirs(DataStorage storage, Configuration conf,
int numDirs) throws IOException {
List<Storage.StorageDirectory> dirs =
new ArrayList<Storage.StorageDirectory>();
List<String> dirStrings = new ArrayList<String>();
for (int i = 0; i < numDirs; i++) {
- String loc = BASE_DIR + "/data" + i;
- dirStrings.add(loc);
- dirs.add(new Storage.StorageDirectory(new File(loc)));
+ File loc = new File(BASE_DIR + "/data" + i);
+ dirStrings.add(loc.toString());
+ loc.mkdirs();
+ dirs.add(createStorageDirectory(loc));
when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
}
@@ -66,14 +93,19 @@ public class TestFsDatasetImpl {
public void setUp() throws IOException {
final DataNode datanode = Mockito.mock(DataNode.class);
storage = Mockito.mock(DataStorage.class);
- Configuration conf = new Configuration();
+ scanner = Mockito.mock(DataBlockScanner.class);
+ this.conf = new Configuration();
final DNConf dnConf = new DNConf(conf);
when(datanode.getConf()).thenReturn(conf);
when(datanode.getDnConf()).thenReturn(dnConf);
+ when(datanode.getBlockScanner()).thenReturn(scanner);
createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
dataset = new FsDatasetImpl(datanode, storage, conf);
+ for (String bpid : BLOCK_POOL_IDS) {
+ dataset.addBlockPool(bpid, conf);
+ }
assertEquals(NUM_INIT_VOLUMES, dataset.getVolumes().size());
assertEquals(0, dataset.getNumFailedVolumes());
@@ -89,15 +121,63 @@ public class TestFsDatasetImpl {
String path = BASE_DIR + "/newData" + i;
newLocations.add(StorageLocation.parse(path));
when(storage.getStorageDir(numExistingVolumes + i))
- .thenReturn(new Storage.StorageDirectory(new File(path)));
+ .thenReturn(createStorageDirectory(new File(path)));
}
when(storage.getNumStorageDirs()).thenReturn(totalVolumes);
dataset.addVolumes(newLocations);
assertEquals(totalVolumes, dataset.getVolumes().size());
+ assertEquals(totalVolumes, dataset.storageMap.size());
for (int i = 0; i < numNewVolumes; i++) {
assertEquals(newLocations.get(i).getFile().getPath(),
dataset.getVolumes().get(numExistingVolumes + i).getBasePath());
}
}
+
+ @Test
+ public void testRemoveVolumes() throws IOException {
+ // Feed FsDataset with block metadata, spread across all block pools.
+ final int NUM_BLOCKS = 100;
+ for (int i = 0; i < NUM_BLOCKS; i++) {
+ String bpid = BLOCK_POOL_IDS[i % BLOCK_POOL_IDS.length];
+ ExtendedBlock eb = new ExtendedBlock(bpid, i);
+ dataset.createRbw(StorageType.DEFAULT, eb);
+ }
+ final String[] dataDirs =
+ conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
+ final String volumePathToRemove = dataDirs[0];
+ List<StorageLocation> volumesToRemove = new ArrayList<StorageLocation>();
+ volumesToRemove.add(StorageLocation.parse(volumePathToRemove));
+
+ dataset.removeVolumes(volumesToRemove);
+ int expectedNumVolumes = dataDirs.length - 1;
+ assertEquals("The volume has been removed from the volumeList.",
+ expectedNumVolumes, dataset.getVolumes().size());
+ assertEquals("The volume has been removed from the storageMap.",
+ expectedNumVolumes, dataset.storageMap.size());
+
+ try {
+ dataset.asyncDiskService.execute(volumesToRemove.get(0).getFile(),
+ new Runnable() {
+ @Override
+ public void run() {}
+ });
+ fail("Expect RuntimeException: the volume has been removed from the "
+ + "AsyncDiskService.");
+ } catch (RuntimeException e) {
+ GenericTestUtils.assertExceptionContains("Cannot find root", e);
+ }
+
+ int totalNumReplicas = 0;
+ for (String bpid : dataset.volumeMap.getBlockPoolList()) {
+ totalNumReplicas += dataset.volumeMap.size(bpid);
+ }
+ assertEquals("The replica infos on this volume has been removed from the "
+ + "volumeMap.", NUM_BLOCKS / NUM_INIT_VOLUMES,
+ totalNumReplicas);
+
+ // Verify that deleteBlocks() was invoked once for every block pool.
+ verify(scanner, times(BLOCK_POOL_IDS.length))
+ .deleteBlocks(anyString(), any(Block[].class));
+ }
}