Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/08/10 20:26:21 UTC
[09/50] [abbrv] hadoop git commit: HDFS-12291: [SPS]: Provide a mechanism to recursively iterate and satisfy storage policy of all the files under the given dir. Contributed by Surendra Singh Lilhore.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a3dcf130
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a3dcf130
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a3dcf130
Branch: refs/heads/HDFS-10285
Commit: a3dcf1301c08d357bfd6bfb5496abc7fc4a7ea56
Parents: 4e8ec60
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Sat Sep 30 06:31:52 2017 -0700
Committer: Uma Maheswara Rao Gangumalla <um...@apache.org>
Committed: Fri Aug 10 13:22:59 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 +
.../java/org/apache/hadoop/hdfs/DFSUtil.java | 22 +-
.../BlockStorageMovementAttemptedItems.java | 8 +-
.../namenode/BlockStorageMovementNeeded.java | 277 +++++++++++---
.../server/namenode/ReencryptionHandler.java | 1 +
.../server/namenode/StoragePolicySatisfier.java | 43 ++-
.../src/main/resources/hdfs-default.xml | 23 ++
.../src/site/markdown/ArchivalStorage.md | 3 +-
.../TestBlockStorageMovementAttemptedItems.java | 2 +-
.../TestPersistentStoragePolicySatisfier.java | 8 +-
.../namenode/TestStoragePolicySatisfier.java | 377 ++++++++++++++++++-
11 files changed, 689 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3dcf130/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index e66806f..c90ca33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -618,6 +618,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.storage.policy.satisfier.enabled";
public static final boolean DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT =
false;
+ public static final String DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY =
+ "dfs.storage.policy.satisfier.queue.limit";
+ public static final int DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT =
+ 1000;
+ public static final String DFS_SPS_WORK_MULTIPLIER_PER_ITERATION =
+ "dfs.storage.policy.satisfier.work.multiplier.per.iteration";
+ public static final int DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT =
+ 1;
public static final String DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY =
"dfs.storage.policy.satisfier.recheck.timeout.millis";
public static final int DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT =
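The two new keys can be read back through the standard Configuration API. A minimal sketch, not part of this patch (the Configuration instance is illustrative):

    Configuration conf = new HdfsConfiguration();
    // Maximum number of file inode IDs the SPS queue will hold (default 1000).
    int queueLimit = conf.getInt(
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT);
    // Per-iteration block movement multiplier (default 1).
    int workMultiplier = conf.getInt(
        DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION,
        DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);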
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3dcf130/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index f5ceeaf..c26599c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -1457,7 +1457,27 @@ public class DFSUtil {
"It should be a positive, non-zero integer value.");
return blocksReplWorkMultiplier;
}
-
+
+ /**
+ * Get DFS_SPS_WORK_MULTIPLIER_PER_ITERATION from
+ * configuration.
+ *
+ * @param conf Configuration
+ * @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION
+ */
+ public static int getSPSWorkMultiplier(Configuration conf) {
+ int spsWorkMultiplier = conf
+ .getInt(
+ DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION,
+ DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
+ Preconditions.checkArgument(
+ (spsWorkMultiplier > 0),
+ DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION +
+ " = '" + spsWorkMultiplier + "' is invalid. " +
+ "It should be a positive, non-zero integer value.");
+ return spsWorkMultiplier;
+ }
+
/**
* Get SPNEGO keytab Key from configuration
*
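The new getter mirrors the block-replication work multiplier getter above and rejects non-positive values. A small usage sketch, assuming the key is set explicitly:

    Configuration conf = new HdfsConfiguration();
    conf.setInt(DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION, 2);
    int multiplier = DFSUtil.getSPSWorkMultiplier(conf);  // returns 2
    // A value of 0 or below would trip the Preconditions check above
    // and throw an IllegalArgumentException.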
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3dcf130/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index 278b62b..549819f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -101,7 +101,7 @@ public class BlockStorageMovementAttemptedItems {
public void add(ItemInfo itemInfo, boolean allBlockLocsAttemptedToSatisfy) {
synchronized (storageMovementAttemptedItems) {
AttemptedItemInfo attemptedItemInfo = new AttemptedItemInfo(
- itemInfo.getRootId(), itemInfo.getTrackId(), monotonicNow(),
+ itemInfo.getStartId(), itemInfo.getTrackId(), monotonicNow(),
allBlockLocsAttemptedToSatisfy);
storageMovementAttemptedItems.put(itemInfo.getTrackId(),
attemptedItemInfo);
@@ -260,7 +260,7 @@ public class BlockStorageMovementAttemptedItems {
synchronized (storageMovementAttemptedResults) {
if (!isExistInResult(blockCollectionID)) {
ItemInfo candidate = new ItemInfo(
- itemInfo.getRootId(), blockCollectionID);
+ itemInfo.getStartId(), blockCollectionID);
blockStorageMovementNeeded.add(candidate);
iter.remove();
LOG.info("TrackID: {} becomes timed out and moved to needed "
@@ -315,7 +315,7 @@ public class BlockStorageMovementAttemptedItems {
// blockStorageMovementNeeded#removeIteamTrackInfo() for cleaning
// the xAttr
ItemInfo itemInfo = new ItemInfo((attemptedItemInfo != null)
- ? attemptedItemInfo.getRootId() : trackId, trackId);
+ ? attemptedItemInfo.getStartId() : trackId, trackId);
switch (status) {
case FAILURE:
if (attemptedItemInfo != null) {
@@ -345,7 +345,7 @@ public class BlockStorageMovementAttemptedItems {
if (attemptedItemInfo != null) {
if (!attemptedItemInfo.isAllBlockLocsAttemptedToSatisfy()) {
blockStorageMovementNeeded
- .add(new ItemInfo(attemptedItemInfo.getRootId(), trackId));
+ .add(new ItemInfo(attemptedItemInfo.getStartId(), trackId));
LOG.warn("{} But adding trackID back to retry queue as some of"
+ " the blocks couldn't find matching target nodes in"
+ " previous SPS iteration.", msg);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3dcf130/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
index 41a3a6c..788a98b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
@@ -29,12 +29,15 @@ import java.util.Map;
import java.util.Queue;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* A Class to track the block collection IDs (Inode's ID) for which physical
* storage movement needed as per the Namespace and StorageReports from DN.
@@ -53,11 +56,11 @@ public class BlockStorageMovementNeeded {
new LinkedList<ItemInfo>();
/**
- * Map of rootId and number of child's. Number of child's indicate the number
- * of files pending to satisfy the policy.
+ * Map of startId and number of children. The number of children indicates
+ * the number of files pending to satisfy the policy.
*/
- private final Map<Long, Integer> pendingWorkForDirectory =
- new HashMap<Long, Integer>();
+ private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
+ new HashMap<Long, DirPendingWorkInfo>();
private final Namesystem namesystem;
@@ -66,12 +69,15 @@ public class BlockStorageMovementNeeded {
private final StoragePolicySatisfier sps;
- private Daemon fileInodeIdCollector;
+ private Daemon inodeIdCollector;
+
+ private final int maxQueuedItem;
public BlockStorageMovementNeeded(Namesystem namesystem,
- StoragePolicySatisfier sps) {
+ StoragePolicySatisfier sps, int queueLimit) {
this.namesystem = namesystem;
this.sps = sps;
+ this.maxQueuedItem = queueLimit;
}
/**
@@ -88,15 +94,24 @@ public class BlockStorageMovementNeeded {
/**
* Add the itemInfo to tracking list for which storage movement
* expected if necessary.
- * @param rootId
- * - root inode id
+ * @param startId
+ * - start id
* @param itemInfoList
* - List of child in the directory
*/
- private synchronized void addAll(Long rootId,
- List<ItemInfo> itemInfoList) {
+ @VisibleForTesting
+ public synchronized void addAll(long startId,
+ List<ItemInfo> itemInfoList, boolean scanCompleted) {
storageMovementNeeded.addAll(itemInfoList);
- pendingWorkForDirectory.put(rootId, itemInfoList.size());
+ DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
+ if (pendingWork == null) {
+ pendingWork = new DirPendingWorkInfo();
+ pendingWorkForDirectory.put(startId, pendingWork);
+ }
+ pendingWork.addPendingWorkCount(itemInfoList.size());
+ if (scanCompleted) {
+ pendingWork.markScanCompleted();
+ }
}
/**
@@ -118,6 +133,25 @@ public class BlockStorageMovementNeeded {
}
}
+ /**
+ * Returns queue remaining capacity.
+ */
+ public synchronized int remainingCapacity() {
+ int size = storageMovementNeeded.size();
+ if (size >= maxQueuedItem) {
+ return 0;
+ } else {
+ return (maxQueuedItem - size);
+ }
+ }
+
+ /**
+ * Returns queue size.
+ */
+ public synchronized int size() {
+ return storageMovementNeeded.size();
+ }
+
public synchronized void clearAll() {
spsDirsToBeTraveresed.clear();
storageMovementNeeded.clear();
@@ -131,20 +165,20 @@ public class BlockStorageMovementNeeded {
public synchronized void removeItemTrackInfo(ItemInfo trackInfo)
throws IOException {
if (trackInfo.isDir()) {
- // If track is part of some root then reduce the pending directory work
- // count.
- long rootId = trackInfo.getRootId();
- INode inode = namesystem.getFSDirectory().getInode(rootId);
+ // If track is part of some start inode then reduce the pending
+ // directory work count.
+ long startId = trackInfo.getStartId();
+ INode inode = namesystem.getFSDirectory().getInode(startId);
if (inode == null) {
// directory deleted just remove it.
- this.pendingWorkForDirectory.remove(rootId);
+ this.pendingWorkForDirectory.remove(startId);
} else {
- if (pendingWorkForDirectory.get(rootId) != null) {
- Integer pendingWork = pendingWorkForDirectory.get(rootId) - 1;
- pendingWorkForDirectory.put(rootId, pendingWork);
- if (pendingWork <= 0) {
- namesystem.removeXattr(rootId, XATTR_SATISFY_STORAGE_POLICY);
- pendingWorkForDirectory.remove(rootId);
+ DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
+ if (pendingWork != null) {
+ pendingWork.decrementPendingWorkCount();
+ if (pendingWork.isDirWorkDone()) {
+ namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY);
+ pendingWorkForDirectory.remove(startId);
}
}
}
@@ -161,7 +195,7 @@ public class BlockStorageMovementNeeded {
Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
while (iterator.hasNext()) {
ItemInfo next = iterator.next();
- if (next.getRootId() == trackId) {
+ if (next.getStartId() == trackId) {
iterator.remove();
}
}
@@ -208,7 +242,17 @@ public class BlockStorageMovementNeeded {
* Take dir tack ID from the spsDirsToBeTraveresed queue and collect child
* ID's to process for satisfy the policy.
*/
- private class FileInodeIdCollector implements Runnable {
+ private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser
+ implements Runnable {
+
+ private int remainingCapacity = 0;
+
+ private List<ItemInfo> currentBatch = new ArrayList<>(maxQueuedItem);
+
+ StorageMovementPendingInodeIdCollector(FSDirectory dir) {
+ super(dir);
+ }
+
@Override
public void run() {
LOG.info("Starting FileInodeIdCollector!.");
@@ -216,38 +260,36 @@ public class BlockStorageMovementNeeded {
try {
if (!namesystem.isInSafeMode()) {
FSDirectory fsd = namesystem.getFSDirectory();
- Long rootINodeId = spsDirsToBeTraveresed.poll();
- if (rootINodeId == null) {
+ Long startINodeId = spsDirsToBeTraveresed.poll();
+ if (startINodeId == null) {
// Waiting for SPS path
synchronized (spsDirsToBeTraveresed) {
spsDirsToBeTraveresed.wait(5000);
}
} else {
- INode rootInode = fsd.getInode(rootINodeId);
- if (rootInode != null) {
- // TODO : HDFS-12291
- // 1. Implement an efficient recursive directory iteration
- // mechanism and satisfies storage policy for all the files
- // under the given directory.
- // 2. Process files in batches,so datanodes workload can be
- // handled.
- List<ItemInfo> itemInfoList =
- new ArrayList<>();
- for (INode childInode : rootInode.asDirectory()
- .getChildrenList(Snapshot.CURRENT_STATE_ID)) {
- if (childInode.isFile()
- && childInode.asFile().numBlocks() != 0) {
- itemInfoList.add(
- new ItemInfo(rootINodeId, childInode.getId()));
- }
+ INode startInode = fsd.getInode(startINodeId);
+ if (startInode != null) {
+ try {
+ remainingCapacity = remainingCapacity();
+ readLock();
+ traverseDir(startInode.asDirectory(), startINodeId,
+ HdfsFileStatus.EMPTY_NAME,
+ new SPSTraverseInfo(startINodeId));
+ } finally {
+ readUnlock();
}
- if (itemInfoList.isEmpty()) {
- // satisfy track info is empty, so remove the xAttr from the
- // directory
- namesystem.removeXattr(rootINodeId,
+ // Mark startInode traverse is done
+ addAll(startInode.getId(), currentBatch, true);
+ currentBatch.clear();
+
+ // check if directory was empty and no child added to queue
+ DirPendingWorkInfo dirPendingWorkInfo =
+ pendingWorkForDirectory.get(startInode.getId());
+ if (dirPendingWorkInfo.isDirWorkDone()) {
+ namesystem.removeXattr(startInode.getId(),
XATTR_SATISFY_STORAGE_POLICY);
+ pendingWorkForDirectory.remove(startInode.getId());
}
- addAll(rootINodeId, itemInfoList);
}
}
}
@@ -256,17 +298,140 @@ public class BlockStorageMovementNeeded {
}
}
}
+
+ @Override
+ protected void checkPauseForTesting() throws InterruptedException {
+ // TODO implement if needed
+ }
+
+ @Override
+ protected boolean processFileInode(INode inode, TraverseInfo traverseInfo)
+ throws IOException, InterruptedException {
+ assert getFSDirectory().hasReadLock();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing {} for satisfying the policy",
+ inode.getFullPathName());
+ }
+ if (!inode.isFile()) {
+ return false;
+ }
+ if (inode.isFile() && inode.asFile().numBlocks() != 0) {
+ currentBatch.add(new ItemInfo(
+ ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
+ remainingCapacity--;
+ }
+ return true;
+ }
+
+ @Override
+ protected boolean canSubmitCurrentBatch() {
+ return remainingCapacity <= 0;
+ }
+
+ @Override
+ protected void checkINodeReady(long startId) throws IOException {
+ FSNamesystem fsn = ((FSNamesystem) namesystem);
+ fsn.checkNameNodeSafeMode("NN is in safe mode, "
+ + "cannot satisfy the policy.");
+ // SPS work should be cancelled when NN goes to standby. Just
+ // double checking for sanity.
+ fsn.checkOperation(NameNode.OperationCategory.WRITE);
+ }
+
+ @Override
+ protected void submitCurrentBatch(long startId)
+ throws IOException, InterruptedException {
+ // Add current children to the queue
+ addAll(startId, currentBatch, false);
+ currentBatch.clear();
+ }
+
+ @Override
+ protected void throttle() throws InterruptedException {
+ assert !getFSDirectory().hasReadLock();
+ assert !namesystem.hasReadLock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("StorageMovementNeeded queue remaining capacity is zero,"
+ + " waiting for some free slots.");
+ }
+ remainingCapacity = remainingCapacity();
+ // wait for queue to be free
+ while (remainingCapacity <= 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting for storageMovementNeeded queue to be free!");
+ }
+ Thread.sleep(5000);
+ remainingCapacity = remainingCapacity();
+ }
+ }
+
+ @Override
+ protected boolean canTraverseDir(INode inode) throws IOException {
+ return true;
+ }
}
- public void start() {
- fileInodeIdCollector = new Daemon(new FileInodeIdCollector());
- fileInodeIdCollector.setName("FileInodeIdCollector");
- fileInodeIdCollector.start();
+ /**
+ * Info for directory recursive scan.
+ */
+ public static class DirPendingWorkInfo {
+
+ private int pendingWorkCount = 0;
+ private boolean fullyScanned = false;
+
+ /**
+ * Increment the pending work count for directory.
+ */
+ public synchronized void addPendingWorkCount(int count) {
+ this.pendingWorkCount = this.pendingWorkCount + count;
+ }
+
+ /**
+ * Decrement the pending work count for the directory once one track info
+ * is completed.
+ */
+ public synchronized void decrementPendingWorkCount() {
+ this.pendingWorkCount--;
+ }
+
+ /**
+ * Return true if all the pending work is done and directory fully
+ * scanned, otherwise false.
+ */
+ public synchronized boolean isDirWorkDone() {
+ return (pendingWorkCount <= 0 && fullyScanned);
+ }
+
+ /**
+ * Mark directory scan is completed.
+ */
+ public synchronized void markScanCompleted() {
+ this.fullyScanned = true;
+ }
}
- public void stop() {
- if (fileInodeIdCollector != null) {
- fileInodeIdCollector.interrupt();
+ public void init() {
+ inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector(
+ namesystem.getFSDirectory()));
+ inodeIdCollector.setName("FileInodeIdCollector");
+ inodeIdCollector.start();
+ }
+
+ public void close() {
+ if (inodeIdCollector != null) {
+ inodeIdCollector.interrupt();
+ }
+ }
+
+ class SPSTraverseInfo extends TraverseInfo {
+ private long startId;
+
+ SPSTraverseInfo(long startId) {
+ this.startId = startId;
+ }
+
+ public long getStartId() {
+ return startId;
}
}
}
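To make the DirPendingWorkInfo bookkeeping above concrete, here is a hedged walk-through for a single directory; the counts are made up:

    BlockStorageMovementNeeded.DirPendingWorkInfo work =
        new BlockStorageMovementNeeded.DirPendingWorkInfo();
    work.addPendingWorkCount(5);        // a batch of 5 queued files
    work.isDirWorkDone();               // false: directory not fully scanned yet
    work.markScanCompleted();           // traverser finished the directory
    work.isDirWorkDone();               // still false: 5 items pending
    for (int i = 0; i < 5; i++) {
      work.decrementPendingWorkCount(); // one call per completed track info
    }
    work.isDirWorkDone();               // true: the satisfy xattr can be removed

Only when both conditions hold does removeItemTrackInfo clear the XATTR_SATISFY_STORAGE_POLICY xattr for the start directory.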
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3dcf130/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
index b92fe9f..feacd74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3dcf130/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 48d0598..a4372d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -77,7 +77,8 @@ public class StoragePolicySatisfier implements Runnable {
private final BlockStorageMovementNeeded storageMovementNeeded;
private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
private volatile boolean isRunning = false;
-
+ private int spsWorkMultiplier;
+ private long blockCount = 0L;
/**
* Represents the collective analysis status for all blocks.
*/
@@ -106,7 +107,9 @@ public class StoragePolicySatisfier implements Runnable {
final BlockManager blkManager, Configuration conf) {
this.namesystem = namesystem;
this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem,
- this);
+ this, conf.getInt(
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT));
this.blockManager = blkManager;
this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
conf.getLong(
@@ -117,6 +120,7 @@ public class StoragePolicySatisfier implements Runnable {
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
storageMovementNeeded,
this);
+ this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf);
}
/**
@@ -143,7 +147,7 @@ public class StoragePolicySatisfier implements Runnable {
// Ensure that all the previously submitted block movements(if any) have to
// be stopped in all datanodes.
addDropSPSWorkCommandsToAllDNs();
- storageMovementNeeded.start();
+ storageMovementNeeded.init();
storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start();
@@ -164,7 +168,7 @@ public class StoragePolicySatisfier implements Runnable {
return;
}
- storageMovementNeeded.stop();
+ storageMovementNeeded.close();
storagePolicySatisfierThread.interrupt();
this.storageMovementsMonitor.stop();
@@ -268,9 +272,13 @@ public class StoragePolicySatisfier implements Runnable {
}
}
}
- // TODO: We can think to make this as configurable later, how frequently
- // we want to check block movements.
- Thread.sleep(3000);
+ int numLiveDn = namesystem.getFSDirectory().getBlockManager()
+ .getDatanodeManager().getNumLiveDataNodes();
+ if (storageMovementNeeded.size() == 0
+ || blockCount > (numLiveDn * spsWorkMultiplier)) {
+ Thread.sleep(3000);
+ blockCount = 0L;
+ }
} catch (Throwable t) {
handleException(t);
}
@@ -380,6 +388,11 @@ public class StoragePolicySatisfier implements Runnable {
assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(),
blockMovingInfos, coordinatorNode);
+ int count = 0;
+ for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
+ count = count + blkMovingInfo.getSources().length;
+ }
+ blockCount = blockCount + count;
return status;
}
@@ -840,7 +853,7 @@ public class StoragePolicySatisfier implements Runnable {
* - file inode/blockcollection id.
*/
public void satisfyStoragePolicy(Long inodeId) {
- //For file rootId and trackId is same
+    // For a file, startId and trackId are the same
storageMovementNeeded.add(new ItemInfo(inodeId, inodeId));
if (LOG.isDebugEnabled()) {
LOG.debug("Added track info for inode {} to block "
@@ -864,19 +877,19 @@ public class StoragePolicySatisfier implements Runnable {
* policy.
*/
public static class ItemInfo {
- private long rootId;
+ private long startId;
private long trackId;
- public ItemInfo(long rootId, long trackId) {
- this.rootId = rootId;
+ public ItemInfo(long startId, long trackId) {
+ this.startId = startId;
this.trackId = trackId;
}
/**
- * Return the root of the current track Id.
+ * Return the start inode id of the current track Id.
*/
- public long getRootId() {
- return rootId;
+ public long getStartId() {
+ return startId;
}
/**
@@ -890,7 +903,7 @@ public class StoragePolicySatisfier implements Runnable {
* Returns true if the tracking path is a directory, false otherwise.
*/
public boolean isDir() {
- return (rootId != trackId);
+ return (startId != trackId);
}
}
}
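A short sketch of the startId/trackId convention in the renamed ItemInfo (the inode ids below are placeholders):

    long dirId = 16386L;    // hypothetical inode id of the start directory
    long fileId = 16390L;   // hypothetical inode id of a file under it

    StoragePolicySatisfier.ItemInfo fileTrack =
        new StoragePolicySatisfier.ItemInfo(fileId, fileId);
    fileTrack.isDir();        // false: for a plain file, startId == trackId

    StoragePolicySatisfier.ItemInfo childTrack =
        new StoragePolicySatisfier.ItemInfo(dirId, fileId);
    childTrack.isDir();       // true: this track belongs to a directory scan
    childTrack.getStartId();  // the directory whose pending-work count is decremented

When a directory-child track completes, removeItemTrackInfo uses getStartId() to locate and decrement that directory's DirPendingWorkInfo.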
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3dcf130/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 17f7795..41a74a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4510,6 +4510,29 @@
</property>
<property>
+ <name>dfs.storage.policy.satisfier.queue.limit</name>
+ <value>1000</value>
+ <description>
+ Storage policy satisfier queue size. This queue contains the inode IDs
+ of the files currently scheduled for satisfying the policy.
+ Default value is 1000.
+ </description>
+</property>
+
+<property>
+ <name>dfs.storage.policy.satisfier.work.multiplier.per.iteration</name>
+ <value>1</value>
+ <description>
+ *Note*: Advanced property. Change with caution.
+ This determines the total number of block transfers to begin in
+ one iteration, to satisfy the policy. The actual number is obtained by
+ multiplying this multiplier with the total number of live nodes in the
+ cluster. The resulting number is the number of blocks to begin transfers
+ immediately. This number can be any positive, non-zero integer.
+ </description>
+</property>
+
+<property>
<name>dfs.storage.policy.satisfier.recheck.timeout.millis</name>
<value>300000</value>
<description>
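As a worked example of the dfs.storage.policy.satisfier.work.multiplier.per.iteration property above (numbers are illustrative), the per-iteration limit is simply the multiplier times the live datanode count:

    int numLiveDn = 20;         // live datanodes in the cluster (example value)
    int spsWorkMultiplier = 1;  // the new property, default 1
    int perIterationLimit = numLiveDn * spsWorkMultiplier;  // up to 20 block moves
    // Once blockCount exceeds this limit, the satisfier's run loop sleeps
    // 3 seconds and resets blockCount before scheduling more moves
    // (see the blockCount check in StoragePolicySatisfier earlier in this patch).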
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3dcf130/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
index f6bbd10..c8a9466 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
@@ -112,7 +112,7 @@ SPS can be enabled and disabled dynamically without restarting the Namenode.
Detailed design documentation can be found at [Storage Policy Satisfier(SPS) (HDFS-10285)](https://issues.apache.org/jira/browse/HDFS-10285)
-* **Note**: When user invokes `satisfyStoragePolicy()` API on a directory, SPS will consider the files which are immediate to that directory. Sub-directories won't be considered for satisfying the policy. Its user responsibility to call this API on directories recursively, to track all files under the sub tree.
+* **Note**: When a user invokes the `satisfyStoragePolicy()` API on a directory, SPS will scan all sub-directories and consider all the files under it for satisfying the policy.
* HdfsAdmin API :
`public void satisfyStoragePolicy(final Path path) throws IOException`
@@ -214,7 +214,6 @@ Get the storage policy of a file or a directory.
### Satisfy Storage Policy
Schedule blocks to move based on file's/directory's current storage policy.
-Note: For directory case, it will consider immediate files under that directory and it will not consider sub directories recursively.
* Command:
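For completeness, a hedged client-side sketch of the HdfsAdmin API described above; the path is illustrative and SPS is assumed to be enabled on the Namenode via dfs.storage.policy.satisfier.enabled:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.client.HdfsAdmin;

    public class SatisfyPolicyExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        HdfsAdmin admin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
        // With this change, files in sub-directories of /archive/project are
        // scheduled too, not just the immediate children.
        admin.satisfyStoragePolicy(new Path("/archive/project"));
      }
    }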
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3dcf130/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
index 55ebf9c..7918821 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -41,7 +41,7 @@ public class TestBlockStorageMovementAttemptedItems {
public void setup() throws Exception {
unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
Mockito.mock(Namesystem.class),
- Mockito.mock(StoragePolicySatisfier.class));
+ Mockito.mock(StoragePolicySatisfier.class), 100);
StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
selfRetryTimeout, unsatisfiedStorageMovementFiles, sps);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3dcf130/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
index e7b9148..5bce296 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
@@ -191,7 +191,7 @@ public class TestPersistentStoragePolicySatisfier {
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
- childFileName, StorageType.DEFAULT, 3, timeout, fs);
+ childFileName, StorageType.ARCHIVE, 3, timeout, fs);
} finally {
clusterShutdown();
@@ -232,7 +232,9 @@ public class TestPersistentStoragePolicySatisfier {
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.ARCHIVE, 2, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
- childFileName, StorageType.DEFAULT, 3, timeout, fs);
+ childFileName, StorageType.DISK, 1, timeout, fs);
+ DFSTestUtil.waitExpectedStorageType(
+ childFileName, StorageType.ARCHIVE, 2, timeout, fs);
} finally {
clusterShutdown();
}
@@ -269,7 +271,7 @@ public class TestPersistentStoragePolicySatisfier {
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
- childFileName, StorageType.DEFAULT, 3, timeout, fs);
+ childFileName, StorageType.ARCHIVE, 3, timeout, fs);
} finally {
clusterShutdown();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3dcf130/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 3375590..57e9f94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -21,6 +21,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KE
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.slf4j.LoggerFactory.getLogger;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -61,8 +64,10 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
import com.google.common.base.Supplier;
@@ -71,6 +76,12 @@ import com.google.common.base.Supplier;
* moved and finding its suggested target locations to move.
*/
public class TestStoragePolicySatisfier {
+
+ {
+ GenericTestUtils.setLogLevel(
+ getLogger(FSTreeTraverser.class), Level.DEBUG);
+ }
+
private static final String ONE_SSD = "ONE_SSD";
private static final String COLD = "COLD";
private static final Logger LOG =
@@ -341,7 +352,9 @@ public class TestStoragePolicySatisfier {
// take no effect for the sub-dir's file in the directory.
DFSTestUtil.waitExpectedStorageType(
- subFile2, StorageType.DEFAULT, 3, 30000, dfs);
+ subFile2, StorageType.SSD, 1, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(
+ subFile2, StorageType.DISK, 2, 30000, dfs);
} finally {
shutdownCluster();
}
@@ -1083,6 +1096,368 @@ public class TestStoragePolicySatisfier {
}
}
+ /**
+ * Test SPS for empty directory, xAttr should be removed.
+ */
+ @Test(timeout = 300000)
+ public void testSPSForEmptyDirectory() throws IOException, TimeoutException,
+ InterruptedException {
+ MiniDFSCluster cluster = null;
+ try {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+ true);
+ cluster = new MiniDFSCluster.Builder(conf).build();
+ cluster.waitActive();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ Path emptyDir = new Path("/emptyDir");
+ fs.mkdirs(emptyDir);
+ fs.satisfyStoragePolicy(emptyDir);
+ // Make sure satisfy xattr has been removed.
+ DFSTestUtil.waitForXattrRemoved("/emptyDir",
+ XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test SPS for not exist directory.
+ */
+ @Test(timeout = 300000)
+ public void testSPSForNonExistDirectory() throws Exception {
+ MiniDFSCluster cluster = null;
+ try {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+ true);
+ cluster = new MiniDFSCluster.Builder(conf).build();
+ cluster.waitActive();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ Path emptyDir = new Path("/emptyDir");
+ try {
+ fs.satisfyStoragePolicy(emptyDir);
+ fail("FileNotFoundException should throw");
+ } catch (FileNotFoundException e) {
+ // nothing to do
+ }
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test SPS for directory tree which doesn't have files.
+ */
+ @Test(timeout = 300000)
+ public void testSPSWithDirectoryTreeWithoutFile() throws Exception {
+ MiniDFSCluster cluster = null;
+ try {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+ true);
+ cluster = new MiniDFSCluster.Builder(conf).build();
+ cluster.waitActive();
+ // Create directories
+ /*
+ * root
+ * |
+ * A--------C--------D
+ * |
+ * G----H----I
+ * |
+ * O
+ */
+ DistributedFileSystem fs = cluster.getFileSystem();
+ fs.mkdirs(new Path("/root/C/H/O"));
+ fs.mkdirs(new Path("/root/A"));
+ fs.mkdirs(new Path("/root/D"));
+ fs.mkdirs(new Path("/root/C/G"));
+ fs.mkdirs(new Path("/root/C/I"));
+ fs.satisfyStoragePolicy(new Path("/root"));
+ // Make sure satisfy xattr has been removed.
+ DFSTestUtil.waitForXattrRemoved("/root",
+ XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test SPS for directory which has multilevel directories.
+ */
+ @Test(timeout = 300000)
+ public void testMultipleLevelDirectoryForSatisfyStoragePolicy()
+ throws Exception {
+ try {
+ StorageType[][] diskTypes = new StorageType[][] {
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.SSD},
+ {StorageType.DISK, StorageType.DISK}};
+ config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+ true);
+ hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+ storagesPerDatanode, capacity);
+ dfs = hdfsCluster.getFileSystem();
+ createDirectoryTree(dfs);
+
+ List<String> files = getDFSListOfTree();
+ dfs.setStoragePolicy(new Path("/root"), COLD);
+ dfs.satisfyStoragePolicy(new Path("/root"));
+ for (String fileName : files) {
+ // Wait till the block is moved to ARCHIVE
+ DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2,
+ 30000, dfs);
+ }
+ } finally {
+ shutdownCluster();
+ }
+ }
+
+ /**
+ * Test SPS for batch processing.
+ */
+ @Test(timeout = 300000)
+ public void testBatchProcessingForSPSDirectory() throws Exception {
+ try {
+ StorageType[][] diskTypes = new StorageType[][] {
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.SSD},
+ {StorageType.DISK, StorageType.DISK}};
+ config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+ true);
+ // Set queue max capacity
+ config.setInt(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
+ 5);
+ hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+ storagesPerDatanode, capacity);
+ dfs = hdfsCluster.getFileSystem();
+ createDirectoryTree(dfs);
+ List<String> files = getDFSListOfTree();
+ LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(LogFactory
+ .getLog(FSTreeTraverser.class));
+
+ dfs.setStoragePolicy(new Path("/root"), COLD);
+ dfs.satisfyStoragePolicy(new Path("/root"));
+ for (String fileName : files) {
+ // Wait till the block is moved to ARCHIVE
+ DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2,
+ 30000, dfs);
+ }
+ waitForBlocksMovementResult(files.size(), 30000);
+ String expectedLogMessage = "StorageMovementNeeded queue remaining"
+ + " capacity is zero";
+ assertTrue("Log output does not contain expected log message: "
+ + expectedLogMessage, logs.getOutput().contains(expectedLogMessage));
+ } finally {
+ shutdownCluster();
+ }
+ }
+
+
+ /**
+ * Test traverse when parent got deleted.
+ * 1. Delete /root when traversing Q
+ * 2. U, R, S should not be in queued.
+ */
+ @Test
+ public void testTraverseWhenParentDeleted() throws Exception {
+ StorageType[][] diskTypes = new StorageType[][] {
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.SSD},
+ {StorageType.DISK, StorageType.DISK}};
+ config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+ storagesPerDatanode, capacity);
+ dfs = hdfsCluster.getFileSystem();
+ createDirectoryTree(dfs);
+
+ List<String> expectedTraverseOrder = getDFSListOfTree();
+
+ // Remove files which will not be traversed when the parent is deleted
+ expectedTraverseOrder.remove("/root/D/L/R");
+ expectedTraverseOrder.remove("/root/D/L/S");
+ expectedTraverseOrder.remove("/root/D/L/Q/U");
+ FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory();
+
+ //Queue limit can control the traverse logic to wait for some free
+ //entry in queue. After 10 files, traverse control will be on U.
+ StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
+ Mockito.when(sps.isRunning()).thenReturn(true);
+ BlockStorageMovementNeeded movmentNeededQueue =
+ new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10);
+ INode rootINode = fsDir.getINode("/root");
+ movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
+ movmentNeededQueue.init();
+
+ //Wait for thread to reach U.
+ Thread.sleep(1000);
+
+ dfs.delete(new Path("/root/D/L"), true);
+
+ // Remove 10 element and make queue free, So other traversing will start.
+ for (int i = 0; i < 10; i++) {
+ String path = expectedTraverseOrder.remove(0);
+ long trackId = movmentNeededQueue.get().getTrackId();
+ INode inode = fsDir.getInode(trackId);
+ assertTrue("Failed to traverse tree, expected " + path + " but got "
+ + inode.getFullPathName(), path.equals(inode.getFullPathName()));
+ }
+ //Wait to finish tree traverse
+ Thread.sleep(5000);
+
+ // Check other element traversed in order and R,S should not be added in
+ // queue which we already removed from expected list
+ for (String path : expectedTraverseOrder) {
+ long trackId = movmentNeededQueue.get().getTrackId();
+ INode inode = fsDir.getInode(trackId);
+ assertTrue("Failed to traverse tree, expected " + path + " but got "
+ + inode.getFullPathName(), path.equals(inode.getFullPathName()));
+ }
+ dfs.delete(new Path("/root"), true);
+ }
+
+ /**
+ * Test traverse when root parent got deleted.
+ * 1. Delete L when traversing Q
+ * 2. E, M, U, R, S should not be in queued.
+ */
+ @Test
+ public void testTraverseWhenRootParentDeleted() throws Exception {
+ StorageType[][] diskTypes = new StorageType[][] {
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.SSD},
+ {StorageType.DISK, StorageType.DISK}};
+ config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+ storagesPerDatanode, capacity);
+ dfs = hdfsCluster.getFileSystem();
+ createDirectoryTree(dfs);
+
+ List<String> expectedTraverseOrder = getDFSListOfTree();
+
+ // Remove files which will not be traversed when the parent is deleted
+ expectedTraverseOrder.remove("/root/D/L/R");
+ expectedTraverseOrder.remove("/root/D/L/S");
+ expectedTraverseOrder.remove("/root/D/L/Q/U");
+ expectedTraverseOrder.remove("/root/D/M");
+ expectedTraverseOrder.remove("/root/E");
+ FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory();
+ StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
+ Mockito.when(sps.isRunning()).thenReturn(true);
+ // Queue limit can control the traverse logic to wait for some free
+ // entry in queue. After 10 files, traverse control will be on U.
+ BlockStorageMovementNeeded movmentNeededQueue =
+ new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10);
+ movmentNeededQueue.init();
+ INode rootINode = fsDir.getINode("/root");
+ movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
+ // Wait for thread to reach U.
+ Thread.sleep(1000);
+
+ dfs.delete(new Path("/root/D/L"), true);
+
+ // Remove 10 element and make queue free, So other traversing will start.
+ for (int i = 0; i < 10; i++) {
+ String path = expectedTraverseOrder.remove(0);
+ long trackId = movmentNeededQueue.get().getTrackId();
+ INode inode = fsDir.getInode(trackId);
+ assertTrue("Failed to traverse tree, expected " + path + " but got "
+ + inode.getFullPathName(), path.equals(inode.getFullPathName()));
+ }
+ // Wait to finish tree traverse
+ Thread.sleep(5000);
+
+ // Check other element traversed in order and E, M, U, R, S should not be
+ // added in queue which we already removed from expected list
+ for (String path : expectedTraverseOrder) {
+ long trackId = movmentNeededQueue.get().getTrackId();
+ INode inode = fsDir.getInode(trackId);
+ assertTrue("Failed to traverse tree, expected " + path + " but got "
+ + inode.getFullPathName(), path.equals(inode.getFullPathName()));
+ }
+ dfs.delete(new Path("/root"), true);
+ }
+
+ private static void createDirectoryTree(DistributedFileSystem dfs)
+ throws Exception {
+ // tree structure
+ /*
+ * root
+ * |
+ * A--------B--------C--------D--------E
+ * | |
+ * F----G----H----I J----K----L----M
+ * | |
+ * N----O----P Q----R----S
+ * | |
+ * T U
+ */
+ // create root Node and child
+ dfs.mkdirs(new Path("/root"));
+ DFSTestUtil.createFile(dfs, new Path("/root/A"), 1024, (short) 3, 0);
+ dfs.mkdirs(new Path("/root/B"));
+ DFSTestUtil.createFile(dfs, new Path("/root/C"), 1024, (short) 3, 0);
+ dfs.mkdirs(new Path("/root/D"));
+ DFSTestUtil.createFile(dfs, new Path("/root/E"), 1024, (short) 3, 0);
+
+ // Create /root/B child
+ DFSTestUtil.createFile(dfs, new Path("/root/B/F"), 1024, (short) 3, 0);
+ dfs.mkdirs(new Path("/root/B/G"));
+ DFSTestUtil.createFile(dfs, new Path("/root/B/H"), 1024, (short) 3, 0);
+ DFSTestUtil.createFile(dfs, new Path("/root/B/I"), 1024, (short) 3, 0);
+
+ // Create /root/D child
+ DFSTestUtil.createFile(dfs, new Path("/root/D/J"), 1024, (short) 3, 0);
+ DFSTestUtil.createFile(dfs, new Path("/root/D/K"), 1024, (short) 3, 0);
+ dfs.mkdirs(new Path("/root/D/L"));
+ DFSTestUtil.createFile(dfs, new Path("/root/D/M"), 1024, (short) 3, 0);
+
+ // Create /root/B/G child
+ DFSTestUtil.createFile(dfs, new Path("/root/B/G/N"), 1024, (short) 3, 0);
+ DFSTestUtil.createFile(dfs, new Path("/root/B/G/O"), 1024, (short) 3, 0);
+ dfs.mkdirs(new Path("/root/B/G/P"));
+
+ // Create /root/D/L child
+ dfs.mkdirs(new Path("/root/D/L/Q"));
+ DFSTestUtil.createFile(dfs, new Path("/root/D/L/R"), 1024, (short) 3, 0);
+ DFSTestUtil.createFile(dfs, new Path("/root/D/L/S"), 1024, (short) 3, 0);
+
+ // Create /root/B/G/P child
+ DFSTestUtil.createFile(dfs, new Path("/root/B/G/P/T"), 1024, (short) 3, 0);
+
+ // Create /root/D/L/Q child
+ DFSTestUtil.createFile(dfs, new Path("/root/D/L/Q/U"), 1024, (short) 3, 0);
+ }
+
+ private List<String> getDFSListOfTree() {
+ List<String> dfsList = new ArrayList<>();
+ dfsList.add("/root/A");
+ dfsList.add("/root/B/F");
+ dfsList.add("/root/B/G/N");
+ dfsList.add("/root/B/G/O");
+ dfsList.add("/root/B/G/P/T");
+ dfsList.add("/root/B/H");
+ dfsList.add("/root/B/I");
+ dfsList.add("/root/C");
+ dfsList.add("/root/D/J");
+ dfsList.add("/root/D/K");
+ dfsList.add("/root/D/L/Q/U");
+ dfsList.add("/root/D/L/R");
+ dfsList.add("/root/D/L/S");
+ dfsList.add("/root/D/M");
+ dfsList.add("/root/E");
+ return dfsList;
+ }
+
private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
throws IOException {
ArrayList<DataNode> dns = hdfsCluster.getDataNodes();