You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by le...@apache.org on 2016/10/10 22:30:10 UTC
[2/2] hadoop git commit: HDFS-10637. Modifications to remove the assumption that FsVolumes are backed by java.io.File. (Virajith Jalaparti via lei)
HDFS-10637. Modifications to remove the assumption that FsVolumes are backed by java.io.File. (Virajith Jalaparti via lei)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/96b12662
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/96b12662
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/96b12662
Branch: refs/heads/trunk
Commit: 96b12662ea76e3ded4ef13944fc8df206cfb4613
Parents: 0773ffd
Author: Lei Xu <le...@apache.org>
Authored: Mon Oct 10 15:28:19 2016 -0700
Committer: Lei Xu <le...@apache.org>
Committed: Mon Oct 10 15:30:03 2016 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/server/common/Storage.java | 22 ++
.../server/datanode/BlockPoolSliceStorage.java | 20 +-
.../hdfs/server/datanode/BlockScanner.java | 8 +-
.../hadoop/hdfs/server/datanode/DataNode.java | 34 +-
.../hdfs/server/datanode/DataStorage.java | 34 +-
.../hdfs/server/datanode/DirectoryScanner.java | 320 +------------------
.../hdfs/server/datanode/DiskBalancer.java | 25 +-
.../hdfs/server/datanode/LocalReplica.java | 2 +-
.../hdfs/server/datanode/ReplicaInfo.java | 2 +-
.../hdfs/server/datanode/StorageLocation.java | 32 +-
.../hdfs/server/datanode/VolumeScanner.java | 27 +-
.../server/datanode/fsdataset/FsDatasetSpi.java | 5 +-
.../server/datanode/fsdataset/FsVolumeSpi.java | 234 +++++++++++++-
.../impl/FsDatasetAsyncDiskService.java | 40 ++-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 136 ++++----
.../datanode/fsdataset/impl/FsVolumeImpl.java | 233 ++++++++++++--
.../fsdataset/impl/FsVolumeImplBuilder.java | 65 ++++
.../datanode/fsdataset/impl/FsVolumeList.java | 44 +--
.../impl/RamDiskAsyncLazyPersistService.java | 79 +++--
.../fsdataset/impl/VolumeFailureInfo.java | 13 +-
.../hdfs/server/namenode/FSNamesystem.java | 2 +-
.../TestNameNodePrunesMissingStorages.java | 15 +-
.../server/datanode/SimulatedFSDataset.java | 46 ++-
.../hdfs/server/datanode/TestBlockScanner.java | 3 +-
.../datanode/TestDataNodeHotSwapVolumes.java | 15 +-
.../datanode/TestDataNodeVolumeFailure.java | 12 +-
.../TestDataNodeVolumeFailureReporting.java | 10 +
.../server/datanode/TestDirectoryScanner.java | 76 +++--
.../hdfs/server/datanode/TestDiskError.java | 2 +-
.../extdataset/ExternalDatasetImpl.java | 10 +-
.../datanode/extdataset/ExternalVolumeImpl.java | 44 ++-
.../fsdataset/impl/FsDatasetImplTestUtils.java | 9 +-
.../fsdataset/impl/TestFsDatasetImpl.java | 69 ++--
.../fsdataset/impl/TestFsVolumeList.java | 83 +++--
.../TestDiskBalancerWithMockMover.java | 4 +-
35 files changed, 1062 insertions(+), 713 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 9218e9d..e55de35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.ToolRunner;
@@ -269,11 +270,17 @@ public abstract class Storage extends StorageInfo {
private String storageUuid = null; // Storage directory identifier.
+ private final StorageLocation location;
public StorageDirectory(File dir) {
// default dirType is null
this(dir, null, false);
}
+ public StorageDirectory(StorageLocation location) {
+ // default dirType is null
+ this(location.getFile(), null, false, location);
+ }
+
public StorageDirectory(File dir, StorageDirType dirType) {
this(dir, dirType, false);
}
@@ -294,11 +301,22 @@ public abstract class Storage extends StorageInfo {
* disables locking on the storage directory, false enables locking
*/
public StorageDirectory(File dir, StorageDirType dirType, boolean isShared) {
+ this(dir, dirType, isShared, null);
+ }
+
+ public StorageDirectory(File dir, StorageDirType dirType,
+ boolean isShared, StorageLocation location) {
this.root = dir;
this.lock = null;
this.dirType = dirType;
this.isShared = isShared;
+ this.location = location;
+ assert location == null ||
+ dir.getAbsolutePath().startsWith(
+ location.getFile().getAbsolutePath()):
+ "The storage location and directory should be equal";
}
+
/**
* Get root directory of this storage
@@ -861,6 +879,10 @@ public abstract class Storage extends StorageInfo {
}
return false;
}
+
+ public StorageLocation getStorageLocation() {
+ return location;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index fd89611..e3b6da1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -147,10 +147,10 @@ public class BlockPoolSliceStorage extends Storage {
* @throws IOException
*/
private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
- File dataDir, StartupOption startOpt,
+ File dataDir, StorageLocation location, StartupOption startOpt,
List<Callable<StorageDirectory>> callables, Configuration conf)
throws IOException {
- StorageDirectory sd = new StorageDirectory(dataDir, null, true);
+ StorageDirectory sd = new StorageDirectory(dataDir, null, true, location);
try {
StorageState curState = sd.analyzeStorage(startOpt, this, true);
// sd is locked but not opened
@@ -208,9 +208,9 @@ public class BlockPoolSliceStorage extends Storage {
* @throws IOException on error
*/
List<StorageDirectory> loadBpStorageDirectories(NamespaceInfo nsInfo,
- Collection<File> dataDirs, StartupOption startOpt,
- List<Callable<StorageDirectory>> callables, Configuration conf)
- throws IOException {
+ Collection<File> dataDirs, StorageLocation location,
+ StartupOption startOpt, List<Callable<StorageDirectory>> callables,
+ Configuration conf) throws IOException {
List<StorageDirectory> succeedDirs = Lists.newArrayList();
try {
for (File dataDir : dataDirs) {
@@ -220,7 +220,7 @@ public class BlockPoolSliceStorage extends Storage {
"attempt to load an used block storage: " + dataDir);
}
final StorageDirectory sd = loadStorageDirectory(
- nsInfo, dataDir, startOpt, callables, conf);
+ nsInfo, dataDir, location, startOpt, callables, conf);
succeedDirs.add(sd);
}
} catch (IOException e) {
@@ -244,12 +244,12 @@ public class BlockPoolSliceStorage extends Storage {
* @throws IOException on error
*/
List<StorageDirectory> recoverTransitionRead(NamespaceInfo nsInfo,
- Collection<File> dataDirs, StartupOption startOpt,
- List<Callable<StorageDirectory>> callables, Configuration conf)
- throws IOException {
+ Collection<File> dataDirs, StorageLocation location,
+ StartupOption startOpt, List<Callable<StorageDirectory>> callables,
+ Configuration conf) throws IOException {
LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
final List<StorageDirectory> loaded = loadBpStorageDirectories(
- nsInfo, dataDirs, startOpt, callables, conf);
+ nsInfo, dataDirs, location, startOpt, callables, conf);
for (StorageDirectory sd : loaded) {
addStorageDir(sd);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
index 456dcc1..21484fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
@@ -201,17 +201,17 @@ public class BlockScanner {
FsVolumeSpi volume = ref.getVolume();
if (!isEnabled()) {
LOG.debug("Not adding volume scanner for {}, because the block " +
- "scanner is disabled.", volume.getBasePath());
+ "scanner is disabled.", volume);
return;
}
VolumeScanner scanner = scanners.get(volume.getStorageID());
if (scanner != null) {
LOG.error("Already have a scanner for volume {}.",
- volume.getBasePath());
+ volume);
return;
}
LOG.debug("Adding scanner for volume {} (StorageID {})",
- volume.getBasePath(), volume.getStorageID());
+ volume, volume.getStorageID());
scanner = new VolumeScanner(conf, datanode, ref);
scanner.start();
scanners.put(volume.getStorageID(), scanner);
@@ -245,7 +245,7 @@ public class BlockScanner {
return;
}
LOG.info("Removing scanner for volume {} (StorageID {})",
- volume.getBasePath(), volume.getStorageID());
+ volume, volume.getStorageID());
scanner.shutdown();
scanners.remove(volume.getStorageID());
Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index dd7e426..cb8e308 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -58,7 +58,6 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
-import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -78,7 +77,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -791,11 +789,7 @@ public class DataNode extends ReconfigurableBase
if (locations.isEmpty()) {
return;
}
- Set<File> volumesToRemove = new HashSet<>();
- for (StorageLocation loc : locations) {
- volumesToRemove.add(loc.getFile().getAbsoluteFile());
- }
- removeVolumes(volumesToRemove, true);
+ removeVolumes(locations, true);
}
/**
@@ -814,26 +808,22 @@ public class DataNode extends ReconfigurableBase
* @throws IOException
*/
private synchronized void removeVolumes(
- final Set<File> absoluteVolumePaths, boolean clearFailure)
+ final Collection<StorageLocation> storageLocations, boolean clearFailure)
throws IOException {
- for (File vol : absoluteVolumePaths) {
- Preconditions.checkArgument(vol.isAbsolute());
- }
-
- if (absoluteVolumePaths.isEmpty()) {
+ if (storageLocations.isEmpty()) {
return;
}
LOG.info(String.format("Deactivating volumes (clear failure=%b): %s",
- clearFailure, Joiner.on(",").join(absoluteVolumePaths)));
+ clearFailure, Joiner.on(",").join(storageLocations)));
IOException ioe = null;
// Remove volumes and block infos from FsDataset.
- data.removeVolumes(absoluteVolumePaths, clearFailure);
+ data.removeVolumes(storageLocations, clearFailure);
// Remove volumes from DataStorage.
try {
- storage.removeVolumes(absoluteVolumePaths);
+ storage.removeVolumes(storageLocations);
} catch (IOException e) {
ioe = e;
}
@@ -841,7 +831,7 @@ public class DataNode extends ReconfigurableBase
// Set configuration and dataDirs to reflect volume changes.
for (Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext(); ) {
StorageLocation loc = it.next();
- if (absoluteVolumePaths.contains(loc.getFile().getAbsoluteFile())) {
+ if (storageLocations.contains(loc)) {
it.remove();
}
}
@@ -3242,18 +3232,18 @@ public class DataNode extends ReconfigurableBase
* Check the disk error
*/
private void checkDiskError() {
- Set<File> unhealthyDataDirs = data.checkDataDir();
- if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
+ Set<StorageLocation> unhealthyLocations = data.checkDataDir();
+ if (unhealthyLocations != null && !unhealthyLocations.isEmpty()) {
try {
// Remove all unhealthy volumes from DataNode.
- removeVolumes(unhealthyDataDirs, false);
+ removeVolumes(unhealthyLocations, false);
} catch (IOException e) {
LOG.warn("Error occurred when removing unhealthy storage dirs: "
+ e.getMessage(), e);
}
StringBuilder sb = new StringBuilder("DataNode failed volumes:");
- for (File dataDir : unhealthyDataDirs) {
- sb.append(dataDir.getAbsolutePath() + ";");
+ for (StorageLocation location : unhealthyLocations) {
+ sb.append(location + ";");
}
handleDiskError(sb.toString());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 7e620c2..7c9bea5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -263,9 +263,10 @@ public class DataStorage extends Storage {
}
private StorageDirectory loadStorageDirectory(DataNode datanode,
- NamespaceInfo nsInfo, File dataDir, StartupOption startOpt,
- List<Callable<StorageDirectory>> callables) throws IOException {
- StorageDirectory sd = new StorageDirectory(dataDir, null, false);
+ NamespaceInfo nsInfo, File dataDir, StorageLocation location,
+ StartupOption startOpt, List<Callable<StorageDirectory>> callables)
+ throws IOException {
+ StorageDirectory sd = new StorageDirectory(dataDir, null, false, location);
try {
StorageState curState = sd.analyzeStorage(startOpt, this, true);
// sd is locked but not opened
@@ -310,7 +311,7 @@ public class DataStorage extends Storage {
* builder later.
*
* @param datanode DataNode object.
- * @param volume the root path of a storage directory.
+ * @param location the StorageLocation for the storage directory.
* @param nsInfos an array of namespace infos.
* @return a VolumeBuilder that holds the metadata of this storage directory
* and can be added to DataStorage later.
@@ -318,8 +319,10 @@ public class DataStorage extends Storage {
*
* Note that if there is IOException, the state of DataStorage is not modified.
*/
- public VolumeBuilder prepareVolume(DataNode datanode, File volume,
- List<NamespaceInfo> nsInfos) throws IOException {
+ public VolumeBuilder prepareVolume(DataNode datanode,
+ StorageLocation location, List<NamespaceInfo> nsInfos)
+ throws IOException {
+ File volume = location.getFile();
if (containsStorageDir(volume)) {
final String errorMessage = "Storage directory is in use";
LOG.warn(errorMessage + ".");
@@ -327,7 +330,8 @@ public class DataStorage extends Storage {
}
StorageDirectory sd = loadStorageDirectory(
- datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP, null);
+ datanode, nsInfos.get(0), volume, location,
+ StartupOption.HOTSWAP, null);
VolumeBuilder builder =
new VolumeBuilder(this, sd);
for (NamespaceInfo nsInfo : nsInfos) {
@@ -338,7 +342,8 @@ public class DataStorage extends Storage {
final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
final List<StorageDirectory> dirs = bpStorage.loadBpStorageDirectories(
- nsInfo, bpDataDirs, StartupOption.HOTSWAP, null, datanode.getConf());
+ nsInfo, bpDataDirs, location, StartupOption.HOTSWAP,
+ null, datanode.getConf());
builder.addBpStorageDirectories(nsInfo.getBlockPoolID(), dirs);
}
return builder;
@@ -407,7 +412,7 @@ public class DataStorage extends Storage {
final List<Callable<StorageDirectory>> callables
= Lists.newArrayList();
final StorageDirectory sd = loadStorageDirectory(
- datanode, nsInfo, root, startOpt, callables);
+ datanode, nsInfo, root, dataDir, startOpt, callables);
if (callables.isEmpty()) {
addStorageDir(sd);
success.add(dataDir);
@@ -458,7 +463,8 @@ public class DataStorage extends Storage {
final List<Callable<StorageDirectory>> callables = Lists.newArrayList();
final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead(
- nsInfo, bpDataDirs, startOpt, callables, datanode.getConf());
+ nsInfo, bpDataDirs, dataDir, startOpt,
+ callables, datanode.getConf());
if (callables.isEmpty()) {
for(StorageDirectory sd : dirs) {
success.add(sd);
@@ -498,9 +504,10 @@ public class DataStorage extends Storage {
* @param dirsToRemove a set of storage directories to be removed.
* @throws IOException if I/O error when unlocking storage directory.
*/
- synchronized void removeVolumes(final Set<File> dirsToRemove)
+ synchronized void removeVolumes(
+ final Collection<StorageLocation> storageLocations)
throws IOException {
- if (dirsToRemove.isEmpty()) {
+ if (storageLocations.isEmpty()) {
return;
}
@@ -508,7 +515,8 @@ public class DataStorage extends Storage {
for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
it.hasNext(); ) {
StorageDirectory sd = it.next();
- if (dirsToRemove.contains(sd.getRoot())) {
+ StorageLocation sdLocation = sd.getStorageLocation();
+ if (storageLocations.contains(sdLocation)) {
// Remove the block pool level storage first.
for (Map.Entry<String, BlockPoolSliceStorage> entry :
this.bpStorageMap.entrySet()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index c50bfaf..58071dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -22,7 +22,6 @@ import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -37,9 +36,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -47,10 +43,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.Time;
@@ -209,200 +204,6 @@ public class DirectoryScanner implements Runnable {
}
}
- /**
- * Tracks the files and other information related to a block on the disk
- * Missing file is indicated by setting the corresponding member
- * to null.
- *
- * Because millions of these structures may be created, we try to save
- * memory here. So instead of storing full paths, we store path suffixes.
- * The block file, if it exists, will have a path like this:
- * <volume_base_path>/<block_path>
- * So we don't need to store the volume path, since we already know what the
- * volume is.
- *
- * The metadata file, if it exists, will have a path like this:
- * <volume_base_path>/<block_path>_<genstamp>.meta
- * So if we have a block file, there isn't any need to store the block path
- * again.
- *
- * The accessor functions take care of these manipulations.
- */
- static class ScanInfo implements Comparable<ScanInfo> {
- private final long blockId;
-
- /**
- * The block file path, relative to the volume's base directory.
- * If there was no block file found, this may be null. If 'vol'
- * is null, then this is the full path of the block file.
- */
- private final String blockSuffix;
-
- /**
- * The suffix of the meta file path relative to the block file.
- * If blockSuffix is null, then this will be the entire path relative
- * to the volume base directory, or an absolute path if vol is also
- * null.
- */
- private final String metaSuffix;
-
- private final FsVolumeSpi volume;
-
- /**
- * Get the file's length in async block scan
- */
- private final long blockFileLength;
-
- private final static Pattern CONDENSED_PATH_REGEX =
- Pattern.compile("(?<!^)(\\\\|/){2,}");
-
- private final static String QUOTED_FILE_SEPARATOR =
- Matcher.quoteReplacement(File.separator);
-
- /**
- * Get the most condensed version of the path.
- *
- * For example, the condensed version of /foo//bar is /foo/bar
- * Unlike {@link File#getCanonicalPath()}, this will never perform I/O
- * on the filesystem.
- *
- * @param path the path to condense
- * @return the condensed path
- */
- private static String getCondensedPath(String path) {
- return CONDENSED_PATH_REGEX.matcher(path).
- replaceAll(QUOTED_FILE_SEPARATOR);
- }
-
- /**
- * Get a path suffix.
- *
- * @param f The file to get the suffix for.
- * @param prefix The prefix we're stripping off.
- *
- * @return A suffix such that prefix + suffix = path to f
- */
- private static String getSuffix(File f, String prefix) {
- String fullPath = getCondensedPath(f.getAbsolutePath());
- if (fullPath.startsWith(prefix)) {
- return fullPath.substring(prefix.length());
- }
- throw new RuntimeException(prefix + " is not a prefix of " + fullPath);
- }
-
- /**
- * Create a ScanInfo object for a block. This constructor will examine
- * the block data and meta-data files.
- *
- * @param blockId the block ID
- * @param blockFile the path to the block data file
- * @param metaFile the path to the block meta-data file
- * @param vol the volume that contains the block
- */
- ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
- this.blockId = blockId;
- String condensedVolPath = vol == null ? null :
- getCondensedPath(vol.getBasePath());
- this.blockSuffix = blockFile == null ? null :
- getSuffix(blockFile, condensedVolPath);
- this.blockFileLength = (blockFile != null) ? blockFile.length() : 0;
- if (metaFile == null) {
- this.metaSuffix = null;
- } else if (blockFile == null) {
- this.metaSuffix = getSuffix(metaFile, condensedVolPath);
- } else {
- this.metaSuffix = getSuffix(metaFile,
- condensedVolPath + blockSuffix);
- }
- this.volume = vol;
- }
-
- /**
- * Returns the block data file.
- *
- * @return the block data file
- */
- File getBlockFile() {
- return (blockSuffix == null) ? null :
- new File(volume.getBasePath(), blockSuffix);
- }
-
- /**
- * Return the length of the data block. The length returned is the length
- * cached when this object was created.
- *
- * @return the length of the data block
- */
- long getBlockFileLength() {
- return blockFileLength;
- }
-
- /**
- * Returns the block meta data file or null if there isn't one.
- *
- * @return the block meta data file
- */
- File getMetaFile() {
- if (metaSuffix == null) {
- return null;
- } else if (blockSuffix == null) {
- return new File(volume.getBasePath(), metaSuffix);
- } else {
- return new File(volume.getBasePath(), blockSuffix + metaSuffix);
- }
- }
-
- /**
- * Returns the block ID.
- *
- * @return the block ID
- */
- long getBlockId() {
- return blockId;
- }
-
- /**
- * Returns the volume that contains the block that this object describes.
- *
- * @return the volume
- */
- FsVolumeSpi getVolume() {
- return volume;
- }
-
- @Override // Comparable
- public int compareTo(ScanInfo b) {
- if (blockId < b.blockId) {
- return -1;
- } else if (blockId == b.blockId) {
- return 0;
- } else {
- return 1;
- }
- }
-
- @Override // Object
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof ScanInfo)) {
- return false;
- }
- return blockId == ((ScanInfo) o).blockId;
- }
-
- @Override // Object
- public int hashCode() {
- return (int)(blockId^(blockId>>>32));
- }
-
- public long getGenStamp() {
- return metaSuffix != null ? Block.getGenerationStamp(
- getMetaFile().getName()) :
- HdfsConstants.GRANDFATHER_GENERATION_STAMP;
- }
- }
/**
* Create a new directory scanner, but don't cycle it running yet.
@@ -644,7 +445,7 @@ public class DirectoryScanner implements Runnable {
// There may be multiple on-disk records for the same block, don't increment
// the memory record pointer if so.
ScanInfo nextInfo = blockpoolReport[Math.min(d, blockpoolReport.length - 1)];
- if (nextInfo.getBlockId() != info.blockId) {
+ if (nextInfo.getBlockId() != info.getBlockId()) {
++m;
}
} else {
@@ -763,19 +564,6 @@ public class DirectoryScanner implements Runnable {
}
/**
- * Helper method to determine if a file name is consistent with a block.
- * meta-data file
- *
- * @param blockId the block ID
- * @param metaFile the file to check
- * @return whether the file name is a block meta-data file name
- */
- private static boolean isBlockMetaFile(String blockId, String metaFile) {
- return metaFile.startsWith(blockId)
- && metaFile.endsWith(Block.METADATA_EXTENSION);
- }
-
- /**
* The ReportCompiler class encapsulates the process of searching a datanode's
* disks for block information. It operates by performing a DFS of the
* volume to discover block information.
@@ -784,7 +572,7 @@ public class DirectoryScanner implements Runnable {
* ScanInfo object for it and adds that object to its report list. The report
* list is returned by the {@link #call()} method.
*/
- private class ReportCompiler implements Callable<ScanInfoPerBlockPool> {
+ public class ReportCompiler implements Callable<ScanInfoPerBlockPool> {
private final FsVolumeSpi volume;
private final DataNode datanode;
// Variable for tracking time spent running for throttling purposes
@@ -816,14 +604,12 @@ public class DirectoryScanner implements Runnable {
ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length);
for (String bpid : bpList) {
LinkedList<ScanInfo> report = new LinkedList<>();
- File bpFinalizedDir = volume.getFinalizedDir(bpid);
perfTimer.start();
throttleTimer.start();
try {
- result.put(bpid,
- compileReport(volume, bpFinalizedDir, bpFinalizedDir, report));
+ result.put(bpid, volume.compileReport(bpid, report, this));
} catch (InterruptedException ex) {
// Exit quickly and flag the scanner to do the same
result = null;
@@ -834,106 +620,12 @@ public class DirectoryScanner implements Runnable {
}
/**
- * Compile a list of {@link ScanInfo} for the blocks in the directory
- * given by {@code dir}.
- *
- * @param vol the volume that contains the directory to scan
- * @param bpFinalizedDir the root directory of the directory to scan
- * @param dir the directory to scan
- * @param report the list onto which blocks reports are placed
- */
- private LinkedList<ScanInfo> compileReport(FsVolumeSpi vol,
- File bpFinalizedDir, File dir, LinkedList<ScanInfo> report)
- throws InterruptedException {
-
- throttle();
-
- List <String> fileNames;
- try {
- fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE);
- } catch (IOException ioe) {
- LOG.warn("Exception occured while compiling report: ", ioe);
- // Initiate a check on disk failure.
- datanode.checkDiskErrorAsync();
- // Ignore this directory and proceed.
- return report;
- }
- Collections.sort(fileNames);
-
- /*
- * Assumption: In the sorted list of files block file appears immediately
- * before block metadata file. This is true for the current naming
- * convention for block file blk_<blockid> and meta file
- * blk_<blockid>_<genstamp>.meta
- */
- for (int i = 0; i < fileNames.size(); i++) {
- // Make sure this thread can make a timely exit. With a low throttle
- // rate, completing a run can take a looooong time.
- if (Thread.interrupted()) {
- throw new InterruptedException();
- }
-
- File file = new File(dir, fileNames.get(i));
- if (file.isDirectory()) {
- compileReport(vol, bpFinalizedDir, file, report);
- continue;
- }
- if (!Block.isBlockFilename(file)) {
- if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, file.getName())) {
- long blockId = Block.getBlockId(file.getName());
- verifyFileLocation(file.getParentFile(), bpFinalizedDir,
- blockId);
- report.add(new ScanInfo(blockId, null, file, vol));
- }
- continue;
- }
- File blockFile = file;
- long blockId = Block.filename2id(file.getName());
- File metaFile = null;
-
- // Skip all the files that start with block name until
- // getting to the metafile for the block
- while (i + 1 < fileNames.size()) {
- File blkMetaFile = new File(dir, fileNames.get(i + 1));
- if (!(blkMetaFile.isFile()
- && blkMetaFile.getName().startsWith(blockFile.getName()))) {
- break;
- }
- i++;
- if (isBlockMetaFile(blockFile.getName(), blkMetaFile.getName())) {
- metaFile = blkMetaFile;
- break;
- }
- }
- verifyFileLocation(blockFile, bpFinalizedDir, blockId);
- report.add(new ScanInfo(blockId, blockFile, metaFile, vol));
- }
- return report;
- }
-
- /**
- * Verify whether the actual directory location of block file has the
- * expected directory path computed using its block ID.
- */
- private void verifyFileLocation(File actualBlockFile,
- File bpFinalizedDir, long blockId) {
- File expectedBlockDir =
- DatanodeUtil.idToBlockDir(bpFinalizedDir, blockId);
- File actualBlockDir = actualBlockFile.getParentFile();
- if (actualBlockDir.compareTo(expectedBlockDir) != 0) {
- LOG.warn("Block: " + blockId +
- " found in invalid directory. Expected directory: " +
- expectedBlockDir + ". Actual directory: " + actualBlockDir);
- }
- }
-
- /**
* Called by the thread before each potential disk scan so that a pause
* can be optionally inserted to limit the number of scans per second.
* The limit is controlled by
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}.
*/
- private void throttle() throws InterruptedException {
+ public void throttle() throws InterruptedException {
accumulateTimeRunning();
if ((throttleLimitMsPerSec < 1000) &&
@@ -963,7 +655,7 @@ public class DirectoryScanner implements Runnable {
}
}
- private enum BlockDirFilter implements FilenameFilter {
+ public enum BlockDirFilter implements FilenameFilter {
INSTANCE;
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index e7e9105..0c75001 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -500,7 +500,8 @@ public class DiskBalancer {
references = this.dataset.getFsVolumeReferences();
for (int ndx = 0; ndx < references.size(); ndx++) {
FsVolumeSpi vol = references.get(ndx);
- storageIDToVolBasePathMap.put(vol.getStorageID(), vol.getBasePath());
+ storageIDToVolBasePathMap.put(vol.getStorageID(),
+ vol.getBaseURI().getPath());
}
references.close();
}
@@ -1023,7 +1024,7 @@ public class DiskBalancer {
openPoolIters(source, poolIters);
if (poolIters.size() == 0) {
LOG.error("No block pools found on volume. volume : {}. Exiting.",
- source.getBasePath());
+ source.getBaseURI());
return;
}
@@ -1033,17 +1034,16 @@ public class DiskBalancer {
// Check for the max error count constraint.
if (item.getErrorCount() > getMaxError(item)) {
LOG.error("Exceeded the max error count. source {}, dest: {} " +
- "error count: {}", source.getBasePath(),
- dest.getBasePath(), item.getErrorCount());
- this.setExitFlag();
- continue;
+ "error count: {}", source.getBaseURI(),
+ dest.getBaseURI(), item.getErrorCount());
+ break;
}
// Check for the block tolerance constraint.
if (isCloseEnough(item)) {
LOG.info("Copy from {} to {} done. copied {} bytes and {} " +
"blocks.",
- source.getBasePath(), dest.getBasePath(),
+ source.getBaseURI(), dest.getBaseURI(),
item.getBytesCopied(), item.getBlocksCopied());
this.setExitFlag();
continue;
@@ -1053,7 +1053,7 @@ public class DiskBalancer {
// we are not able to find any blocks to copy.
if (block == null) {
LOG.error("No source blocks, exiting the copy. Source: {}, " +
- "Dest:{}", source.getBasePath(), dest.getBasePath());
+ "Dest:{}", source.getBaseURI(), dest.getBaseURI());
this.setExitFlag();
continue;
}
@@ -1081,14 +1081,13 @@ public class DiskBalancer {
// exiting here.
LOG.error("Destination volume: {} does not have enough space to" +
" accommodate a block. Block Size: {} Exiting from" +
- " copyBlocks.", dest.getBasePath(), block.getNumBytes());
- this.setExitFlag();
- continue;
+ " copyBlocks.", dest.getBaseURI(), block.getNumBytes());
+ break;
}
LOG.debug("Moved block with size {} from {} to {}",
- block.getNumBytes(), source.getBasePath(),
- dest.getBasePath());
+ block.getNumBytes(), source.getBaseURI(),
+ dest.getBaseURI());
// Check for the max throughput constraint.
// We sleep here to keep the promise that we will not
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
index cbfc9a5..58febf0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
@@ -39,8 +39,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
import org.apache.hadoop.io.IOUtils;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index cbbafc3..dc63238 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -25,8 +25,8 @@ import java.net.URI;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.util.LightWeightResizableGSet;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
index 3162c5c..75abc1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.StringUtils;
+
/**
* Encapsulates the URI and storage medium that together describe a
* storage directory.
@@ -37,7 +38,7 @@ import org.apache.hadoop.util.StringUtils;
*
*/
@InterfaceAudience.Private
-public class StorageLocation {
+public class StorageLocation implements Comparable<StorageLocation>{
final StorageType storageType;
final File file;
@@ -104,16 +105,37 @@ public class StorageLocation {
@Override
public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- } else if (obj == null || !(obj instanceof StorageLocation)) {
+ if (obj == null || !(obj instanceof StorageLocation)) {
return false;
}
- return toString().equals(obj.toString());
+ int comp = compareTo((StorageLocation) obj);
+ return comp == 0;
}
@Override
public int hashCode() {
return toString().hashCode();
}
+
+ @Override // Comparable; equals() delegates to this method
+ public int compareTo(StorageLocation obj) {
+ if (obj == this) {
+ return 0;
+ } else if (obj == null) {
+ return -1; // a null argument yields -1 rather than an exception
+ }
+ // NOTE(review): hashCode() uses toString(); confirm it agrees with equals().
+ StorageLocation otherStorage = (StorageLocation) obj;
+ if (this.getFile() != null && otherStorage.getFile() != null) {
+ return this.getFile().getAbsolutePath().compareTo(
+ otherStorage.getFile().getAbsolutePath()); // storage type not compared
+ } else if (this.getFile() == null && otherStorage.getFile() == null) {
+ return this.storageType.compareTo(otherStorage.getStorageType());
+ } else if (this.getFile() == null) {
+ return -1; // a location without a file sorts before one with a file
+ } else {
+ return 1;
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
index 3416b53..1e44fb6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -217,7 +217,7 @@ public class VolumeScanner extends Thread {
public void printStats(StringBuilder p) {
p.append(String.format("Block scanner information for volume %s with base" +
- " path %s%n", volume.getStorageID(), volume.getBasePath()));
+ " path %s%n", volume.getStorageID(), volume));
synchronized (stats) {
p.append(String.format("Bytes verified in last hour : %57d%n",
stats.bytesScannedInPastHour));
@@ -253,20 +253,20 @@ public class VolumeScanner extends Thread {
public void setup(VolumeScanner scanner) {
LOG.trace("Starting VolumeScanner {}",
- scanner.volume.getBasePath());
+ scanner.volume);
this.scanner = scanner;
}
public void handle(ExtendedBlock block, IOException e) {
FsVolumeSpi volume = scanner.volume;
if (e == null) {
- LOG.trace("Successfully scanned {} on {}", block, volume.getBasePath());
+ LOG.trace("Successfully scanned {} on {}", block, volume);
return;
}
// If the block does not exist anymore, then it's not an error.
if (!volume.getDataset().contains(block)) {
LOG.debug("Volume {}: block {} is no longer in the dataset.",
- volume.getBasePath(), block);
+ volume, block);
return;
}
// If the block exists, the exception may due to a race with write:
@@ -278,11 +278,10 @@ public class VolumeScanner extends Thread {
if (e instanceof FileNotFoundException ) {
LOG.info("Volume {}: verification failed for {} because of " +
"FileNotFoundException. This may be due to a race with write.",
- volume.getBasePath(), block);
+ volume, block);
return;
}
- LOG.warn("Reporting bad " + block + " with volume "
- + volume.getBasePath(), e);
+ LOG.warn("Reporting bad {} on {}", block, volume);
try {
scanner.datanode.reportBadBlocks(block, volume);
} catch (IOException ie) {
@@ -305,7 +304,7 @@ public class VolumeScanner extends Thread {
handler = new ScanResultHandler();
}
this.resultHandler = handler;
- setName("VolumeScannerThread(" + volume.getBasePath() + ")");
+ setName("VolumeScannerThread(" + volume + ")");
setDaemon(true);
}
@@ -376,7 +375,7 @@ public class VolumeScanner extends Thread {
BlockIterator iter = blockIters.get(idx);
if (!iter.atEnd()) {
LOG.info("Now scanning bpid {} on volume {}",
- iter.getBlockPoolId(), volume.getBasePath());
+ iter.getBlockPoolId(), volume);
curBlockIter = iter;
return 0L;
}
@@ -385,7 +384,7 @@ public class VolumeScanner extends Thread {
if (waitMs <= 0) {
iter.rewind();
LOG.info("Now rescanning bpid {} on volume {}, after more than " +
- "{} hour(s)", iter.getBlockPoolId(), volume.getBasePath(),
+ "{} hour(s)", iter.getBlockPoolId(), volume,
TimeUnit.HOURS.convert(conf.scanPeriodMs, TimeUnit.MILLISECONDS));
curBlockIter = iter;
return 0L;
@@ -416,16 +415,16 @@ public class VolumeScanner extends Thread {
cblock.getBlockPoolId(), cblock.getBlockId());
if (b == null) {
LOG.info("Replica {} was not found in the VolumeMap for volume {}",
- cblock, volume.getBasePath());
+ cblock, volume);
} else {
block = new ExtendedBlock(cblock.getBlockPoolId(), b);
}
} catch (FileNotFoundException e) {
LOG.info("FileNotFoundException while finding block {} on volume {}",
- cblock, volume.getBasePath());
+ cblock, volume);
} catch (IOException e) {
LOG.warn("I/O error while finding block {} on volume {}",
- cblock, volume.getBasePath());
+ cblock, volume);
}
if (block == null) {
return -1; // block not found.
@@ -642,7 +641,7 @@ public class VolumeScanner extends Thread {
@Override
public String toString() {
- return "VolumeScanner(" + volume.getBasePath() +
+ return "VolumeScanner(" + volume +
", " + volume.getStorageID() + ")";
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index b75ed5b..f2ffa83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -206,7 +207,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @param clearFailure set true to clear the failure information about the
* volumes.
*/
- void removeVolumes(Set<File> volumes, boolean clearFailure);
+ void removeVolumes(Collection<StorageLocation> volumes, boolean clearFailure);
/** @return a storage with the given storage ID */
DatanodeStorage getStorage(final String storageUuid);
@@ -482,7 +483,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* Check if all the data directories are healthy
* @return A set of unhealthy data directories.
*/
- Set<File> checkDataDir();
+ Set<StorageLocation> checkDataDir();
/**
* Shutdown the FSDataset
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index 9e16121..dbba31d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -20,10 +20,20 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.net.URI;
import java.nio.channels.ClosedChannelException;
+import java.util.LinkedList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
/**
* This is an interface for the underlying volume.
@@ -48,14 +58,14 @@ public interface FsVolumeSpi {
long getAvailable() throws IOException;
/** @return the base path to the volume */
- String getBasePath();
+ URI getBaseURI();
- /** @return the path to the volume */
- String getPath(String bpid) throws IOException;
+ DF getUsageStats(Configuration conf);
- /** @return the directory for the finalized blocks in the block pool. */
- File getFinalizedDir(String bpid) throws IOException;
-
+ /** @return the {@link StorageLocation} to the volume */
+ StorageLocation getStorageLocation();
+
+ /** @return the {@link StorageType} of the volume */
StorageType getStorageType();
/** Returns true if the volume is NOT backed by persistent storage. */
@@ -186,4 +196,216 @@ public interface FsVolumeSpi {
* Get the FSDatasetSpi which this volume is a part of.
*/
FsDatasetSpi getDataset();
+
+ /**
+ * Tracks the files and other information related to a block on the disk.
+ * Missing file is indicated by setting the corresponding member
+ * to null.
+ *
+ * Because millions of these structures may be created, we try to save
+ * memory here. So instead of storing full paths, we store path suffixes.
+ * The block file, if it exists, will have a path like this:
+ * <volume_base_path>/<block_path>
+ * So we don't need to store the volume path, since we already know what the
+ * volume is.
+ *
+ * The metadata file, if it exists, will have a path like this:
+ * <volume_base_path>/<block_path>_<genstamp>.meta
+ * So if we have a block file, there isn't any need to store the block path
+ * again.
+ *
+ * The accessor functions take care of these manipulations.
+ */
+ public static class ScanInfo implements Comparable<ScanInfo> {
+ private final long blockId;
+
+ /**
+ * The block file path, relative to the volume's base directory, or null
+ * if no block file was found. NOTE(review): a null 'vol' with a non-null
+ * file appears to fail in getSuffix (null prefix) -- confirm.
+ */
+ private final String blockSuffix;
+
+ /**
+ * The suffix of the meta file path relative to the block file.
+ * If blockSuffix is null, then this will be the entire path relative
+ * to the volume base directory. NOTE(review): the null-vol case looks
+ * unsupported by getSuffix (null prefix) -- confirm.
+ */
+ private final String metaSuffix;
+
+ private final FsVolumeSpi volume;
+
+ /**
+ * Length of the block file cached at construction; 0 if there is none.
+ */
+ private final long blockFileLength;
+
+ private final static Pattern CONDENSED_PATH_REGEX =
+ Pattern.compile("(?<!^)(\\\\|/){2,}");
+
+ private final static String QUOTED_FILE_SEPARATOR =
+ Matcher.quoteReplacement(File.separator);
+
+ /**
+ * Get the most condensed version of the path.
+ *
+ * For example, the condensed version of /foo//bar is /foo/bar
+ * Unlike {@link File#getCanonicalPath()}, this will never perform I/O
+ * on the filesystem.
+ *
+ * @param path the path to condense
+ * @return the condensed path
+ */
+ private static String getCondensedPath(String path) {
+ return CONDENSED_PATH_REGEX.matcher(path).
+ replaceAll(QUOTED_FILE_SEPARATOR);
+ }
+
+ /**
+ * Get a path suffix.
+ *
+ * @param f The file to get the suffix for.
+ * @param prefix The prefix we're stripping off.
+ *
+ * @return A suffix such that prefix + suffix = path to f
+ */
+ private static String getSuffix(File f, String prefix) {
+ String fullPath = getCondensedPath(f.getAbsolutePath());
+ if (fullPath.startsWith(prefix)) {
+ return fullPath.substring(prefix.length());
+ }
+ throw new RuntimeException(prefix + " is not a prefix of " + fullPath);
+ }
+
+ /**
+ * Create a ScanInfo object for a block. This constructor will examine
+ * the block data and meta-data files.
+ *
+ * @param blockId the block ID
+ * @param blockFile the path to the block data file
+ * @param metaFile the path to the block meta-data file
+ * @param vol the volume that contains the block
+ */
+ public ScanInfo(long blockId, File blockFile, File metaFile,
+ FsVolumeSpi vol) {
+ this.blockId = blockId;
+ String condensedVolPath =
+ (vol == null || vol.getBaseURI() == null) ? null :
+ getCondensedPath(new File(vol.getBaseURI()).getAbsolutePath());
+ this.blockSuffix = blockFile == null ? null :
+ getSuffix(blockFile, condensedVolPath);
+ this.blockFileLength = (blockFile != null) ? blockFile.length() : 0;
+ if (metaFile == null) {
+ this.metaSuffix = null;
+ } else if (blockFile == null) {
+ this.metaSuffix = getSuffix(metaFile, condensedVolPath);
+ } else {
+ this.metaSuffix = getSuffix(metaFile,
+ condensedVolPath + blockSuffix);
+ }
+ this.volume = vol;
+ }
+
+ /**
+ * Returns the block data file.
+ *
+ * @return the block data file
+ */
+ public File getBlockFile() {
+ return (blockSuffix == null) ? null :
+ new File(new File(volume.getBaseURI()).getAbsolutePath(), blockSuffix);
+ }
+
+ /**
+ * Return the length of the data block. The length returned is the length
+ * cached when this object was created.
+ *
+ * @return the length of the data block
+ */
+ public long getBlockFileLength() {
+ return blockFileLength;
+ }
+
+ /**
+ * Returns the block meta data file or null if there isn't one.
+ *
+ * @return the block meta data file
+ */
+ public File getMetaFile() {
+ if (metaSuffix == null) {
+ return null;
+ } else if (blockSuffix == null) {
+ return new File(new File(volume.getBaseURI()).getAbsolutePath(),
+ metaSuffix);
+ } else {
+ return new File(new File(volume.getBaseURI()).getAbsolutePath(),
+ blockSuffix + metaSuffix);
+ }
+ }
+
+ /**
+ * Returns the block ID.
+ *
+ * @return the block ID
+ */
+ public long getBlockId() {
+ return blockId;
+ }
+
+ /**
+ * Returns the volume that contains the block that this object describes.
+ *
+ * @return the volume
+ */
+ public FsVolumeSpi getVolume() {
+ return volume;
+ }
+
+ @Override // Comparable: ordered by block ID only
+ public int compareTo(ScanInfo b) {
+ if (blockId < b.blockId) {
+ return -1;
+ } else if (blockId == b.blockId) {
+ return 0;
+ } else {
+ return 1;
+ }
+ }
+
+ @Override // Object: equality is by block ID only
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ScanInfo)) {
+ return false;
+ }
+ return blockId == ((ScanInfo) o).blockId;
+ }
+
+ @Override // Object: folds the 64-bit block ID into 32 bits
+ public int hashCode() {
+ return (int)(blockId^(blockId>>>32));
+ }
+ /** Generation stamp from the meta file name, or the grandfather stamp. */
+ public long getGenStamp() {
+ return metaSuffix != null ? Block.getGenerationStamp(
+ getMetaFile().getName()) :
+ HdfsConstants.GRANDFATHER_GENERATION_STAMP;
+ }
+ }
+
+ /**
+ * Compile a list of {@link ScanInfo} for the blocks in block pool {@code bpid}.
+ * @param bpid block pool id to scan
+ * @param report the list onto which block reports are placed
+ * @param reportCompiler the compiler driving the scan (role: TODO confirm)
+ * @return the compiled list -- presumably {@code report}; TODO confirm
+ * @throws InterruptedException presumably if the scan is interrupted
+ * @throws IOException presumably on an I/O error during the scan
+ */
+ LinkedList<ScanInfo> compileReport(String bpid,
+ LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
+ throws InterruptedException, IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index c9160cd..b9c731b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -71,8 +71,8 @@ class FsDatasetAsyncDiskService {
private final DataNode datanode;
private final FsDatasetImpl fsdatasetImpl;
private final ThreadGroup threadGroup;
- private Map<File, ThreadPoolExecutor> executors
- = new HashMap<File, ThreadPoolExecutor>();
+ private Map<String, ThreadPoolExecutor> executors
+ = new HashMap<String, ThreadPoolExecutor>();
private Map<String, Set<Long>> deletedBlockIds
= new HashMap<String, Set<Long>>();
private static final int MAX_DELETED_BLOCKS = 64;
@@ -91,7 +91,7 @@ class FsDatasetAsyncDiskService {
this.threadGroup = new ThreadGroup(getClass().getSimpleName());
}
- private void addExecutorForVolume(final File volume) {
+ private void addExecutorForVolume(final FsVolumeImpl volume) {
ThreadFactory threadFactory = new ThreadFactory() {
int counter = 0;
@@ -115,18 +115,21 @@ class FsDatasetAsyncDiskService {
// This can reduce the number of running threads
executor.allowCoreThreadTimeOut(true);
- executors.put(volume, executor);
+ executors.put(volume.getStorageID(), executor);
}
/**
* Starts AsyncDiskService for a new volume
* @param volume the root of the new data volume.
*/
- synchronized void addVolume(File volume) {
+ synchronized void addVolume(FsVolumeImpl volume) {
if (executors == null) {
throw new RuntimeException("AsyncDiskService is already shutdown");
}
- ThreadPoolExecutor executor = executors.get(volume);
+ if (volume == null) {
+ throw new RuntimeException("Attempt to add a null volume");
+ }
+ ThreadPoolExecutor executor = executors.get(volume.getStorageID());
if (executor != null) {
throw new RuntimeException("Volume " + volume + " is already existed.");
}
@@ -137,17 +140,17 @@ class FsDatasetAsyncDiskService {
* Stops AsyncDiskService for a volume.
* @param volume the root of the volume.
*/
- synchronized void removeVolume(File volume) {
+ synchronized void removeVolume(String storageId) {
if (executors == null) {
throw new RuntimeException("AsyncDiskService is already shutdown");
}
- ThreadPoolExecutor executor = executors.get(volume);
+ ThreadPoolExecutor executor = executors.get(storageId);
if (executor == null) {
- throw new RuntimeException("Can not find volume " + volume
- + " to remove.");
+ throw new RuntimeException("Can not find volume with storageId "
+ + storageId + " to remove.");
} else {
executor.shutdown();
- executors.remove(volume);
+ executors.remove(storageId);
}
}
@@ -162,13 +165,16 @@ class FsDatasetAsyncDiskService {
/**
* Execute the task sometime in the future, using ThreadPools.
*/
- synchronized void execute(File root, Runnable task) {
+ synchronized void execute(FsVolumeImpl volume, Runnable task) {
if (executors == null) {
throw new RuntimeException("AsyncDiskService is already shutdown");
}
- ThreadPoolExecutor executor = executors.get(root);
+ if (volume == null) {
+ throw new RuntimeException("A null volume does not have a executor");
+ }
+ ThreadPoolExecutor executor = executors.get(volume.getStorageID());
if (executor == null) {
- throw new RuntimeException("Cannot find root " + root
+ throw new RuntimeException("Cannot find volume " + volume
+ " for execution of task " + task);
} else {
executor.execute(task);
@@ -185,7 +191,7 @@ class FsDatasetAsyncDiskService {
} else {
LOG.info("Shutting down all async disk service threads");
- for (Map.Entry<File, ThreadPoolExecutor> e : executors.entrySet()) {
+ for (Map.Entry<String, ThreadPoolExecutor> e : executors.entrySet()) {
e.getValue().shutdown();
}
// clear the executor map so that calling execute again will fail.
@@ -198,7 +204,7 @@ class FsDatasetAsyncDiskService {
public void submitSyncFileRangeRequest(FsVolumeImpl volume,
final FileDescriptor fd, final long offset, final long nbytes,
final int flags) {
- execute(volume.getCurrentDir(), new Runnable() {
+ execute(volume, new Runnable() {
@Override
public void run() {
try {
@@ -220,7 +226,7 @@ class FsDatasetAsyncDiskService {
+ " replica " + replicaToDelete + " for deletion");
ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
volumeRef, replicaToDelete, block, trashDirectory);
- execute(((FsVolumeImpl) volumeRef.getVolume()).getCurrentDir(), deletionTask);
+ execute(((FsVolumeImpl) volumeRef.getVolume()), deletionTask);
}
/** A task for deleting a block file and its associated meta file, as well
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 26a2e9f..fd747bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -361,20 +361,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
private static List<VolumeFailureInfo> getInitialVolumeFailureInfos(
Collection<StorageLocation> dataLocations, DataStorage storage) {
- Set<String> failedLocationSet = Sets.newHashSetWithExpectedSize(
+ Set<StorageLocation> failedLocationSet = Sets.newHashSetWithExpectedSize(
dataLocations.size());
for (StorageLocation sl: dataLocations) {
- failedLocationSet.add(sl.getFile().getAbsolutePath());
+ LOG.info("Adding to failedLocationSet " + sl);
+ failedLocationSet.add(sl);
}
for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
it.hasNext(); ) {
Storage.StorageDirectory sd = it.next();
- failedLocationSet.remove(sd.getRoot().getAbsolutePath());
+ failedLocationSet.remove(sd.getStorageLocation());
+ LOG.info("Removing from failedLocationSet " + sd.getStorageLocation());
}
List<VolumeFailureInfo> volumeFailureInfos = Lists.newArrayListWithCapacity(
failedLocationSet.size());
long failureDate = Time.now();
- for (String failedStorageLocation: failedLocationSet) {
+ for (StorageLocation failedStorageLocation: failedLocationSet) {
volumeFailureInfos.add(new VolumeFailureInfo(failedStorageLocation,
failureDate));
}
@@ -403,49 +405,55 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
new DatanodeStorage(sd.getStorageUuid(),
DatanodeStorage.State.NORMAL,
storageType));
- asyncDiskService.addVolume(sd.getCurrentDir());
+ asyncDiskService.addVolume((FsVolumeImpl) ref.getVolume());
volumes.addVolume(ref);
}
}
private void addVolume(Collection<StorageLocation> dataLocations,
Storage.StorageDirectory sd) throws IOException {
- final File dir = sd.getCurrentDir();
- final StorageType storageType =
- getStorageTypeFromLocations(dataLocations, sd.getRoot());
+ final StorageLocation storageLocation = sd.getStorageLocation();
// If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
// nothing needed to be rolled back to make various data structures, e.g.,
// storageMap and asyncDiskService, consistent.
- FsVolumeImpl fsVolume = new FsVolumeImpl(
- this, sd.getStorageUuid(), dir, this.conf, storageType);
+ FsVolumeImpl fsVolume = new FsVolumeImplBuilder()
+ .setDataset(this)
+ .setStorageID(sd.getStorageUuid())
+ .setStorageDirectory(sd)
+ .setConf(this.conf)
+ .build();
FsVolumeReference ref = fsVolume.obtainReference();
ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
- activateVolume(tempVolumeMap, sd, storageType, ref);
- LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
+ activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
+ LOG.info("Added volume - " + storageLocation + ", StorageType: " +
+ storageLocation.getStorageType());
}
@VisibleForTesting
- public FsVolumeImpl createFsVolume(String storageUuid, File currentDir,
- StorageType storageType) throws IOException {
- return new FsVolumeImpl(this, storageUuid, currentDir, conf, storageType);
+ public FsVolumeImpl createFsVolume(String storageUuid,
+ Storage.StorageDirectory sd,
+ final StorageLocation location) throws IOException {
+ return new FsVolumeImplBuilder()
+ .setDataset(this)
+ .setStorageID(storageUuid)
+ .setStorageDirectory(sd)
+ .setConf(conf)
+ .build();
}
@Override
public void addVolume(final StorageLocation location,
final List<NamespaceInfo> nsInfos)
throws IOException {
- final File dir = location.getFile();
-
// Prepare volume in DataStorage
final DataStorage.VolumeBuilder builder;
try {
- builder = dataStorage.prepareVolume(datanode, location.getFile(), nsInfos);
+ builder = dataStorage.prepareVolume(datanode, location, nsInfos);
} catch (IOException e) {
- volumes.addVolumeFailureInfo(new VolumeFailureInfo(
- location.getFile().getAbsolutePath(), Time.now()));
+ volumes.addVolumeFailureInfo(new VolumeFailureInfo(location, Time.now()));
throw e;
}
@@ -453,7 +461,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
StorageType storageType = location.getStorageType();
final FsVolumeImpl fsVolume =
- createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType);
+ createFsVolume(sd.getStorageUuid(), sd, location);
final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock());
ArrayList<IOException> exceptions = Lists.newArrayList();
@@ -482,34 +490,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
builder.build();
activateVolume(tempVolumeMap, sd, storageType, ref);
- LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
+ LOG.info("Added volume - " + location + ", StorageType: " + storageType);
}
/**
* Removes a set of volumes from FsDataset.
- * @param volumesToRemove a set of absolute root path of each volume.
+ * @param storageLocationsToRemove a set of
+ * {@link StorageLocation}s for each volume.
* @param clearFailure set true to clear failure information.
*/
@Override
- public void removeVolumes(Set<File> volumesToRemove, boolean clearFailure) {
- // Make sure that all volumes are absolute path.
- for (File vol : volumesToRemove) {
- Preconditions.checkArgument(vol.isAbsolute(),
- String.format("%s is not absolute path.", vol.getPath()));
- }
-
+ public void removeVolumes(
+ Collection<StorageLocation> storageLocationsToRemove,
+ boolean clearFailure) {
Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
List<String> storageToRemove = new ArrayList<>();
try (AutoCloseableLock lock = datasetLock.acquire()) {
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
- final File absRoot = sd.getRoot().getAbsoluteFile();
- if (volumesToRemove.contains(absRoot)) {
- LOG.info("Removing " + absRoot + " from FsDataset.");
-
+ final StorageLocation sdLocation = sd.getStorageLocation();
+ LOG.info("Checking removing StorageLocation " +
+ sdLocation + " with id " + sd.getStorageUuid());
+ if (storageLocationsToRemove.contains(sdLocation)) {
+ LOG.info("Removing StorageLocation " + sdLocation + " with id " +
+ sd.getStorageUuid() + " from FsDataset.");
// Disable the volume from the service.
- asyncDiskService.removeVolume(sd.getCurrentDir());
- volumes.removeVolume(absRoot, clearFailure);
+ asyncDiskService.removeVolume(sd.getStorageUuid());
+ volumes.removeVolume(sdLocation, clearFailure);
volumes.waitVolumeRemoved(5000, datasetLockCondition);
// Removed all replica information for the blocks on the volume.
@@ -517,12 +524,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// not scan disks.
for (String bpid : volumeMap.getBlockPoolList()) {
List<ReplicaInfo> blocks = new ArrayList<>();
- for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
- it.hasNext(); ) {
+ for (Iterator<ReplicaInfo> it =
+ volumeMap.replicas(bpid).iterator(); it.hasNext();) {
ReplicaInfo block = it.next();
- final File absBasePath =
- new File(block.getVolume().getBasePath()).getAbsoluteFile();
- if (absBasePath.equals(absRoot)) {
+ final StorageLocation blockStorageLocation =
+ block.getVolume().getStorageLocation();
+ LOG.info("checking for block " + block.getBlockId() +
+ " with storageLocation " + blockStorageLocation);
+ if (blockStorageLocation.equals(sdLocation)) {
blocks.add(block);
it.remove();
}
@@ -625,7 +634,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
List<String> failedStorageLocations = Lists.newArrayListWithCapacity(
infos.length);
for (VolumeFailureInfo info: infos) {
- failedStorageLocations.add(info.getFailedStorageLocation());
+ failedStorageLocations.add(
+ info.getFailedStorageLocation().getFile().getAbsolutePath());
}
return failedStorageLocations.toArray(
new String[failedStorageLocations.size()]);
@@ -663,7 +673,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long lastVolumeFailureDate = 0;
long estimatedCapacityLostTotal = 0;
for (VolumeFailureInfo info: infos) {
- failedStorageLocations.add(info.getFailedStorageLocation());
+ failedStorageLocations.add(
+ info.getFailedStorageLocation().getFile().getAbsolutePath());
long failureDate = info.getFailureDate();
if (failureDate > lastVolumeFailureDate) {
lastVolumeFailureDate = failureDate;
@@ -960,25 +971,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
// Copy files to temp dir first
- File[] blockFiles = copyBlockFiles(block.getBlockId(),
- block.getGenerationStamp(), replicaInfo,
- targetVolume.getTmpDir(block.getBlockPoolId()),
- replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
-
- ReplicaInfo newReplicaInfo = new ReplicaBuilder(ReplicaState.TEMPORARY)
- .setBlockId(replicaInfo.getBlockId())
- .setGenerationStamp(replicaInfo.getGenerationStamp())
- .setFsVolume(targetVolume)
- .setDirectoryToUse(blockFiles[0].getParentFile())
- .setBytesToReserve(0)
- .build();
- newReplicaInfo.setNumBytes(blockFiles[1].length());
+ ReplicaInfo newReplicaInfo = targetVolume.moveBlockToTmpLocation(block,
+ replicaInfo, smallBufferSize, conf);
+
// Finalize the copied files
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
try (AutoCloseableLock lock = datasetLock.acquire()) {
// Increment numBlocks here as this block moved without knowing to BPS
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
- volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
+ volume.incrNumBlocks(block.getBlockPoolId());
}
removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId());
@@ -2072,7 +2073,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* @return the failed volumes. Returns null if no volume failed.
*/
@Override // FsDatasetSpi
- public Set<File> checkDataDir() {
+ public Set<StorageLocation> checkDataDir() {
return volumes.checkDirs();
}
@@ -2250,9 +2251,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
.setFsVolume(vol)
.setDirectoryToUse(diskFile.getParentFile())
.build();
- ((FsVolumeImpl) vol).getBlockPoolSlice(bpid)
- .resolveDuplicateReplicas(
- memBlockInfo, diskBlockInfo, volumeMap);
+ ((FsVolumeImpl) vol).resolveDuplicateReplicas(bpid,
+ memBlockInfo, diskBlockInfo, volumeMap);
}
} else {
if (!diskFile.delete()) {
@@ -2803,15 +2803,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Add thread for DISK volume if RamDisk is configured
if (ramDiskConfigured &&
asyncLazyPersistService != null &&
- !asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
- asyncLazyPersistService.addVolume(v.getCurrentDir());
+ !asyncLazyPersistService.queryVolume(v)) {
+ asyncLazyPersistService.addVolume(v);
}
// Remove thread for DISK volume if RamDisk is not configured
if (!ramDiskConfigured &&
asyncLazyPersistService != null &&
- asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
- asyncLazyPersistService.removeVolume(v.getCurrentDir());
+ asyncLazyPersistService.queryVolume(v)) {
+ asyncLazyPersistService.removeVolume(v);
}
}
@@ -2946,11 +2946,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Move the replica from lazyPersist/ to finalized/ on
// the target volume
- BlockPoolSlice bpSlice =
- replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
-
newReplicaInfo =
- bpSlice.activateSavedReplica(replicaInfo, replicaState);
+ replicaState.getLazyPersistVolume().activateSavedReplica(bpid,
+ replicaInfo, replicaState);
// Update the volumeMap entry.
volumeMap.add(bpid, newReplicaInfo);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org