You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/10/13 22:35:14 UTC
[24/51] [abbrv] hadoop git commit: HDFS-10637. Modifications to
remove the assumption that FsVolumes are backed by java.io.File. (Virajith
Jalaparti via lei)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 57fab66..76af724 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -23,11 +23,13 @@ import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -56,13 +58,18 @@ import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.BlockDirFilter;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.CloseableReferenceCount;
@@ -102,8 +109,14 @@ public class FsVolumeImpl implements FsVolumeSpi {
private final StorageType storageType;
private final Map<String, BlockPoolSlice> bpSlices
= new ConcurrentHashMap<String, BlockPoolSlice>();
+
+ // Refers to the base StorageLocation used to construct this volume
+ // (i.e., does not include STORAGE_DIR_CURRENT in
+ // <location>/STORAGE_DIR_CURRENT/)
+ private final StorageLocation storageLocation;
+
private final File currentDir; // <StorageDirectory>/current
- private final DF usage;
+ private final DF usage;
private final long reserved;
private CloseableReferenceCount reference = new CloseableReferenceCount();
@@ -124,19 +137,25 @@ public class FsVolumeImpl implements FsVolumeSpi {
*/
protected ThreadPoolExecutor cacheExecutor;
- FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
- Configuration conf, StorageType storageType) throws IOException {
+ FsVolumeImpl(FsDatasetImpl dataset, String storageID, StorageDirectory sd,
+ Configuration conf) throws IOException {
+
+ if (sd.getStorageLocation() == null) {
+ throw new IOException("StorageLocation specified for storage directory " +
+ sd + " is null");
+ }
this.dataset = dataset;
this.storageID = storageID;
+ this.reservedForReplicas = new AtomicLong(0L);
+ this.storageLocation = sd.getStorageLocation();
+ this.currentDir = sd.getCurrentDir();
+ File parent = currentDir.getParentFile();
+ this.usage = new DF(parent, conf);
+ this.storageType = storageLocation.getStorageType();
this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY
+ "." + StringUtils.toLowerCase(storageType.toString()), conf.getLong(
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT));
- this.reservedForReplicas = new AtomicLong(0L);
- this.currentDir = currentDir;
- File parent = currentDir.getParentFile();
- this.usage = new DF(parent, conf);
- this.storageType = storageType;
this.configuredCapacity = -1;
this.conf = conf;
cacheExecutor = initializeCacheExecutor(parent);
@@ -285,19 +304,20 @@ public class FsVolumeImpl implements FsVolumeSpi {
return true;
}
+ @VisibleForTesting
File getCurrentDir() {
return currentDir;
}
- File getRbwDir(String bpid) throws IOException {
+ protected File getRbwDir(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getRbwDir();
}
- File getLazyPersistDir(String bpid) throws IOException {
+ protected File getLazyPersistDir(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getLazypersistDir();
}
- File getTmpDir(String bpid) throws IOException {
+ protected File getTmpDir(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getTmpDir();
}
@@ -448,6 +468,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
return reserved;
}
+ @VisibleForTesting
BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException {
BlockPoolSlice bp = bpSlices.get(bpid);
if (bp == null) {
@@ -457,21 +478,33 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
@Override
- public String getBasePath() {
- return currentDir.getParent();
+ public URI getBaseURI() {
+ return new File(currentDir.getParent()).toURI();
}
-
+
@Override
- public boolean isTransientStorage() {
- return storageType.isTransient();
+ public DF getUsageStats(Configuration conf) {
+ if (currentDir != null) {
+ try {
+ return new DF(new File(currentDir.getParent()), conf);
+ } catch (IOException e) {
+ LOG.error("Unable to get disk statistics for volume " + this);
+ }
+ }
+ return null;
}
@Override
- public String getPath(String bpid) throws IOException {
- return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
+ public StorageLocation getStorageLocation() {
+ return storageLocation;
}
@Override
+ public boolean isTransientStorage() {
+ return storageType.isTransient();
+ }
+
+ @VisibleForTesting
public File getFinalizedDir(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getFinalizedDir();
}
@@ -951,7 +984,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
@Override
public String toString() {
- return currentDir.getAbsolutePath();
+ return currentDir != null ? currentDir.getParent() : "NULL";
}
void shutdown() {
@@ -1189,5 +1222,167 @@ public class FsVolumeImpl implements FsVolumeSpi {
dstBlockFile, true, DFSUtilClient.getSmallBufferSize(conf), conf);
}
+ @Override
+ public LinkedList<ScanInfo> compileReport(String bpid,
+ LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
+ throws InterruptedException, IOException {
+ return compileReport(getFinalizedDir(bpid),
+ getFinalizedDir(bpid), report, reportCompiler);
+ }
+
+ private LinkedList<ScanInfo> compileReport(File bpFinalizedDir,
+ File dir, LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
+ throws InterruptedException {
+
+ reportCompiler.throttle();
+
+ List <String> fileNames;
+ try {
+ fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE);
+ } catch (IOException ioe) {
+ LOG.warn("Exception occured while compiling report: ", ioe);
+ // Initiate a check on disk failure.
+ dataset.datanode.checkDiskErrorAsync();
+ // Ignore this directory and proceed.
+ return report;
+ }
+ Collections.sort(fileNames);
+
+ /*
+ * Assumption: In the sorted list of files block file appears immediately
+ * before block metadata file. This is true for the current naming
+ * convention for block file blk_<blockid> and meta file
+ * blk_<blockid>_<genstamp>.meta
+ */
+ for (int i = 0; i < fileNames.size(); i++) {
+ // Make sure this thread can make a timely exit. With a low throttle
+ // rate, completing a run can take a looooong time.
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+
+ File file = new File(dir, fileNames.get(i));
+ if (file.isDirectory()) {
+ compileReport(bpFinalizedDir, file, report, reportCompiler);
+ continue;
+ }
+ if (!Block.isBlockFilename(file)) {
+ if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, file.getName())) {
+ long blockId = Block.getBlockId(file.getName());
+ verifyFileLocation(file.getParentFile(), bpFinalizedDir,
+ blockId);
+ report.add(new ScanInfo(blockId, null, file, this));
+ }
+ continue;
+ }
+ File blockFile = file;
+ long blockId = Block.filename2id(file.getName());
+ File metaFile = null;
+
+ // Skip all the files that start with block name until
+ // getting to the metafile for the block
+ while (i + 1 < fileNames.size()) {
+ File blkMetaFile = new File(dir, fileNames.get(i + 1));
+ if (!(blkMetaFile.isFile()
+ && blkMetaFile.getName().startsWith(blockFile.getName()))) {
+ break;
+ }
+ i++;
+ if (isBlockMetaFile(blockFile.getName(), blkMetaFile.getName())) {
+ metaFile = blkMetaFile;
+ break;
+ }
+ }
+ verifyFileLocation(blockFile, bpFinalizedDir, blockId);
+ report.add(new ScanInfo(blockId, blockFile, metaFile, this));
+ }
+ return report;
+ }
+
+ /**
+ * Helper method to determine if a file name is consistent with a block.
+ * meta-data file
+ *
+ * @param blockId the block ID
+ * @param metaFile the file to check
+ * @return whether the file name is a block meta-data file name
+ */
+ private static boolean isBlockMetaFile(String blockId, String metaFile) {
+ return metaFile.startsWith(blockId)
+ && metaFile.endsWith(Block.METADATA_EXTENSION);
+ }
+
+ /**
+ * Verify whether the actual directory location of block file has the
+ * expected directory path computed using its block ID.
+ */
+ private void verifyFileLocation(File actualBlockFile,
+ File bpFinalizedDir, long blockId) {
+ File expectedBlockDir =
+ DatanodeUtil.idToBlockDir(bpFinalizedDir, blockId);
+ File actualBlockDir = actualBlockFile.getParentFile();
+ if (actualBlockDir.compareTo(expectedBlockDir) != 0) {
+ LOG.warn("Block: " + blockId +
+ " found in invalid directory. Expected directory: " +
+ expectedBlockDir + ". Actual directory: " + actualBlockDir);
+ }
+ }
+
+ public ReplicaInfo moveBlockToTmpLocation(ExtendedBlock block,
+ ReplicaInfo replicaInfo,
+ int smallBufferSize,
+ Configuration conf) throws IOException {
+
+ File[] blockFiles = FsDatasetImpl.copyBlockFiles(block.getBlockId(),
+ block.getGenerationStamp(), replicaInfo,
+ getTmpDir(block.getBlockPoolId()),
+ replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
+
+ ReplicaInfo newReplicaInfo = new ReplicaBuilder(ReplicaState.TEMPORARY)
+ .setBlockId(replicaInfo.getBlockId())
+ .setGenerationStamp(replicaInfo.getGenerationStamp())
+ .setFsVolume(this)
+ .setDirectoryToUse(blockFiles[0].getParentFile())
+ .setBytesToReserve(0)
+ .build();
+ newReplicaInfo.setNumBytes(blockFiles[1].length());
+ return newReplicaInfo;
+ }
+
+ public File[] copyBlockToLazyPersistLocation(String bpId, long blockId,
+ long genStamp,
+ ReplicaInfo replicaInfo,
+ int smallBufferSize,
+ Configuration conf) throws IOException {
+
+ File lazyPersistDir = getLazyPersistDir(bpId);
+ if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) {
+ FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir);
+ throw new IOException("LazyWriter fail to find or " +
+ "create lazy persist dir: " + lazyPersistDir.toString());
+ }
+
+ // No FsDatasetImpl lock for the file copy
+ File[] targetFiles = FsDatasetImpl.copyBlockFiles(
+ blockId, genStamp, replicaInfo, lazyPersistDir, true,
+ smallBufferSize, conf);
+ return targetFiles;
+ }
+
+ public void incrNumBlocks(String bpid) throws IOException {
+ getBlockPoolSlice(bpid).incrNumBlocks();
+ }
+
+ public void resolveDuplicateReplicas(String bpid, ReplicaInfo memBlockInfo,
+ ReplicaInfo diskBlockInfo, ReplicaMap volumeMap) throws IOException {
+ getBlockPoolSlice(bpid).resolveDuplicateReplicas(
+ memBlockInfo, diskBlockInfo, volumeMap);
+ }
+
+ public ReplicaInfo activateSavedReplica(String bpid,
+ ReplicaInfo replicaInfo, RamDiskReplica replicaState) throws IOException {
+ return getBlockPoolSlice(bpid).activateSavedReplica(replicaInfo,
+ replicaState);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
new file mode 100644
index 0000000..a1f7e91
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+
+/**
+ * This class is to be used as a builder for {@link FsVolumeImpl} objects.
+ */
+public class FsVolumeImplBuilder {
+
+ private FsDatasetImpl dataset;
+ private String storageID;
+ private StorageDirectory sd;
+ private Configuration conf;
+
+ public FsVolumeImplBuilder() {
+ dataset = null;
+ storageID = null;
+ sd = null;
+ conf = null;
+ }
+
+ FsVolumeImplBuilder setDataset(FsDatasetImpl dataset) {
+ this.dataset = dataset;
+ return this;
+ }
+
+ FsVolumeImplBuilder setStorageID(String id) {
+ this.storageID = id;
+ return this;
+ }
+
+ FsVolumeImplBuilder setStorageDirectory(StorageDirectory sd) {
+ this.sd = sd;
+ return this;
+ }
+
+ FsVolumeImplBuilder setConf(Configuration conf) {
+ this.conf = conf;
+ return this;
+ }
+
+ FsVolumeImpl build() throws IOException {
+ return new FsVolumeImpl(dataset, storageID, sd, conf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index f869008..cf9c319 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.AutoCloseableLock;
@@ -51,8 +51,10 @@ class FsVolumeList {
private final CopyOnWriteArrayList<FsVolumeImpl> volumes =
new CopyOnWriteArrayList<>();
// Tracks volume failures, sorted by volume path.
- private final Map<String, VolumeFailureInfo> volumeFailureInfos =
- Collections.synchronizedMap(new TreeMap<String, VolumeFailureInfo>());
+ // map from volume storageID to the volume failure info
+ private final Map<StorageLocation, VolumeFailureInfo> volumeFailureInfos =
+ Collections.synchronizedMap(
+ new TreeMap<StorageLocation, VolumeFailureInfo>());
private final ConcurrentLinkedQueue<FsVolumeImpl> volumesBeingRemoved =
new ConcurrentLinkedQueue<>();
private final AutoCloseableLock checkDirsLock;
@@ -234,10 +236,9 @@ class FsVolumeList {
*
* @return list of all the failed volumes.
*/
- Set<File> checkDirs() {
+ Set<StorageLocation> checkDirs() {
try (AutoCloseableLock lock = checkDirsLock.acquire()) {
- Set<File> failedVols = null;
-
+ Set<StorageLocation> failedLocations = null;
// Make a copy of volumes for performing modification
final List<FsVolumeImpl> volumeList = getVolumes();
@@ -247,10 +248,10 @@ class FsVolumeList {
fsv.checkDirs();
} catch (DiskErrorException e) {
FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
- if (failedVols == null) {
- failedVols = new HashSet<>(1);
+ if (failedLocations == null) {
+ failedLocations = new HashSet<>(1);
}
- failedVols.add(new File(fsv.getBasePath()).getAbsoluteFile());
+ failedLocations.add(fsv.getStorageLocation());
addVolumeFailureInfo(fsv);
removeVolume(fsv);
} catch (ClosedChannelException e) {
@@ -261,13 +262,13 @@ class FsVolumeList {
}
}
- if (failedVols != null && failedVols.size() > 0) {
- FsDatasetImpl.LOG.warn("Completed checkDirs. Found " + failedVols.size()
- + " failure volumes.");
+ if (failedLocations != null && failedLocations.size() > 0) {
+ FsDatasetImpl.LOG.warn("Completed checkDirs. Found " +
+ failedLocations.size() + " failure volumes.");
}
waitVolumeRemoved(5000, checkDirsLockCondition);
- return failedVols;
+ return failedLocations;
}
}
@@ -315,7 +316,7 @@ class FsVolumeList {
}
// If the volume is used to replace a failed volume, it needs to reset the
// volume failure info for this volume.
- removeVolumeFailureInfo(new File(volume.getBasePath()));
+ removeVolumeFailureInfo(volume.getStorageLocation());
FsDatasetImpl.LOG.info("Added new volume: " +
volume.getStorageID());
}
@@ -351,16 +352,15 @@ class FsVolumeList {
* @param volume the volume to be removed.
* @param clearFailure set true to remove failure info for this volume.
*/
- void removeVolume(File volume, boolean clearFailure) {
+ void removeVolume(StorageLocation storageLocation, boolean clearFailure) {
for (FsVolumeImpl fsVolume : volumes) {
- String basePath = new File(fsVolume.getBasePath()).getAbsolutePath();
- String targetPath = volume.getAbsolutePath();
- if (basePath.equals(targetPath)) {
+ StorageLocation baseLocation = fsVolume.getStorageLocation();
+ if (baseLocation.equals(storageLocation)) {
removeVolume(fsVolume);
}
}
if (clearFailure) {
- removeVolumeFailureInfo(volume);
+ removeVolumeFailureInfo(storageLocation);
}
}
@@ -394,13 +394,13 @@ class FsVolumeList {
private void addVolumeFailureInfo(FsVolumeImpl vol) {
addVolumeFailureInfo(new VolumeFailureInfo(
- new File(vol.getBasePath()).getAbsolutePath(),
+ vol.getStorageLocation(),
Time.now(),
vol.getCapacity()));
}
- private void removeVolumeFailureInfo(File vol) {
- volumeFailureInfos.remove(vol.getAbsolutePath());
+ private void removeVolumeFailureInfo(StorageLocation location) {
+ volumeFailureInfos.remove(location);
}
void addBlockPool(final String bpid, final Configuration conf) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index 9e549f9..d6969c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -58,8 +58,8 @@ class RamDiskAsyncLazyPersistService {
private final Configuration conf;
private final ThreadGroup threadGroup;
- private Map<File, ThreadPoolExecutor> executors
- = new HashMap<File, ThreadPoolExecutor>();
+ private Map<String, ThreadPoolExecutor> executors
+ = new HashMap<String, ThreadPoolExecutor>();
private final static HdfsConfiguration EMPTY_HDFS_CONF = new HdfsConfiguration();
/**
@@ -75,13 +75,14 @@ class RamDiskAsyncLazyPersistService {
this.threadGroup = new ThreadGroup(getClass().getSimpleName());
}
- private void addExecutorForVolume(final File volume) {
+ private void addExecutorForVolume(final String storageId) {
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(threadGroup, r);
- t.setName("Async RamDisk lazy persist worker for volume " + volume);
+ t.setName("Async RamDisk lazy persist worker " +
+ " for volume with id " + storageId);
return t;
}
};
@@ -93,39 +94,41 @@ class RamDiskAsyncLazyPersistService {
// This can reduce the number of running threads
executor.allowCoreThreadTimeOut(true);
- executors.put(volume, executor);
+ executors.put(storageId, executor);
}
/**
* Starts AsyncLazyPersistService for a new volume
* @param volume the root of the new data volume.
*/
- synchronized void addVolume(File volume) {
+ synchronized void addVolume(FsVolumeImpl volume) {
+ String storageId = volume.getStorageID();
if (executors == null) {
throw new RuntimeException("AsyncLazyPersistService is already shutdown");
}
- ThreadPoolExecutor executor = executors.get(volume);
+ ThreadPoolExecutor executor = executors.get(storageId);
if (executor != null) {
throw new RuntimeException("Volume " + volume + " is already existed.");
}
- addExecutorForVolume(volume);
+ addExecutorForVolume(storageId);
}
/**
* Stops AsyncLazyPersistService for a volume.
* @param volume the root of the volume.
*/
- synchronized void removeVolume(File volume) {
+ synchronized void removeVolume(FsVolumeImpl volume) {
+ String storageId = volume.getStorageID();
if (executors == null) {
throw new RuntimeException("AsyncDiskService is already shutdown");
}
- ThreadPoolExecutor executor = executors.get(volume);
+ ThreadPoolExecutor executor = executors.get(storageId);
if (executor == null) {
- throw new RuntimeException("Can not find volume " + volume
- + " to remove.");
+ throw new RuntimeException("Can not find volume with storage id " +
+ storageId + " to remove.");
} else {
executor.shutdown();
- executors.remove(volume);
+ executors.remove(storageId);
}
}
@@ -135,25 +138,28 @@ class RamDiskAsyncLazyPersistService {
* @return true if there is one thread pool for the volume
* false otherwise
*/
- synchronized boolean queryVolume(File volume) {
+ synchronized boolean queryVolume(FsVolumeImpl volume) {
+ String storageId = volume.getStorageID();
if (executors == null) {
- throw new RuntimeException("AsyncLazyPersistService is already shutdown");
+ throw new RuntimeException(
+ "AsyncLazyPersistService is already shutdown");
}
- ThreadPoolExecutor executor = executors.get(volume);
+ ThreadPoolExecutor executor = executors.get(storageId);
return (executor != null);
}
/**
* Execute the task sometime in the future, using ThreadPools.
*/
- synchronized void execute(File root, Runnable task) {
+ synchronized void execute(String storageId, Runnable task) {
if (executors == null) {
- throw new RuntimeException("AsyncLazyPersistService is already shutdown");
+ throw new RuntimeException(
+ "AsyncLazyPersistService is already shutdown");
}
- ThreadPoolExecutor executor = executors.get(root);
+ ThreadPoolExecutor executor = executors.get(storageId);
if (executor == null) {
- throw new RuntimeException("Cannot find root " + root
- + " for execution of task " + task);
+ throw new RuntimeException("Cannot find root storage volume with id " +
+ storageId + " for execution of task " + task);
} else {
executor.execute(task);
}
@@ -169,7 +175,7 @@ class RamDiskAsyncLazyPersistService {
} else {
LOG.info("Shutting down all async lazy persist service threads");
- for (Map.Entry<File, ThreadPoolExecutor> e : executors.entrySet()) {
+ for (Map.Entry<String, ThreadPoolExecutor> e : executors.entrySet()) {
e.getValue().shutdown();
}
// clear the executor map so that calling execute again will fail.
@@ -189,18 +195,11 @@ class RamDiskAsyncLazyPersistService {
+ bpId + " block id: " + blockId);
}
- FsVolumeImpl volume = (FsVolumeImpl)target.getVolume();
- File lazyPersistDir = volume.getLazyPersistDir(bpId);
- if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) {
- FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir);
- throw new IOException("LazyWriter fail to find or create lazy persist dir: "
- + lazyPersistDir.toString());
- }
-
ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask(
- bpId, blockId, genStamp, creationTime, replica,
- target, lazyPersistDir);
- execute(volume.getCurrentDir(), lazyPersistTask);
+ bpId, blockId, genStamp, creationTime, replica, target);
+
+ FsVolumeImpl volume = (FsVolumeImpl)target.getVolume();
+ execute(volume.getStorageID(), lazyPersistTask);
}
class ReplicaLazyPersistTask implements Runnable {
@@ -210,19 +209,17 @@ class RamDiskAsyncLazyPersistService {
private final long creationTime;
private final ReplicaInfo replicaInfo;
private final FsVolumeReference targetVolume;
- private final File lazyPersistDir;
ReplicaLazyPersistTask(String bpId, long blockId,
long genStamp, long creationTime,
ReplicaInfo replicaInfo,
- FsVolumeReference targetVolume, File lazyPersistDir) {
+ FsVolumeReference targetVolume) {
this.bpId = bpId;
this.blockId = blockId;
this.genStamp = genStamp;
this.creationTime = creationTime;
this.replicaInfo = replicaInfo;
this.targetVolume = targetVolume;
- this.lazyPersistDir = lazyPersistDir;
}
@Override
@@ -241,14 +238,14 @@ class RamDiskAsyncLazyPersistService {
final FsDatasetImpl dataset = (FsDatasetImpl)datanode.getFSDataset();
try (FsVolumeReference ref = this.targetVolume) {
int smallBufferSize = DFSUtilClient.getSmallBufferSize(EMPTY_HDFS_CONF);
- // No FsDatasetImpl lock for the file copy
- File targetFiles[] = FsDatasetImpl.copyBlockFiles(
- blockId, genStamp, replicaInfo, lazyPersistDir, true,
- smallBufferSize, conf);
+
+ FsVolumeImpl volume = (FsVolumeImpl)ref.getVolume();
+ File[] targetFiles = volume.copyBlockToLazyPersistLocation(bpId,
+ blockId, genStamp, replicaInfo, smallBufferSize, conf);
// Lock FsDataSetImpl during onCompleteLazyPersist callback
dataset.onCompleteLazyPersist(bpId, blockId,
- creationTime, targetFiles, (FsVolumeImpl)ref.getVolume());
+ creationTime, targetFiles, volume);
succeeded = true;
} catch (Exception e){
FsDatasetImpl.LOG.warn(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/VolumeFailureInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/VolumeFailureInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/VolumeFailureInfo.java
index c3ce2a4..a762785 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/VolumeFailureInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/VolumeFailureInfo.java
@@ -17,11 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+
/**
* Tracks information about failure of a data volume.
*/
final class VolumeFailureInfo {
- private final String failedStorageLocation;
+ private final StorageLocation failedStorageLocation;
private final long failureDate;
private final long estimatedCapacityLost;
@@ -33,7 +35,8 @@ final class VolumeFailureInfo {
* @param failedStorageLocation storage location that has failed
* @param failureDate date/time of failure in milliseconds since epoch
*/
- public VolumeFailureInfo(String failedStorageLocation, long failureDate) {
+ public VolumeFailureInfo(StorageLocation failedStorageLocation,
+ long failureDate) {
this(failedStorageLocation, failureDate, 0);
}
@@ -44,8 +47,8 @@ final class VolumeFailureInfo {
* @param failureDate date/time of failure in milliseconds since epoch
* @param estimatedCapacityLost estimate of capacity lost in bytes
*/
- public VolumeFailureInfo(String failedStorageLocation, long failureDate,
- long estimatedCapacityLost) {
+ public VolumeFailureInfo(StorageLocation failedStorageLocation,
+ long failureDate, long estimatedCapacityLost) {
this.failedStorageLocation = failedStorageLocation;
this.failureDate = failureDate;
this.estimatedCapacityLost = estimatedCapacityLost;
@@ -56,7 +59,7 @@ final class VolumeFailureInfo {
*
* @return storage location that has failed
*/
- public String getFailedStorageLocation() {
+ public StorageLocation getFailedStorageLocation() {
return this.failedStorageLocation;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 0f4f14c..2471dc8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -5413,7 +5413,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
VolumeFailureSummary volumeFailureSummary = node.getVolumeFailureSummary();
if (volumeFailureSummary != null) {
innerinfo
- .put("failedStorageLocations",
+ .put("failedStorageIDs",
volumeFailureSummary.getFailedStorageLocations())
.put("lastVolumeFailureDate",
volumeFailureSummary.getLastVolumeFailureDate())
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index b11b48a..6efc53a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -216,13 +217,13 @@ public class TestNameNodePrunesMissingStorages {
datanodeToRemoveStorageFromIdx++;
}
// Find the volume within the datanode which holds that first storage.
- String volumeDirectoryToRemove = null;
+ StorageLocation volumeLocationToRemove = null;
try (FsVolumeReferences volumes =
datanodeToRemoveStorageFrom.getFSDataset().getFsVolumeReferences()) {
assertEquals(NUM_STORAGES_PER_DN, volumes.size());
for (FsVolumeSpi volume : volumes) {
if (volume.getStorageID().equals(storageIdToRemove)) {
- volumeDirectoryToRemove = volume.getBasePath();
+ volumeLocationToRemove = volume.getStorageLocation();
}
}
};
@@ -230,10 +231,11 @@ public class TestNameNodePrunesMissingStorages {
// Replace the volume directory with a regular file, which will
// cause a volume failure. (If we merely removed the directory,
// it would be re-initialized with a new storage ID.)
- assertNotNull(volumeDirectoryToRemove);
+ assertNotNull(volumeLocationToRemove);
datanodeToRemoveStorageFrom.shutdown();
- FileUtil.fullyDelete(new File(volumeDirectoryToRemove));
- FileOutputStream fos = new FileOutputStream(volumeDirectoryToRemove);
+ FileUtil.fullyDelete(volumeLocationToRemove.getFile());
+ FileOutputStream fos = new FileOutputStream(
+ volumeLocationToRemove.getFile().toString());
try {
fos.write(1);
} finally {
@@ -326,7 +328,8 @@ public class TestNameNodePrunesMissingStorages {
dn.getFSDataset().getFsVolumeReferences();
final String newStorageId = DatanodeStorage.generateUuid();
try {
- File currentDir = new File(volumeRefs.get(0).getBasePath(), "current");
+ File currentDir = new File(
+ volumeRefs.get(0).getStorageLocation().getFile(), "current");
File versionFile = new File(currentDir, "VERSION");
rewriteVersionFile(versionFile, newStorageId);
} finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 6034d1e..6c59231 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -22,7 +22,9 @@ import java.io.FileDescriptor;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.URI;
import java.nio.channels.ClosedChannelException;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@@ -38,6 +40,7 @@ import javax.management.StandardMBean;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -46,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -495,21 +499,6 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
- public String getBasePath() {
- return null;
- }
-
- @Override
- public String getPath(String bpid) throws IOException {
- return null;
- }
-
- @Override
- public File getFinalizedDir(String bpid) throws IOException {
- return null;
- }
-
- @Override
public StorageType getStorageType() {
return null;
}
@@ -546,6 +535,28 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public FsDatasetSpi getDataset() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public StorageLocation getStorageLocation() {
+ return null;
+ }
+
+ @Override
+ public URI getBaseURI() {
+ return null;
+ }
+
+ @Override
+ public DF getUsageStats(Configuration conf) {
+ return null;
+ }
+
+ @Override
+ public LinkedList<ScanInfo> compileReport(String bpid,
+ LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
+ throws InterruptedException, IOException {
+ return null;
+ }
}
private final Map<String, Map<Block, BInfo>> blockMap
@@ -1030,7 +1041,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
- public Set<File> checkDataDir() {
+ public Set<StorageLocation> checkDataDir() {
// nothing to check for simulated data set
return null;
}
@@ -1344,7 +1355,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
- public synchronized void removeVolumes(Set<File> volumes, boolean clearFailure) {
+ public synchronized void removeVolumes(Collection<StorageLocation> volumes,
+ boolean clearFailure) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
index 021361b..c55a828 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
@@ -549,7 +549,8 @@ public class TestBlockScanner {
info.shouldRun = false;
}
ctx.datanode.shutdown();
- String vPath = ctx.volumes.get(0).getBasePath();
+ String vPath = ctx.volumes.get(0).getStorageLocation()
+ .getFile().getAbsolutePath();
File cursorPath = new File(new File(new File(vPath, "current"),
ctx.bpids[0]), "scanner.cursor");
assertTrue("Failed to find cursor save file in " +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index 0dbb09c..06387c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -52,7 +52,6 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
-import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -519,11 +518,8 @@ public class TestDataNodeHotSwapVolumes {
ExtendedBlock block =
DFSTestUtil.getAllBlocks(fs, testFile).get(1).getBlock();
FsVolumeSpi volumeWithBlock = dn.getFSDataset().getVolume(block);
- String basePath = volumeWithBlock.getBasePath();
- File storageDir = new File(basePath);
- URI fileUri = storageDir.toURI();
- String dirWithBlock =
- "[" + volumeWithBlock.getStorageType() + "]" + fileUri;
+ String dirWithBlock = "[" + volumeWithBlock.getStorageType() + "]" +
+ volumeWithBlock.getStorageLocation().getFile().toURI();
String newDirs = dirWithBlock;
for (String dir : oldDirs) {
if (dirWithBlock.startsWith(dir)) {
@@ -581,8 +577,8 @@ public class TestDataNodeHotSwapVolumes {
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
for (FsVolumeSpi volume : volumes) {
- assertThat(volume.getBasePath(), is(not(anyOf(
- is(newDirs.get(0)), is(newDirs.get(2))))));
+ assertThat(volume.getStorageLocation().getFile().toString(),
+ is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2))))));
}
}
DataStorage storage = dn.getStorage();
@@ -765,7 +761,7 @@ public class TestDataNodeHotSwapVolumes {
try (FsDatasetSpi.FsVolumeReferences volumes =
dn.getFSDataset().getFsVolumeReferences()) {
for (FsVolumeSpi vol : volumes) {
- if (vol.getBasePath().equals(basePath.getPath())) {
+ if (vol.getBaseURI().equals(basePath.toURI())) {
return (FsVolumeImpl) vol;
}
}
@@ -810,6 +806,7 @@ public class TestDataNodeHotSwapVolumes {
assertEquals(used, failedVolume.getDfsUsed());
DataNodeTestUtils.restoreDataDirFromFailure(dirToFail);
+ LOG.info("reconfiguring DN ");
assertThat(
"DN did not update its own config",
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 6792ba8..47f4823 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -21,7 +21,6 @@ import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -254,17 +253,18 @@ public class TestDataNodeVolumeFailure {
FsDatasetSpi<? extends FsVolumeSpi> data = dn0.getFSDataset();
try (FsDatasetSpi.FsVolumeReferences vols = data.getFsVolumeReferences()) {
for (FsVolumeSpi volume : vols) {
- assertNotEquals(new File(volume.getBasePath()).getAbsoluteFile(),
- dn0Vol1.getAbsoluteFile());
+ assertFalse(volume.getStorageLocation().getFile()
+ .getAbsolutePath().startsWith(dn0Vol1.getAbsolutePath()
+ ));
}
}
// 3. all blocks on dn0Vol1 have been removed.
for (ReplicaInfo replica : FsDatasetTestUtil.getReplicas(data, bpid)) {
assertNotNull(replica.getVolume());
- assertNotEquals(
- new File(replica.getVolume().getBasePath()).getAbsoluteFile(),
- dn0Vol1.getAbsoluteFile());
+ assertFalse(replica.getVolume().getStorageLocation().getFile()
+ .getAbsolutePath().startsWith(dn0Vol1.getAbsolutePath()
+ ));
}
// 4. dn0Vol1 is not in DN0's configuration and dataDirs anymore.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
index 8d021cd..4bb5e7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
@@ -539,6 +539,16 @@ public class TestDataNodeVolumeFailureReporting {
assertCounter("VolumeFailures", expectedVolumeFailuresCounter,
getMetrics(dn.getMetrics().name()));
FsDatasetSpi<?> fsd = dn.getFSDataset();
+ StringBuilder strBuilder = new StringBuilder();
+ strBuilder.append("expectedFailedVolumes is ");
+ for (String expected: expectedFailedVolumes) {
+ strBuilder.append(expected + ",");
+ }
+ strBuilder.append(" fsd.getFailedStorageLocations() is ");
+ for (String expected: fsd.getFailedStorageLocations()) {
+ strBuilder.append(expected + ",");
+ }
+ LOG.info(strBuilder.toString());
assertEquals(expectedFailedVolumes.length, fsd.getNumFailedVolumes());
assertArrayEquals(expectedFailedVolumes, fsd.getFailedStorageLocations());
if (expectedFailedVolumes.length > 0) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 576aae0..08a5af9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
@@ -44,6 +45,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
@@ -56,11 +58,13 @@ import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
@@ -185,18 +189,20 @@ public class TestDirectoryScanner {
// Volume without a copy of the block. Make a copy now.
File sourceBlock = new File(b.getBlockURI());
File sourceMeta = new File(b.getMetadataURI());
- String sourceRoot = b.getVolume().getBasePath();
- String destRoot = v.getBasePath();
+ URI sourceRoot = b.getVolume().getStorageLocation().getFile().toURI();
+ URI destRoot = v.getStorageLocation().getFile().toURI();
String relativeBlockPath =
- new File(sourceRoot).toURI().relativize(sourceBlock.toURI())
+ sourceRoot.relativize(sourceBlock.toURI())
.getPath();
String relativeMetaPath =
- new File(sourceRoot).toURI().relativize(sourceMeta.toURI())
+ sourceRoot.relativize(sourceMeta.toURI())
.getPath();
- File destBlock = new File(destRoot, relativeBlockPath);
- File destMeta = new File(destRoot, relativeMetaPath);
+ File destBlock = new File(new File(destRoot).toString(),
+ relativeBlockPath);
+ File destMeta = new File(new File(destRoot).toString(),
+ relativeMetaPath);
destBlock.getParentFile().mkdirs();
FileUtils.copyFile(sourceBlock, destBlock);
@@ -238,7 +244,8 @@ public class TestDirectoryScanner {
try (FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) {
int numVolumes = volumes.size();
int index = rand.nextInt(numVolumes - 1);
- File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
+ File finalizedDir = ((FsVolumeImpl) volumes.get(index))
+ .getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) {
LOG.info("Created block file " + file.getName());
@@ -253,8 +260,8 @@ public class TestDirectoryScanner {
try (FsDatasetSpi.FsVolumeReferences refs = fds.getFsVolumeReferences()) {
int numVolumes = refs.size();
int index = rand.nextInt(numVolumes - 1);
-
- File finalizedDir = refs.get(index).getFinalizedDir(bpid);
+ File finalizedDir = ((FsVolumeImpl) refs.get(index))
+ .getFinalizedDir(bpid);
File file = new File(finalizedDir, getMetaFile(id));
if (file.createNewFile()) {
LOG.info("Created metafile " + file.getName());
@@ -271,7 +278,8 @@ public class TestDirectoryScanner {
int numVolumes = refs.size();
int index = rand.nextInt(numVolumes - 1);
- File finalizedDir = refs.get(index).getFinalizedDir(bpid);
+ File finalizedDir =
+ ((FsVolumeImpl) refs.get(index)).getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) {
LOG.info("Created block file " + file.getName());
@@ -311,7 +319,7 @@ public class TestDirectoryScanner {
scanner.reconcile();
assertTrue(scanner.diffs.containsKey(bpid));
- LinkedList<DirectoryScanner.ScanInfo> diff = scanner.diffs.get(bpid);
+ LinkedList<FsVolumeSpi.ScanInfo> diff = scanner.diffs.get(bpid);
assertTrue(scanner.stats.containsKey(bpid));
DirectoryScanner.Stats stats = scanner.stats.get(bpid);
@@ -820,17 +828,6 @@ public class TestDirectoryScanner {
return 0;
}
- @Override
- public String getBasePath() {
- return (new File("/base")).getAbsolutePath();
- }
-
- @Override
- public String getPath(String bpid) throws IOException {
- return (new File("/base/current/" + bpid)).getAbsolutePath();
- }
-
- @Override
public File getFinalizedDir(String bpid) throws IOException {
return new File("/base/current/" + bpid + "/finalized");
}
@@ -877,6 +874,29 @@ public class TestDirectoryScanner {
public FsDatasetSpi getDataset() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public StorageLocation getStorageLocation() {
+ return null;
+ }
+
+ @Override
+ public URI getBaseURI() {
+ return (new File("/base")).toURI();
+ }
+
+ @Override
+ public DF getUsageStats(Configuration conf) {
+ return null;
+ }
+
+ @Override
+ public LinkedList<ScanInfo> compileReport(String bpid,
+ LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
+ throws InterruptedException, IOException {
+ return null;
+ }
+
}
private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();
@@ -887,8 +907,8 @@ public class TestDirectoryScanner {
void testScanInfoObject(long blockId, File blockFile, File metaFile)
throws Exception {
- DirectoryScanner.ScanInfo scanInfo =
- new DirectoryScanner.ScanInfo(blockId, blockFile, metaFile, TEST_VOLUME);
+ FsVolumeSpi.ScanInfo scanInfo =
+ new FsVolumeSpi.ScanInfo(blockId, blockFile, metaFile, TEST_VOLUME);
assertEquals(blockId, scanInfo.getBlockId());
if (blockFile != null) {
assertEquals(blockFile.getAbsolutePath(),
@@ -906,8 +926,8 @@ public class TestDirectoryScanner {
}
void testScanInfoObject(long blockId) throws Exception {
- DirectoryScanner.ScanInfo scanInfo =
- new DirectoryScanner.ScanInfo(blockId, null, null, null);
+ FsVolumeSpi.ScanInfo scanInfo =
+ new FsVolumeSpi.ScanInfo(blockId, null, null, null);
assertEquals(blockId, scanInfo.getBlockId());
assertNull(scanInfo.getBlockFile());
assertNull(scanInfo.getMetaFile());
@@ -963,8 +983,8 @@ public class TestDirectoryScanner {
List<FsVolumeSpi> volumes = new ArrayList<>();
Iterator<FsVolumeSpi> iterator = fds.getFsVolumeReferences().iterator();
while (iterator.hasNext()) {
- FsVolumeSpi volume = iterator.next();
- FsVolumeSpi spy = Mockito.spy(volume);
+ FsVolumeImpl volume = (FsVolumeImpl) iterator.next();
+ FsVolumeImpl spy = Mockito.spy(volume);
Mockito.doThrow(new IOException("Error while getFinalizedDir"))
.when(spy).getFinalizedDir(volume.getBlockPoolList()[0]);
volumes.add(spy);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index 86d2ff4..2103392 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -199,7 +199,7 @@ public class TestDiskError {
try (FsDatasetSpi.FsVolumeReferences volumes =
dn.getFSDataset().getFsVolumeReferences()) {
for (FsVolumeSpi vol : volumes) {
- String dir = vol.getBasePath();
+ String dir = vol.getStorageLocation().getFile().getAbsolutePath();
Path dataDir = new Path(dir);
FsPermission actual = localFS.getFileStatus(dataDir).getPermission();
assertEquals("Permission for dir: " + dataDir + ", is " + actual +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 1268108..7b7f04f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -56,12 +56,14 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
- public void addVolume(StorageLocation location, List<NamespaceInfo> nsInfos) throws IOException {
-
+ public void addVolume(StorageLocation location, List<NamespaceInfo> nsInfos)
+ throws IOException {
}
@Override
- public void removeVolumes(Set<File> volumes, boolean clearFailure) {
+ public void removeVolumes(Collection<StorageLocation> volumes,
+ boolean clearFailure) {
+ throw new UnsupportedOperationException();
}
@Override
@@ -242,7 +244,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
- public Set<File> checkDataDir() {
+ public Set<StorageLocation> checkDataDir() {
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
index 985a259..83d6c4c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
@@ -18,11 +18,16 @@
package org.apache.hadoop.hdfs.server.datanode.extdataset;
-import java.io.File;
import java.io.IOException;
+import java.net.URI;
import java.nio.channels.ClosedChannelException;
+import java.util.LinkedList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -44,21 +49,6 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
}
@Override
- public String getBasePath() {
- return null;
- }
-
- @Override
- public String getPath(String bpid) throws IOException {
- return null;
- }
-
- @Override
- public File getFinalizedDir(String bpid) throws IOException {
- return null;
- }
-
- @Override
public String getStorageID() {
return null;
}
@@ -100,4 +90,26 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
public FsDatasetSpi getDataset() {
return null;
}
+
+ @Override
+ public StorageLocation getStorageLocation() {
+ return null;
+ }
+
+ @Override
+ public URI getBaseURI() {
+ return null;
+ }
+
+ @Override
+ public DF getUsageStats(Configuration conf) {
+ return null;
+ }
+
+ @Override
+ public LinkedList<ScanInfo> compileReport(String bpid,
+ LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
+ throws InterruptedException, IOException {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
index a465c05..07ddb59 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
@@ -374,9 +374,12 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
public long getRawCapacity() throws IOException {
try (FsVolumeReferences volRefs = dataset.getFsVolumeReferences()) {
Preconditions.checkState(volRefs.size() != 0);
- DF df = new DF(new File(volRefs.get(0).getBasePath()),
- dataset.datanode.getConf());
- return df.getCapacity();
+ DF df = volRefs.get(0).getUsageStats(dataset.datanode.getConf());
+ if (df != null) {
+ return df.getCapacity();
+ } else {
+ return -1;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 179b617..e48aae0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
@@ -50,7 +51,9 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.MultipleIOException;
@@ -122,8 +125,10 @@ public class TestFsDatasetImpl {
private final static String BLOCKPOOL = "BP-TEST";
- private static Storage.StorageDirectory createStorageDirectory(File root) {
- Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
+ private static Storage.StorageDirectory createStorageDirectory(File root)
+ throws SecurityException, IOException {
+ Storage.StorageDirectory sd = new Storage.StorageDirectory(
+ StorageLocation.parse(root.toURI().toString()));
DataStorage.createStorageID(sd, false);
return sd;
}
@@ -196,16 +201,18 @@ public class TestFsDatasetImpl {
for (int i = 0; i < numNewVolumes; i++) {
String path = BASE_DIR + "/newData" + i;
String pathUri = new Path(path).toUri().toString();
- expectedVolumes.add(new File(pathUri).toString());
+ expectedVolumes.add(new File(pathUri).getAbsolutePath());
StorageLocation loc = StorageLocation.parse(pathUri);
Storage.StorageDirectory sd = createStorageDirectory(new File(path));
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
- when(storage.prepareVolume(eq(datanode), eq(loc.getFile()),
+ when(storage.prepareVolume(eq(datanode), eq(loc),
anyListOf(NamespaceInfo.class)))
.thenReturn(builder);
dataset.addVolume(loc, nsInfos);
+ LOG.info("expectedVolumes " + i + " is " +
+ new File(pathUri).getAbsolutePath());
}
assertEquals(totalVolumes, getNumVolumes());
@@ -215,7 +222,9 @@ public class TestFsDatasetImpl {
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
for (int i = 0; i < numNewVolumes; i++) {
- actualVolumes.add(volumes.get(numExistingVolumes + i).getBasePath());
+ String volumeName = volumes.get(numExistingVolumes + i).toString();
+ actualVolumes.add(volumeName);
+ LOG.info("actualVolume " + i + " is " + volumeName);
}
}
assertEquals(actualVolumes.size(), expectedVolumes.size());
@@ -262,9 +271,18 @@ public class TestFsDatasetImpl {
final String[] dataDirs =
conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
final String volumePathToRemove = dataDirs[0];
- Set<File> volumesToRemove = new HashSet<>();
- volumesToRemove.add(StorageLocation.parse(volumePathToRemove).getFile());
-
+ Set<StorageLocation> volumesToRemove = new HashSet<>();
+ volumesToRemove.add(StorageLocation.parse(volumePathToRemove));
+
+ FsVolumeReferences volReferences = dataset.getFsVolumeReferences();
+ FsVolumeImpl volumeToRemove = null;
+ for (FsVolumeSpi vol: volReferences) {
+ if (vol.getStorageLocation().equals(volumesToRemove.iterator().next())) {
+ volumeToRemove = (FsVolumeImpl) vol;
+ }
+ }
+ assertTrue(volumeToRemove != null);
+ volReferences.close();
dataset.removeVolumes(volumesToRemove, true);
int expectedNumVolumes = dataDirs.length - 1;
assertEquals("The volume has been removed from the volumeList.",
@@ -273,7 +291,7 @@ public class TestFsDatasetImpl {
expectedNumVolumes, dataset.storageMap.size());
try {
- dataset.asyncDiskService.execute(volumesToRemove.iterator().next(),
+ dataset.asyncDiskService.execute(volumeToRemove,
new Runnable() {
@Override
public void run() {}
@@ -281,7 +299,7 @@ public class TestFsDatasetImpl {
fail("Expect RuntimeException: the volume has been removed from the "
+ "AsyncDiskService.");
} catch (RuntimeException e) {
- GenericTestUtils.assertExceptionContains("Cannot find root", e);
+ GenericTestUtils.assertExceptionContains("Cannot find volume", e);
}
int totalNumReplicas = 0;
@@ -306,7 +324,7 @@ public class TestFsDatasetImpl {
Storage.StorageDirectory sd = createStorageDirectory(new File(newVolumePath));
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
- when(storage.prepareVolume(eq(datanode), eq(loc.getFile()),
+ when(storage.prepareVolume(eq(datanode), eq(loc),
anyListOf(NamespaceInfo.class)))
.thenReturn(builder);
@@ -315,8 +333,8 @@ public class TestFsDatasetImpl {
when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1);
when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd);
- Set<File> volumesToRemove = new HashSet<>();
- volumesToRemove.add(loc.getFile());
+ Set<StorageLocation> volumesToRemove = new HashSet<>();
+ volumesToRemove.add(loc);
dataset.removeVolumes(volumesToRemove, true);
assertEquals(numExistingVolumes, getNumVolumes());
}
@@ -336,7 +354,8 @@ public class TestFsDatasetImpl {
for (int i = 0; i < NUM_VOLUMES; i++) {
FsVolumeImpl volume = mock(FsVolumeImpl.class);
oldVolumes.add(volume);
- when(volume.getBasePath()).thenReturn("data" + i);
+ when(volume.getStorageLocation()).thenReturn(
+ StorageLocation.parse(new File("data" + i).toURI().toString()));
when(volume.checkClosed()).thenReturn(true);
FsVolumeReference ref = mock(FsVolumeReference.class);
when(ref.getVolume()).thenReturn(volume);
@@ -348,13 +367,16 @@ public class TestFsDatasetImpl {
final FsVolumeImpl newVolume = mock(FsVolumeImpl.class);
final FsVolumeReference newRef = mock(FsVolumeReference.class);
when(newRef.getVolume()).thenReturn(newVolume);
- when(newVolume.getBasePath()).thenReturn("data4");
+ when(newVolume.getStorageLocation()).thenReturn(
+ StorageLocation.parse(new File("data4").toURI().toString()));
FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock)
throws Throwable {
- volumeList.removeVolume(new File("data4"), false);
+ volumeList.removeVolume(
+ StorageLocation.parse((new File("data4")).toURI().toString()),
+ false);
volumeList.addVolume(newRef);
return null;
}
@@ -386,7 +408,8 @@ public class TestFsDatasetImpl {
File badDir = new File(BASE_DIR, "bad");
badDir.mkdirs();
doReturn(mockVolume).when(spyDataset)
- .createFsVolume(anyString(), any(File.class), any(StorageType.class));
+ .createFsVolume(anyString(), any(StorageDirectory.class),
+ any(StorageLocation.class));
doThrow(new IOException("Failed to getVolumeMap()"))
.when(mockVolume).getVolumeMap(
anyString(),
@@ -396,7 +419,8 @@ public class TestFsDatasetImpl {
Storage.StorageDirectory sd = createStorageDirectory(badDir);
sd.lock();
DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd);
- when(storage.prepareVolume(eq(datanode), eq(badDir.getAbsoluteFile()),
+ when(storage.prepareVolume(eq(datanode),
+ eq(StorageLocation.parse(badDir.toURI().toString())),
Matchers.<List<NamespaceInfo>>any()))
.thenReturn(builder);
@@ -540,7 +564,7 @@ public class TestFsDatasetImpl {
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
when(
- storage.prepareVolume(eq(datanode), eq(loc.getFile()),
+ storage.prepareVolume(eq(datanode), eq(loc),
anyListOf(NamespaceInfo.class))).thenReturn(builder);
String cacheFilePath =
@@ -584,7 +608,7 @@ public class TestFsDatasetImpl {
return dfsUsed;
}
- @Test(timeout = 30000)
+ @Test(timeout = 60000)
public void testRemoveVolumeBeingWritten() throws Exception {
// Will write and remove on dn0.
final ExtendedBlock eb = new ExtendedBlock(BLOCK_POOL_IDS[0], 0);
@@ -636,10 +660,9 @@ public class TestFsDatasetImpl {
class VolRemoveThread extends Thread {
public void run() {
- Set<File> volumesToRemove = new HashSet<>();
+ Set<StorageLocation> volumesToRemove = new HashSet<>();
try {
- volumesToRemove.add(StorageLocation.parse(
- dataset.getVolume(eb).getBasePath()).getFile());
+ volumesToRemove.add(dataset.getVolume(eb).getStorageLocation());
} catch (Exception e) {
LOG.info("Problem preparing volumes to remove: ", e);
Assert.fail("Exception in remove volume thread, check log for " +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index 3d4c38c..6eff300 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -22,7 +22,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
@@ -71,8 +73,13 @@ public class TestFsVolumeList {
for (int i = 0; i < 3; i++) {
File curDir = new File(baseDir, "nextvolume-" + i);
curDir.mkdirs();
- FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", curDir,
- conf, StorageType.DEFAULT);
+ FsVolumeImpl volume = new FsVolumeImplBuilder()
+ .setConf(conf)
+ .setDataset(dataset)
+ .setStorageID("storage-id")
+ .setStorageDirectory(
+ new StorageDirectory(StorageLocation.parse(curDir.getPath())))
+ .build();
volume.setCapacityForTesting(1024 * 1024 * 1024);
volumes.add(volume);
volumeList.addVolume(volume.obtainReference());
@@ -109,8 +116,13 @@ public class TestFsVolumeList {
for (int i = 0; i < 3; i++) {
File curDir = new File(baseDir, "volume-" + i);
curDir.mkdirs();
- FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", curDir,
- conf, StorageType.DEFAULT);
+ FsVolumeImpl volume = new FsVolumeImplBuilder()
+ .setConf(conf)
+ .setDataset(dataset)
+ .setStorageID("storage-id")
+ .setStorageDirectory(
+ new StorageDirectory(StorageLocation.parse(curDir.getPath())))
+ .build();
volumes.add(volume);
volumeList.addVolume(volume.obtainReference());
}
@@ -139,8 +151,13 @@ public class TestFsVolumeList {
Collections.<VolumeFailureInfo>emptyList(), null, blockChooser);
File volDir = new File(baseDir, "volume-0");
volDir.mkdirs();
- FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", volDir,
- conf, StorageType.DEFAULT);
+ FsVolumeImpl volume = new FsVolumeImplBuilder()
+ .setConf(conf)
+ .setDataset(dataset)
+ .setStorageID("storage-id")
+ .setStorageDirectory(
+ new StorageDirectory(StorageLocation.parse(volDir.getPath())))
+ .build();
FsVolumeReference ref = volume.obtainReference();
volumeList.addVolume(ref);
assertNull(ref.getVolume());
@@ -155,8 +172,13 @@ public class TestFsVolumeList {
volDir.mkdirs();
// when storage type reserved is not configured,should consider
// dfs.datanode.du.reserved.
- FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", volDir, conf,
- StorageType.RAM_DISK);
+ FsVolumeImpl volume = new FsVolumeImplBuilder().setDataset(dataset)
+ .setStorageDirectory(
+ new StorageDirectory(
+ StorageLocation.parse("[RAM_DISK]"+volDir.getPath())))
+ .setStorageID("storage-id")
+ .setConf(conf)
+ .build();
assertEquals("", 100L, volume.getReserved());
// when storage type reserved is configured.
conf.setLong(
@@ -165,17 +187,37 @@ public class TestFsVolumeList {
conf.setLong(
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY + "."
+ StringUtils.toLowerCase(StorageType.SSD.toString()), 2L);
- FsVolumeImpl volume1 = new FsVolumeImpl(dataset, "storage-id", volDir,
- conf, StorageType.RAM_DISK);
+ FsVolumeImpl volume1 = new FsVolumeImplBuilder().setDataset(dataset)
+ .setStorageDirectory(
+ new StorageDirectory(
+ StorageLocation.parse("[RAM_DISK]"+volDir.getPath())))
+ .setStorageID("storage-id")
+ .setConf(conf)
+ .build();
assertEquals("", 1L, volume1.getReserved());
- FsVolumeImpl volume2 = new FsVolumeImpl(dataset, "storage-id", volDir,
- conf, StorageType.SSD);
+ FsVolumeImpl volume2 = new FsVolumeImplBuilder().setDataset(dataset)
+ .setStorageDirectory(
+ new StorageDirectory(
+ StorageLocation.parse("[SSD]"+volDir.getPath())))
+ .setStorageID("storage-id")
+ .setConf(conf)
+ .build();
assertEquals("", 2L, volume2.getReserved());
- FsVolumeImpl volume3 = new FsVolumeImpl(dataset, "storage-id", volDir,
- conf, StorageType.DISK);
+ FsVolumeImpl volume3 = new FsVolumeImplBuilder().setDataset(dataset)
+ .setStorageDirectory(
+ new StorageDirectory(
+ StorageLocation.parse("[DISK]"+volDir.getPath())))
+ .setStorageID("storage-id")
+ .setConf(conf)
+ .build();
assertEquals("", 100L, volume3.getReserved());
- FsVolumeImpl volume4 = new FsVolumeImpl(dataset, "storage-id", volDir,
- conf, StorageType.DEFAULT);
+ FsVolumeImpl volume4 = new FsVolumeImplBuilder().setDataset(dataset)
+ .setStorageDirectory(
+ new StorageDirectory(
+ StorageLocation.parse(volDir.getPath())))
+ .setStorageID("storage-id")
+ .setConf(conf)
+ .build();
assertEquals("", 100L, volume4.getReserved());
}
@@ -197,8 +239,13 @@ public class TestFsVolumeList {
long actualNonDfsUsage = 300L;
long reservedForReplicas = 50L;
conf.setLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, duReserved);
- FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", volDir, conf,
- StorageType.DEFAULT);
+ FsVolumeImpl volume = new FsVolumeImplBuilder().setDataset(dataset)
+ .setStorageDirectory(
+ new StorageDirectory(
+ StorageLocation.parse(volDir.getPath())))
+ .setStorageID("storage-id")
+ .setConf(conf)
+ .build();
FsVolumeImpl spyVolume = Mockito.spy(volume);
// Set Capacity for testing
long testCapacity = diskCapacity - duReserved;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
index 794a887..7df0333 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
@@ -331,8 +331,8 @@ public class TestDiskBalancerWithMockMover {
.getFsVolumeReferences();
nodeID = dataNode.getDatanodeUuid();
- sourceName = references.get(0).getBasePath();
- destName = references.get(1).getBasePath();
+ sourceName = references.get(0).getBaseURI().getPath();
+ destName = references.get(1).getBaseURI().getPath();
sourceUUID = references.get(0).getStorageID();
destUUID = references.get(1).getStorageID();
references.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org