You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2017/08/23 22:37:30 UTC
hadoop git commit: HDFS-12225: [SPS]: Optimize extended attributes
for tracking SPS movements. Contributed by Surendra Singh Lilhore.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-10285 aff40b2ba -> e4bf35913
HDFS-12225: [SPS]: Optimize extended attributes for tracking SPS movements. Contributed by Surendra Singh Lilhore.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e4bf3591
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e4bf3591
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e4bf3591
Branch: refs/heads/HDFS-10285
Commit: e4bf359137b3ee3b63d3f92a732b62ee94831872
Parents: aff40b2
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Wed Aug 23 15:37:03 2017 -0700
Committer: Uma Maheswara Rao G <um...@intel.com>
Committed: Wed Aug 23 15:37:03 2017 -0700
----------------------------------------------------------------------
.../server/blockmanagement/BlockManager.java | 21 +-
.../server/blockmanagement/DatanodeManager.java | 14 +-
.../hdfs/server/datanode/BPOfferService.java | 1 +
.../BlockStorageMovementAttemptedItems.java | 95 +++++---
.../namenode/BlockStorageMovementNeeded.java | 233 ++++++++++++++++++-
.../namenode/FSDirSatisfyStoragePolicyOp.java | 91 +++-----
.../hdfs/server/namenode/FSDirXAttrOp.java | 11 +-
.../hdfs/server/namenode/FSDirectory.java | 2 +-
.../hdfs/server/namenode/FSNamesystem.java | 2 +-
.../server/namenode/StoragePolicySatisfier.java | 108 ++++++---
.../TestStoragePolicySatisfyWorker.java | 5 +-
.../TestBlockStorageMovementAttemptedItems.java | 34 +--
.../TestPersistentStoragePolicySatisfier.java | 104 +++++++++
.../namenode/TestStoragePolicySatisfier.java | 127 +++++-----
14 files changed, 589 insertions(+), 259 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4bf3591/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index b55fabe..eee4458 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -89,7 +89,6 @@ import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementNeeded;
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
@@ -421,9 +420,6 @@ public class BlockManager implements BlockStatsMXBean {
private final StoragePolicySatisfier sps;
private final boolean storagePolicyEnabled;
private boolean spsEnabled;
- private final BlockStorageMovementNeeded storageMovementNeeded =
- new BlockStorageMovementNeeded();
-
/** Minimum live replicas needed for the datanode to be transitioned
* from ENTERING_MAINTENANCE to IN_MAINTENANCE.
*/
@@ -466,8 +462,7 @@ public class BlockManager implements BlockStatsMXBean {
conf.getBoolean(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT);
- sps = new StoragePolicySatisfier(namesystem, storageMovementNeeded, this,
- conf);
+ sps = new StoragePolicySatisfier(namesystem, this, conf);
blockTokenSecretManager = createBlockTokenSecretManager(conf);
this.maxCorruptFilesReturned = conf.getInt(
@@ -4850,20 +4845,6 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
- * Set file block collection for which storage movement needed for its blocks.
- *
- * @param id
- * - file block collection id.
- */
- public void satisfyStoragePolicy(long id) {
- storageMovementNeeded.add(id);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Added block collection id {} to block "
- + "storageMovementNeeded queue", id);
- }
- }
-
- /**
* Gets the storage policy satisfier instance.
*
* @return sps
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4bf3591/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 600d5ed..9b88809 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1724,6 +1724,13 @@ public class DatanodeManager {
}
}
+ if (nodeinfo.shouldDropSPSWork()) {
+ cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
+ // Set back to false to indicate that the new value has been sent to the
+ // datanode.
+ nodeinfo.setDropSPSWork(false);
+ }
+
// check pending block storage movement tasks
BlockStorageMovementInfosBatch blkStorageMovementInfosBatch = nodeinfo
.getBlocksToMoveStorages();
@@ -1735,13 +1742,6 @@ public class DatanodeManager {
blkStorageMovementInfosBatch.getBlockMovingInfo()));
}
- if (nodeinfo.shouldDropSPSWork()) {
- cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
- // Set back to false to indicate that the new value has been sent to the
- // datanode.
- nodeinfo.setDropSPSWork(false);
- }
-
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4bf3591/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 8213ff1..c18cc24 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -802,6 +802,7 @@ class BPOfferService {
case DatanodeProtocol.DNA_UNCACHE:
case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
+ case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
break;
default:
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4bf3591/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index 37833e2..278b62b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
import org.apache.hadoop.util.Daemon;
@@ -54,7 +55,7 @@ public class BlockStorageMovementAttemptedItems {
* A map holds the items which are already taken for blocks movements
* processing and sent to DNs.
*/
- private final Map<Long, ItemInfo> storageMovementAttemptedItems;
+ private final Map<Long, AttemptedItemInfo> storageMovementAttemptedItems;
private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
private volatile boolean monitorRunning = true;
private Daemon timerThread = null;
@@ -91,18 +92,19 @@ public class BlockStorageMovementAttemptedItems {
* Add item to block storage movement attempted items map which holds the
* tracking/blockCollection id versus time stamp.
*
- * @param blockCollectionID
- * - tracking id / block collection id
+ * @param itemInfo
+ * - tracking info
* @param allBlockLocsAttemptedToSatisfy
- * - failed to find matching target nodes to satisfy storage type for
- * all the block locations of the given blockCollectionID
+ * - failed to find matching target nodes to satisfy storage type
+ * for all the block locations of the given blockCollectionID
*/
- public void add(Long blockCollectionID,
- boolean allBlockLocsAttemptedToSatisfy) {
+ public void add(ItemInfo itemInfo, boolean allBlockLocsAttemptedToSatisfy) {
synchronized (storageMovementAttemptedItems) {
- ItemInfo itemInfo = new ItemInfo(monotonicNow(),
+ AttemptedItemInfo attemptedItemInfo = new AttemptedItemInfo(
+ itemInfo.getRootId(), itemInfo.getTrackId(), monotonicNow(),
allBlockLocsAttemptedToSatisfy);
- storageMovementAttemptedItems.put(blockCollectionID, itemInfo);
+ storageMovementAttemptedItems.put(itemInfo.getTrackId(),
+ attemptedItemInfo);
}
}
@@ -167,21 +169,27 @@ public class BlockStorageMovementAttemptedItems {
* satisfy storage policy. This is used by
* {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
*/
- private final static class ItemInfo {
+ private final static class AttemptedItemInfo extends ItemInfo {
private long lastAttemptedOrReportedTime;
private final boolean allBlockLocsAttemptedToSatisfy;
/**
- * ItemInfo constructor.
+ * AttemptedItemInfo constructor.
*
+ * @param rootId
+ * rootId for trackId
+ * @param trackId
+ * trackId for file.
* @param lastAttemptedOrReportedTime
* last attempted or reported time
* @param allBlockLocsAttemptedToSatisfy
* whether all the blocks in the trackID were attempted and blocks
* movement has been scheduled to satisfy storage policy
*/
- private ItemInfo(long lastAttemptedOrReportedTime,
+ private AttemptedItemInfo(long rootId, long trackId,
+ long lastAttemptedOrReportedTime,
boolean allBlockLocsAttemptedToSatisfy) {
+ super(rootId, trackId);
this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy;
}
@@ -211,6 +219,7 @@ public class BlockStorageMovementAttemptedItems {
private void touchLastReportedTimeStamp() {
this.lastAttemptedOrReportedTime = monotonicNow();
}
+
}
/**
@@ -239,18 +248,20 @@ public class BlockStorageMovementAttemptedItems {
@VisibleForTesting
void blocksStorageMovementUnReportedItemsCheck() {
synchronized (storageMovementAttemptedItems) {
- Iterator<Entry<Long, ItemInfo>> iter = storageMovementAttemptedItems
- .entrySet().iterator();
+ Iterator<Entry<Long, AttemptedItemInfo>> iter =
+ storageMovementAttemptedItems.entrySet().iterator();
long now = monotonicNow();
while (iter.hasNext()) {
- Entry<Long, ItemInfo> entry = iter.next();
- ItemInfo itemInfo = entry.getValue();
+ Entry<Long, AttemptedItemInfo> entry = iter.next();
+ AttemptedItemInfo itemInfo = entry.getValue();
if (now > itemInfo.getLastAttemptedOrReportedTime()
+ selfRetryTimeout) {
Long blockCollectionID = entry.getKey();
synchronized (storageMovementAttemptedResults) {
if (!isExistInResult(blockCollectionID)) {
- blockStorageMovementNeeded.add(blockCollectionID);
+ ItemInfo candidate = new ItemInfo(
+ itemInfo.getRootId(), blockCollectionID);
+ blockStorageMovementNeeded.add(candidate);
iter.remove();
LOG.info("TrackID: {} becomes timed out and moved to needed "
+ "retries queue for next iteration.", blockCollectionID);
@@ -297,17 +308,30 @@ public class BlockStorageMovementAttemptedItems {
synchronized (storageMovementAttemptedItems) {
Status status = storageMovementAttemptedResult.getStatus();
long trackId = storageMovementAttemptedResult.getTrackId();
- ItemInfo itemInfo;
+ AttemptedItemInfo attemptedItemInfo = storageMovementAttemptedItems
+ .get(trackId);
+ // attemptedItemInfo being null means there is no root for the trackId,
+ // so the trackId itself is used as the root; this case is handled in
+ // blockStorageMovementNeeded#removeItemTrackInfo() for cleaning
+ // the xAttr
+ ItemInfo itemInfo = new ItemInfo((attemptedItemInfo != null)
+ ? attemptedItemInfo.getRootId() : trackId, trackId);
switch (status) {
case FAILURE:
- blockStorageMovementNeeded.add(trackId);
- LOG.warn("Blocks storage movement results for the tracking id: {}"
- + " is reported from co-ordinating datanode, but result"
- + " status is FAILURE. So, added for retry", trackId);
+ if (attemptedItemInfo != null) {
+ blockStorageMovementNeeded.add(itemInfo);
+ LOG.warn("Blocks storage movement results for the tracking id:"
+ + "{} is reported from co-ordinating datanode, but result"
+ + " status is FAILURE. So, added for retry", trackId);
+ } else {
+ LOG.info("Blocks storage movement is FAILURE for the track"
+ + " id {}. But the trackID doesn't exists in"
+ + " storageMovementAttemptedItems list.", trackId);
+ blockStorageMovementNeeded
+ .removeItemTrackInfo(itemInfo);
+ }
break;
case SUCCESS:
- itemInfo = storageMovementAttemptedItems.get(trackId);
-
// ItemInfo could be null. One case is, before the blocks movements
// result arrives the attempted trackID became timed out and then
// removed the trackID from the storageMovementAttemptedItems list.
@@ -318,33 +342,32 @@ public class BlockStorageMovementAttemptedItems {
// storage policy in previous SPS iteration.
String msg = "Blocks storage movement is SUCCESS for the track id: "
+ trackId + " reported from co-ordinating datanode.";
- if (itemInfo != null) {
- if (!itemInfo.isAllBlockLocsAttemptedToSatisfy()) {
- blockStorageMovementNeeded.add(trackId);
+ if (attemptedItemInfo != null) {
+ if (!attemptedItemInfo.isAllBlockLocsAttemptedToSatisfy()) {
+ blockStorageMovementNeeded
+ .add(new ItemInfo(attemptedItemInfo.getRootId(), trackId));
LOG.warn("{} But adding trackID back to retry queue as some of"
+ " the blocks couldn't find matching target nodes in"
+ " previous SPS iteration.", msg);
} else {
LOG.info(msg);
- // Remove xattr for the track id.
- this.sps.postBlkStorageMovementCleanup(
- storageMovementAttemptedResult.getTrackId());
+ blockStorageMovementNeeded
+ .removeItemTrackInfo(itemInfo);
}
} else {
LOG.info("{} But the trackID doesn't exists in "
+ "storageMovementAttemptedItems list", msg);
- // Remove xattr for the track id.
- this.sps.postBlkStorageMovementCleanup(
- storageMovementAttemptedResult.getTrackId());
+ blockStorageMovementNeeded
+ .removeItemTrackInfo(itemInfo);
}
break;
case IN_PROGRESS:
isInprogress = true;
- itemInfo = storageMovementAttemptedItems
+ attemptedItemInfo = storageMovementAttemptedItems
.get(storageMovementAttemptedResult.getTrackId());
- if(itemInfo != null){
+ if(attemptedItemInfo != null){
// update the attempted expiration time to next cycle.
- itemInfo.touchLastReportedTimeStamp();
+ attemptedItemInfo.touchLastReportedTimeStamp();
}
break;
default:
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4bf3591/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
index 3241e6d..41a3a6c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
@@ -17,28 +17,86 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.Queue;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * A Class to track the block collection IDs for which physical storage movement
- * needed as per the Namespace and StorageReports from DN.
+ * A Class to track the block collection IDs (Inode's ID) for which physical
+ * storage movement needed as per the Namespace and StorageReports from DN.
+ * It scans the pending directories for which storage movement is required
+ * and schedules the block collection IDs for movement. It tracks the info
+ * of scheduled items and removes the SPS xAttr from the file/directory once
+ * the movement succeeds.
*/
@InterfaceAudience.Private
public class BlockStorageMovementNeeded {
- private final Queue<Long> storageMovementNeeded = new LinkedList<Long>();
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(BlockStorageMovementNeeded.class);
+
+ private final Queue<ItemInfo> storageMovementNeeded =
+ new LinkedList<ItemInfo>();
/**
- * Add the block collection id to tracking list for which storage movement
+ * Map of rootId to number of children. The number of children indicates
+ * the number of files pending to satisfy the policy.
+ */
+ private final Map<Long, Integer> pendingWorkForDirectory =
+ new HashMap<Long, Integer>();
+
+ private final Namesystem namesystem;
+
+ // List of pending dir to satisfy the policy
+ private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
+
+ private final StoragePolicySatisfier sps;
+
+ private Daemon fileInodeIdCollector;
+
+ public BlockStorageMovementNeeded(Namesystem namesystem,
+ StoragePolicySatisfier sps) {
+ this.namesystem = namesystem;
+ this.sps = sps;
+ }
+
+ /**
+ * Add the candidate to tracking list for which storage movement
* expected if necessary.
*
- * @param blockCollectionID
- * - block collection id, which is nothing but inode id.
+ * @param trackInfo
+ * - track info for satisfy the policy
*/
- public synchronized void add(Long blockCollectionID) {
- storageMovementNeeded.add(blockCollectionID);
+ public synchronized void add(ItemInfo trackInfo) {
+ storageMovementNeeded.add(trackInfo);
+ }
+
+ /**
+ * Add the itemInfo to tracking list for which storage movement
+ * expected if necessary.
+ * @param rootId
+ * - root inode id
+ * @param itemInfoList
+ * - List of child in the directory
+ */
+ private synchronized void addAll(Long rootId,
+ List<ItemInfo> itemInfoList) {
+ storageMovementNeeded.addAll(itemInfoList);
+ pendingWorkForDirectory.put(rootId, itemInfoList.size());
}
/**
@@ -47,11 +105,168 @@ public class BlockStorageMovementNeeded {
*
* @return block collection ID
*/
- public synchronized Long get() {
+ public synchronized ItemInfo get() {
return storageMovementNeeded.poll();
}
+ public synchronized void addToPendingDirQueue(long id) {
+ spsDirsToBeTraveresed.add(id);
+ // Notify waiting FileInodeIdCollector thread about the newly
+ // added SPS path.
+ synchronized (spsDirsToBeTraveresed) {
+ spsDirsToBeTraveresed.notify();
+ }
+ }
+
public synchronized void clearAll() {
+ spsDirsToBeTraveresed.clear();
storageMovementNeeded.clear();
+ pendingWorkForDirectory.clear();
+ }
+
+ /**
+ * Decrease the pending child count for directory once one file blocks moved
+ * successfully. Remove the SPS xAttr if pending child count is zero.
+ */
+ public synchronized void removeItemTrackInfo(ItemInfo trackInfo)
+ throws IOException {
+ if (trackInfo.isDir()) {
+ // If track is part of some root then reduce the pending directory work
+ // count.
+ long rootId = trackInfo.getRootId();
+ INode inode = namesystem.getFSDirectory().getInode(rootId);
+ if (inode == null) {
+ // directory deleted just remove it.
+ this.pendingWorkForDirectory.remove(rootId);
+ } else {
+ if (pendingWorkForDirectory.get(rootId) != null) {
+ Integer pendingWork = pendingWorkForDirectory.get(rootId) - 1;
+ pendingWorkForDirectory.put(rootId, pendingWork);
+ if (pendingWork <= 0) {
+ namesystem.removeXattr(rootId, XATTR_SATISFY_STORAGE_POLICY);
+ pendingWorkForDirectory.remove(rootId);
+ }
+ }
+ }
+ } else {
+ // Remove xAttr if trackID doesn't exist in
+ // storageMovementAttemptedItems or file policy satisfied.
+ namesystem.removeXattr(trackInfo.getTrackId(),
+ XATTR_SATISFY_STORAGE_POLICY);
+ }
+ }
+
+ public synchronized void clearQueue(long trackId) {
+ spsDirsToBeTraveresed.remove(trackId);
+ Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
+ while (iterator.hasNext()) {
+ ItemInfo next = iterator.next();
+ if (next.getRootId() == trackId) {
+ iterator.remove();
+ }
+ }
+ pendingWorkForDirectory.remove(trackId);
+ }
+
+ /**
+ * Clean all the movements in spsDirsToBeTraveresed/storageMovementNeeded
+ * and notify to clean up required resources.
+ * @throws IOException
+ */
+ public synchronized void clearQueuesWithNotification() {
+ // Remove xAttr from directories
+ Long trackId;
+ while ((trackId = spsDirsToBeTraveresed.poll()) != null) {
+ try {
+ // Remove xAttr for directory
+ namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY);
+ } catch (IOException ie) {
+ LOG.warn("Failed to remove SPS xattr for track id " + trackId, ie);
+ }
+ }
+
+ // File's directly added to storageMovementNeeded, So try to remove
+ // xAttr for file
+ ItemInfo itemInfo;
+ while ((itemInfo = storageMovementNeeded.poll()) != null) {
+ try {
+ // Remove xAttr for file
+ if (!itemInfo.isDir()) {
+ namesystem.removeXattr(itemInfo.getTrackId(),
+ XATTR_SATISFY_STORAGE_POLICY);
+ }
+ } catch (IOException ie) {
+ LOG.warn(
+ "Failed to remove SPS xattr for track id "
+ + itemInfo.getTrackId(), ie);
+ }
+ }
+ this.clearAll();
+ }
+
+ /**
+ * Take a dir track ID from the spsDirsToBeTraveresed queue and collect the
+ * child IDs to process for satisfying the policy.
+ */
+ private class FileInodeIdCollector implements Runnable {
+ @Override
+ public void run() {
+ LOG.info("Starting FileInodeIdCollector!.");
+ while (namesystem.isRunning() && sps.isRunning()) {
+ try {
+ if (!namesystem.isInSafeMode()) {
+ FSDirectory fsd = namesystem.getFSDirectory();
+ Long rootINodeId = spsDirsToBeTraveresed.poll();
+ if (rootINodeId == null) {
+ // Waiting for SPS path
+ synchronized (spsDirsToBeTraveresed) {
+ spsDirsToBeTraveresed.wait(5000);
+ }
+ } else {
+ INode rootInode = fsd.getInode(rootINodeId);
+ if (rootInode != null) {
+ // TODO : HDFS-12291
+ // 1. Implement an efficient recursive directory iteration
+ // mechanism and satisfies storage policy for all the files
+ // under the given directory.
+ // 2. Process files in batches,so datanodes workload can be
+ // handled.
+ List<ItemInfo> itemInfoList =
+ new ArrayList<>();
+ for (INode childInode : rootInode.asDirectory()
+ .getChildrenList(Snapshot.CURRENT_STATE_ID)) {
+ if (childInode.isFile()
+ && childInode.asFile().numBlocks() != 0) {
+ itemInfoList.add(
+ new ItemInfo(rootINodeId, childInode.getId()));
+ }
+ }
+ if (itemInfoList.isEmpty()) {
+ // satisfy track info is empty, so remove the xAttr from the
+ // directory
+ namesystem.removeXattr(rootINodeId,
+ XATTR_SATISFY_STORAGE_POLICY);
+ }
+ addAll(rootINodeId, itemInfoList);
+ }
+ }
+ }
+ } catch (Throwable t) {
+ LOG.warn("Exception while loading inodes to satisfy the policy", t);
+ }
+ }
+ }
+ }
+
+ public void start() {
+ fileInodeIdCollector = new Daemon(new FileInodeIdCollector());
+ fileInodeIdCollector.setName("FileInodeIdCollector");
+ fileInodeIdCollector.start();
+ }
+
+ public void stop() {
+ if (fileInodeIdCollector != null) {
+ fileInodeIdCollector.interrupt();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4bf3591/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
index bd4e5ed..fb6eec9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -31,6 +30,7 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import com.google.common.collect.Lists;
@@ -60,10 +60,24 @@ final class FSDirSatisfyStoragePolicyOp {
if (fsd.isPermissionEnabled()) {
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
}
- XAttr satisfyXAttr = unprotectedSatisfyStoragePolicy(iip, bm, fsd);
- if (satisfyXAttr != null) {
+ INode inode = FSDirectory.resolveLastINode(iip);
+ if (inodeHasSatisfyXAttr(inode)) {
+ throw new IOException(
+ "Cannot request to call satisfy storage policy on path "
+ + inode.getFullPathName()
+ + ", as this file/dir was already called for satisfying "
+ + "storage policy.");
+ }
+ if (unprotectedSatisfyStoragePolicy(inode, fsd)) {
+ XAttr satisfyXAttr = XAttrHelper
+ .buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
xAttrs.add(satisfyXAttr);
+ List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
+ List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsd, existingXAttrs,
+ xAttrs, EnumSet.of(XAttrSetFlag.CREATE));
+ XAttrStorage.updateINodeXAttrs(inode, newXAttrs,
+ iip.getLatestSnapshotId());
fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
}
} finally {
@@ -72,62 +86,29 @@ final class FSDirSatisfyStoragePolicyOp {
return fsd.getAuditFileInfo(iip);
}
- static XAttr unprotectedSatisfyStoragePolicy(INodesInPath iip,
- BlockManager bm, FSDirectory fsd) throws IOException {
-
- final INode inode = FSDirectory.resolveLastINode(iip);
- final int snapshotId = iip.getLatestSnapshotId();
- final List<INode> candidateNodes = new ArrayList<>();
-
- // TODO: think about optimization here, label the dir instead
- // of the sub-files of the dir.
+ static boolean unprotectedSatisfyStoragePolicy(INode inode, FSDirectory fsd) {
if (inode.isFile() && inode.asFile().numBlocks() != 0) {
- candidateNodes.add(inode);
- } else if (inode.isDirectory()) {
- for (INode node : inode.asDirectory().getChildrenList(snapshotId)) {
- if (node.isFile() && node.asFile().numBlocks() != 0) {
- candidateNodes.add(node);
- }
- }
- }
-
- if (candidateNodes.isEmpty()) {
- return null;
+ // Adding directly in the storageMovementNeeded queue, So it can
+ // get more priority compare to directory.
+ fsd.getBlockManager().getStoragePolicySatisfier()
+ .satisfyStoragePolicy(inode.getId());
+ return true;
+ } else if (inode.isDirectory()
+ && inode.asDirectory().getChildrenNum(Snapshot.CURRENT_STATE_ID) > 0) {
+ // Adding directory in the pending queue, so FileInodeIdCollector process
+ // directory child in batch and recursively
+ fsd.getBlockManager().getStoragePolicySatisfier()
+ .addInodeToPendingDirQueue(inode.getId());
+ return true;
}
- // If node has satisfy xattr, then stop adding it
- // to satisfy movement queue.
- if (inodeHasSatisfyXAttr(candidateNodes)) {
- throw new IOException(
- "Cannot request to call satisfy storage policy on path "
- + iip.getPath()
- + ", as this file/dir was already called for satisfying "
- + "storage policy.");
- }
-
- final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
- final XAttr satisfyXAttr = XAttrHelper
- .buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
- xattrs.add(satisfyXAttr);
-
- for (INode node : candidateNodes) {
- bm.satisfyStoragePolicy(node.getId());
- List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(node);
- List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsd, existingXAttrs,
- xattrs, EnumSet.of(XAttrSetFlag.CREATE));
- XAttrStorage.updateINodeXAttrs(node, newXAttrs, snapshotId);
- }
- return satisfyXAttr;
+ return false;
}
- private static boolean inodeHasSatisfyXAttr(List<INode> candidateNodes) {
- // If the node is a directory and one of the child files
- // has satisfy xattr, then return true for this directory.
- for (INode inode : candidateNodes) {
- final XAttrFeature f = inode.getXAttrFeature();
- if (inode.isFile() && f != null
- && f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null) {
- return true;
- }
+ private static boolean inodeHasSatisfyXAttr(INode inode) {
+ final XAttrFeature f = inode.getXAttrFeature();
+ if (inode.isFile() && f != null
+ && f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null) {
+ return true;
}
return false;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4bf3591/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
index df426bc..9795b29 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -195,6 +195,14 @@ class FSDirXAttrOp {
List<XAttr> newXAttrs = filterINodeXAttrs(existingXAttrs, toRemove,
removedXAttrs);
if (existingXAttrs.size() != newXAttrs.size()) {
+ for (XAttr xattr : toRemove) {
+ if (XATTR_SATISFY_STORAGE_POLICY
+ .equals(XAttrHelper.getPrefixedName(xattr))) {
+ fsd.getBlockManager().getStoragePolicySatisfier()
+ .clearQueue(inode.getId());
+ break;
+ }
+ }
XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
return removedXAttrs;
}
@@ -280,8 +288,7 @@ class FSDirXAttrOp {
// Add inode id to movement queue if xattrs contain satisfy xattr.
if (XATTR_SATISFY_STORAGE_POLICY.equals(xaName)) {
- FSDirSatisfyStoragePolicyOp.unprotectedSatisfyStoragePolicy(iip,
- fsd.getBlockManager(), fsd);
+ FSDirSatisfyStoragePolicyOp.unprotectedSatisfyStoragePolicy(inode, fsd);
continue;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4bf3591/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 87030db9..90bc23a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -1352,7 +1352,7 @@ public class FSDirectory implements Closeable {
if (xattr == null) {
return;
}
- getBlockManager().satisfyStoragePolicy(inode.getId());
+ FSDirSatisfyStoragePolicyOp.unprotectedSatisfyStoragePolicy(inode, this);
}
private void addEncryptionZone(INodeWithAdditionalFields inode,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4bf3591/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 68931e2..9d9e05e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1298,7 +1298,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
writeLock();
try {
if (blockManager != null) {
- blockManager.stopSPS(true);
+ blockManager.stopSPS(false);
}
stopSecretManager();
leaseManager.stopMonitor();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4bf3591/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 3165813..48d0598 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
-
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -106,10 +103,10 @@ public class StoragePolicySatisfier implements Runnable {
}
public StoragePolicySatisfier(final Namesystem namesystem,
- final BlockStorageMovementNeeded storageMovementNeeded,
final BlockManager blkManager, Configuration conf) {
this.namesystem = namesystem;
- this.storageMovementNeeded = storageMovementNeeded;
+ this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem,
+ this);
this.blockManager = blkManager;
this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
conf.getLong(
@@ -146,7 +143,7 @@ public class StoragePolicySatisfier implements Runnable {
// Ensure that all the previously submitted block movements(if any) have to
// be stopped in all datanodes.
addDropSPSWorkCommandsToAllDNs();
-
+ storageMovementNeeded.start();
storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start();
@@ -162,14 +159,17 @@ public class StoragePolicySatisfier implements Runnable {
*/
public synchronized void disable(boolean forceStop) {
isRunning = false;
+
if (storagePolicySatisfierThread == null) {
return;
}
+ storageMovementNeeded.stop();
+
storagePolicySatisfierThread.interrupt();
this.storageMovementsMonitor.stop();
if (forceStop) {
- this.clearQueuesWithNotification();
+ storageMovementNeeded.clearQueuesWithNotification();
addDropSPSWorkCommandsToAllDNs();
} else {
LOG.info("Stopping StoragePolicySatisfier.");
@@ -184,6 +184,7 @@ public class StoragePolicySatisfier implements Runnable {
disable(true);
}
this.storageMovementsMonitor.stopGracefully();
+
if (storagePolicySatisfierThread == null) {
return;
}
@@ -220,10 +221,11 @@ public class StoragePolicySatisfier implements Runnable {
while (namesystem.isRunning() && isRunning) {
try {
if (!namesystem.isInSafeMode()) {
- Long blockCollectionID = storageMovementNeeded.get();
- if (blockCollectionID != null) {
+ ItemInfo itemInfo = storageMovementNeeded.get();
+ if (itemInfo != null) {
+ long trackId = itemInfo.getTrackId();
BlockCollection blockCollection =
- namesystem.getBlockCollection(blockCollectionID);
+ namesystem.getBlockCollection(trackId);
// Check blockCollectionId existence.
if (blockCollection != null) {
BlocksMovingAnalysisStatus status =
@@ -234,21 +236,21 @@ public class StoragePolicySatisfier implements Runnable {
// Just add to monitor, so it will be tracked for result and
// be removed on successful storage movement result.
case ALL_BLOCKS_TARGETS_PAIRED:
- this.storageMovementsMonitor.add(blockCollectionID, true);
+ this.storageMovementsMonitor.add(itemInfo, true);
break;
// Add to monitor with allBlcoksAttemptedToSatisfy flag false, so
// that it will be tracked and still it will be consider for retry
// as analysis was not found targets for storage movement blocks.
case FEW_BLOCKS_TARGETS_PAIRED:
- this.storageMovementsMonitor.add(blockCollectionID, false);
+ this.storageMovementsMonitor.add(itemInfo, false);
break;
case FEW_LOW_REDUNDANCY_BLOCKS:
if (LOG.isDebugEnabled()) {
- LOG.debug("Adding trackID " + blockCollectionID
+ LOG.debug("Adding trackID " + trackId
+ " back to retry queue as some of the blocks"
+ " are low redundant.");
}
- this.storageMovementNeeded.add(blockCollectionID);
+ this.storageMovementNeeded.add(itemInfo);
break;
// Just clean Xattrs
case BLOCKS_TARGET_PAIRING_SKIPPED:
@@ -256,9 +258,13 @@ public class StoragePolicySatisfier implements Runnable {
default:
LOG.info("Block analysis skipped or blocks already satisfied"
+ " with storages. So, Cleaning up the Xattrs.");
- postBlkStorageMovementCleanup(blockCollectionID);
+ storageMovementNeeded.removeItemTrackInfo(itemInfo);
break;
}
+ } else {
+ // File doesn't exist (maybe it got deleted), remove trackId from
+ // the queue.
+ storageMovementNeeded.removeItemTrackInfo(itemInfo);
}
}
}
@@ -828,31 +834,63 @@ public class StoragePolicySatisfier implements Runnable {
}
/**
- * Clean all the movements in storageMovementNeeded and notify
- * to clean up required resources.
- * @throws IOException
+ * Queue a file inode whose blocks need storage movement.
+ *
+ * @param inodeId
+ * - file inode/blockcollection id.
*/
- private void clearQueuesWithNotification() {
- Long id;
- while ((id = storageMovementNeeded.get()) != null) {
- try {
- postBlkStorageMovementCleanup(id);
- } catch (IOException ie) {
- LOG.warn("Failed to remove SPS "
- + "xattr for collection id " + id, ie);
- }
+ public void satisfyStoragePolicy(Long inodeId) {
+ // For a file, rootId and trackId are the same.
+ storageMovementNeeded.add(new ItemInfo(inodeId, inodeId));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added track info for inode {} to block "
+ + "storageMovementNeeded queue", inodeId);
}
}
+ public void addInodeToPendingDirQueue(long id) {
+ storageMovementNeeded.addToPendingDirQueue(id);
+ }
+
+ /**
+ * Clear queues for given track id.
+ */
+ public void clearQueue(long trackId) {
+ storageMovementNeeded.clearQueue(trackId);
+ }
+
/**
- * When block movement has been finished successfully, some additional
- * operations should be notified, for example, SPS xattr should be
- * removed.
- * @param trackId track id i.e., block collection id.
- * @throws IOException
+ * ItemInfo identifies a file for which the storage policy needs to be
+ * satisfied.
*/
- public void postBlkStorageMovementCleanup(long trackId)
- throws IOException {
- this.namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY);
+ public static class ItemInfo {
+ private long rootId;
+ private long trackId;
+
+ public ItemInfo(long rootId, long trackId) {
+ this.rootId = rootId;
+ this.trackId = trackId;
+ }
+
+ /**
+ * Return the root of the current track Id.
+ */
+ public long getRootId() {
+ return rootId;
+ }
+
+ /**
+ * Return the file inode id for which the policy needs to be satisfied.
+ */
+ public long getTrackId() {
+ return trackId;
+ }
+
+ /**
+ * Returns true if the tracking path is a directory, false otherwise.
+ */
+ public boolean isDir() {
+ return (rootId != trackId);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4bf3591/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index 402d4d1..b84b1d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
@@ -115,9 +114,7 @@ public class TestStoragePolicySatisfyWorker {
// move to ARCHIVE
dfs.setStoragePolicy(new Path(file), "COLD");
- FSNamesystem namesystem = cluster.getNamesystem();
- INode inode = namesystem.getFSDirectory().getINode(file);
- namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ dfs.satisfyStoragePolicy(new Path(file));
cluster.triggerHeartbeats();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4bf3591/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
index 04a63ac..55ebf9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.Time.monotonicNow;
import static org.junit.Assert.*;
+import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.junit.After;
import org.junit.Before;
@@ -38,7 +39,9 @@ public class TestBlockStorageMovementAttemptedItems {
@Before
public void setup() throws Exception {
- unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded();
+ unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
+ Mockito.mock(Namesystem.class),
+ Mockito.mock(StoragePolicySatisfier.class));
StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
selfRetryTimeout, unsatisfiedStorageMovementFiles, sps);
@@ -57,9 +60,9 @@ public class TestBlockStorageMovementAttemptedItems {
long stopTime = monotonicNow() + (retryTimeout * 2);
boolean isItemFound = false;
while (monotonicNow() < (stopTime)) {
- Long ele = null;
+ ItemInfo ele = null;
while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
- if (item.longValue() == ele.longValue()) {
+ if (item == ele.getTrackId()) {
isItemFound = true;
break;
}
@@ -77,7 +80,7 @@ public class TestBlockStorageMovementAttemptedItems {
public void testAddResultWithFailureResult() throws Exception {
bsmAttemptedItems.start(); // start block movement result monitor thread
Long item = new Long(1234);
- bsmAttemptedItems.add(item, true);
+ bsmAttemptedItems.add(new ItemInfo(0L, item), true);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
@@ -88,7 +91,7 @@ public class TestBlockStorageMovementAttemptedItems {
public void testAddResultWithSucessResult() throws Exception {
bsmAttemptedItems.start(); // start block movement result monitor thread
Long item = new Long(1234);
- bsmAttemptedItems.add(item, true);
+ bsmAttemptedItems.add(new ItemInfo(0L, item), true);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
@@ -99,7 +102,7 @@ public class TestBlockStorageMovementAttemptedItems {
public void testNoResultAdded() throws Exception {
bsmAttemptedItems.start(); // start block movement result monitor thread
Long item = new Long(1234);
- bsmAttemptedItems.add(item, true);
+ bsmAttemptedItems.add(new ItemInfo(0L, item), true);
// After self retry timeout, it should be added back for retry
assertTrue("Failed to add to the retry list",
checkItemMovedForRetry(item, 600));
@@ -115,7 +118,7 @@ public class TestBlockStorageMovementAttemptedItems {
@Test(timeout = 30000)
public void testPartialBlockMovementShouldBeRetried1() throws Exception {
Long item = new Long(1234);
- bsmAttemptedItems.add(item, false);
+ bsmAttemptedItems.add(new ItemInfo(0L, item), false);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
@@ -136,7 +139,7 @@ public class TestBlockStorageMovementAttemptedItems {
@Test(timeout = 30000)
public void testPartialBlockMovementShouldBeRetried2() throws Exception {
Long item = new Long(1234);
- bsmAttemptedItems.add(item, false);
+ bsmAttemptedItems.add(new ItemInfo(0L, item), false);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
@@ -153,17 +156,20 @@ public class TestBlockStorageMovementAttemptedItems {
}
/**
- * Partial block movement with only BlocksStorageMovementResult#FAILURE result
- * and storageMovementAttemptedItems list is empty.
+ * Partial block movement with only BlocksStorageMovementResult#FAILURE
+ * result and storageMovementAttemptedItems list is empty.
*/
@Test(timeout = 30000)
- public void testPartialBlockMovementShouldBeRetried3() throws Exception {
+ public void testPartialBlockMovementWithEmptyAttemptedQueue()
+ throws Exception {
Long item = new Long(1234);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
- item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
+ item, BlocksStorageMovementResult.Status.FAILURE)});
bsmAttemptedItems.blockStorageMovementResultCheck();
- assertTrue("Failed to add to the retry list",
+ assertFalse(
+ "Should not add in queue again if it is not there in"
+ + " storageMovementAttemptedItems",
checkItemMovedForRetry(item, 5000));
assertEquals("Failed to remove from the attempted list", 0,
bsmAttemptedItems.getAttemptedItemsCount());
@@ -176,7 +182,7 @@ public class TestBlockStorageMovementAttemptedItems {
@Test(timeout = 30000)
public void testPartialBlockMovementShouldBeRetried4() throws Exception {
Long item = new Long(1234);
- bsmAttemptedItems.add(item, false);
+ bsmAttemptedItems.add(new ItemInfo(0L, item), false);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4bf3591/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
index 8516ea0..e7b9148 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
@@ -20,16 +20,22 @@ package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
+import com.google.common.base.Supplier;
+
import java.io.IOException;
+import java.util.List;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import static org.junit.Assert.*;
@@ -482,6 +488,104 @@ public class TestPersistentStoragePolicySatisfier {
}
/**
+ * Test SPS xAttr on directory. xAttr should be removed from the directory
+ * once all the files' blocks are moved to the specific storage.
+ */
+ @Test(timeout = 300000)
+ public void testSPSxAttrWhenSpsCalledForDir() throws Exception {
+ try {
+ clusterSetUp();
+ Path parent = new Path("/parent");
+ // create parent dir
+ fs.mkdirs(parent);
+
+ // create 5 child files
+ for (int i = 0; i < 5; i++) {
+ DFSTestUtil.createFile(fs, new Path(parent, "f" + i), 1024, (short) 3,
+ 0);
+ }
+
+ // Set storage policy for parent directory
+ fs.setStoragePolicy(parent, "COLD");
+
+ // Stop one DN so we can check the SPS xAttr for directory.
+ DataNodeProperties stopDataNode = cluster.stopDataNode(0);
+
+ fs.satisfyStoragePolicy(parent);
+
+ // Check xAttr for parent directory
+ FSNamesystem namesystem = cluster.getNamesystem();
+ INode inode = namesystem.getFSDirectory().getINode("/parent");
+ XAttrFeature f = inode.getXAttrFeature();
+ assertTrue("SPS xAttr should be exist",
+ f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null);
+
+ // check for the child, SPS xAttr should not be there
+ for (int i = 0; i < 5; i++) {
+ inode = namesystem.getFSDirectory().getINode("/parent/f" + i);
+ f = inode.getXAttrFeature();
+ assertTrue(f == null);
+ }
+
+ cluster.restartDataNode(stopDataNode, false);
+
+ // wait and check all the file block moved in ARCHIVE
+ for (int i = 0; i < 5; i++) {
+ DFSTestUtil.waitExpectedStorageType("/parent/f" + i,
+ StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem());
+ }
+ DFSTestUtil.waitForXattrRemoved("/parent", XATTR_SATISFY_STORAGE_POLICY,
+ namesystem, 10000);
+ } finally {
+ clusterShutdown();
+ }
+
+ }
+
+ /**
+ * Test SPS xAttr on file. xAttr should be removed from the file
+ * once all the blocks moved to specific storage.
+ */
+ @Test(timeout = 300000)
+ public void testSPSxAttrWhenSpsCalledForFile() throws Exception {
+ try {
+ clusterSetUp();
+ Path file = new Path("/file");
+ DFSTestUtil.createFile(fs, file, 1024, (short) 3, 0);
+
+ // Set storage policy for file
+ fs.setStoragePolicy(file, "COLD");
+
+ // Stop one DN so we can check the SPS xAttr for file.
+ DataNodeProperties stopDataNode = cluster.stopDataNode(0);
+
+ fs.satisfyStoragePolicy(file);
+
+ // Check xAttr for the file
+ FSNamesystem namesystem = cluster.getNamesystem();
+ INode inode = namesystem.getFSDirectory().getINode("/file");
+ XAttrFeature f = inode.getXAttrFeature();
+ assertTrue("SPS xAttr should be exist",
+ f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null);
+
+ cluster.restartDataNode(stopDataNode, false);
+
+ // wait and check all the file block moved in ARCHIVE
+ DFSTestUtil.waitExpectedStorageType("/file", StorageType.ARCHIVE, 3,
+ 30000, cluster.getFileSystem());
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
+ return !existingXAttrs.contains(XATTR_SATISFY_STORAGE_POLICY);
+ }
+ }, 100, 10000);
+ } finally {
+ clusterShutdown();
+ }
+ }
+
+ /**
* Restart the hole env and trigger the DataNode's heart beats.
* @throws Exception
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4bf3591/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 2536834..3375590 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -119,8 +119,6 @@ public class TestStoragePolicySatisfier {
private void doTestWhenStoragePolicySetToCOLD() throws Exception {
// Change policy to COLD
dfs.setStoragePolicy(new Path(file), COLD);
- FSNamesystem namesystem = hdfsCluster.getNamesystem();
- INode inode = namesystem.getFSDirectory().getINode(file);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
@@ -129,7 +127,7 @@ public class TestStoragePolicySatisfier {
startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
- namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats();
// Wait till namenode notified about the block location details
@@ -144,8 +142,6 @@ public class TestStoragePolicySatisfier {
createCluster();
// Change policy to ALL_SSD
dfs.setStoragePolicy(new Path(file), "ALL_SSD");
- FSNamesystem namesystem = hdfsCluster.getNamesystem();
- INode inode = namesystem.getFSDirectory().getINode(file);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK},
@@ -156,7 +152,7 @@ public class TestStoragePolicySatisfier {
// datanodes.
startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
- namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier Identified that block to move to SSD
// areas
@@ -174,8 +170,6 @@ public class TestStoragePolicySatisfier {
createCluster();
// Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(file), ONE_SSD);
- FSNamesystem namesystem = hdfsCluster.getNamesystem();
- INode inode = namesystem.getFSDirectory().getINode(file);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
@@ -184,7 +178,7 @@ public class TestStoragePolicySatisfier {
// datanodes.
startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
- namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier Identified that block to move to SSD
// areas
@@ -207,8 +201,6 @@ public class TestStoragePolicySatisfier {
createCluster();
// Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(file), ONE_SSD);
- FSNamesystem namesystem = hdfsCluster.getNamesystem();
- INode inode = namesystem.getFSDirectory().getINode(file);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
@@ -217,7 +209,7 @@ public class TestStoragePolicySatisfier {
// datanodes.
startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
- namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats();
// Wait till the block is moved to SSD areas
@@ -250,13 +242,10 @@ public class TestStoragePolicySatisfier {
files.add(file1);
writeContent(file1);
}
- FSNamesystem namesystem = hdfsCluster.getNamesystem();
- List<Long> blockCollectionIds = new ArrayList<>();
// Change policy to ONE_SSD
for (String fileName : files) {
dfs.setStoragePolicy(new Path(fileName), ONE_SSD);
- INode inode = namesystem.getFSDirectory().getINode(fileName);
- blockCollectionIds.add(inode.getId());
+ dfs.satisfyStoragePolicy(new Path(fileName));
}
StorageType[][] newtypes =
@@ -266,9 +255,6 @@ public class TestStoragePolicySatisfier {
// datanodes.
startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
- for (long inodeId : blockCollectionIds) {
- namesystem.getBlockManager().satisfyStoragePolicy(inodeId);
- }
hdfsCluster.triggerHeartbeats();
for (String fileName : files) {
@@ -279,7 +265,7 @@ public class TestStoragePolicySatisfier {
fileName, StorageType.DISK, 2, 30000, dfs);
}
- waitForBlocksMovementResult(blockCollectionIds.size(), 30000);
+ waitForBlocksMovementResult(files.size(), 30000);
} finally {
shutdownCluster();
}
@@ -441,8 +427,6 @@ public class TestStoragePolicySatisfier {
createCluster();
// Change policy to COLD
dfs.setStoragePolicy(new Path(file), COLD);
- FSNamesystem namesystem = hdfsCluster.getNamesystem();
- INode inode = namesystem.getFSDirectory().getINode(file);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}};
@@ -451,7 +435,7 @@ public class TestStoragePolicySatisfier {
startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
- namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier identified that block to move to
// ARCHIVE area.
@@ -486,8 +470,6 @@ public class TestStoragePolicySatisfier {
createCluster();
// Change policy to COLD
dfs.setStoragePolicy(new Path(file), COLD);
- FSNamesystem namesystem = hdfsCluster.getNamesystem();
- INode inode = namesystem.getFSDirectory().getINode(file);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.DISK, StorageType.DISK}};
@@ -495,7 +477,7 @@ public class TestStoragePolicySatisfier {
startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
storagesPerDatanode, capacity, hdfsCluster);
- namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats();
// No block movement will be scheduled as there is no target node
@@ -600,47 +582,51 @@ public class TestStoragePolicySatisfier {
*/
@Test(timeout = 120000)
public void testMoveWithBlockPinning() throws Exception {
- config.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
- config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
- true);
- hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(3)
- .storageTypes(
- new StorageType[][] {{StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK}})
- .build();
-
- hdfsCluster.waitActive();
- dfs = hdfsCluster.getFileSystem();
+ try{
+ config.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
+ config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+ true);
+ hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(3)
+ .storageTypes(
+ new StorageType[][] {{StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK}})
+ .build();
- // create a file with replication factor 3 and mark 2 pinned block
- // locations.
- final String file1 = createFileAndSimulateFavoredNodes(2);
+ hdfsCluster.waitActive();
+ dfs = hdfsCluster.getFileSystem();
- // Change policy to COLD
- dfs.setStoragePolicy(new Path(file1), COLD);
- FSNamesystem namesystem = hdfsCluster.getNamesystem();
- INode inode = namesystem.getFSDirectory().getINode(file1);
+ // create a file with replication factor 3 and mark 2 pinned block
+ // locations.
+ final String file1 = createFileAndSimulateFavoredNodes(2);
- StorageType[][] newtypes =
- new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
- {StorageType.ARCHIVE, StorageType.ARCHIVE},
- {StorageType.ARCHIVE, StorageType.ARCHIVE}};
- // Adding DISK based datanodes
- startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
- storagesPerDatanode, capacity, hdfsCluster);
+ // Change policy to COLD
+ dfs.setStoragePolicy(new Path(file1), COLD);
- namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
- hdfsCluster.triggerHeartbeats();
+ StorageType[][] newtypes =
+ new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE}};
+ // Adding ARCHIVE based datanodes
+ startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
+ storagesPerDatanode, capacity, hdfsCluster);
- // No block movement will be scheduled as there is no target node available
- // with the required storage type.
- waitForAttemptedItems(1, 30000);
- waitForBlocksMovementResult(1, 30000);
- DFSTestUtil.waitExpectedStorageType(
- file1, StorageType.ARCHIVE, 1, 30000, dfs);
- DFSTestUtil.waitExpectedStorageType(
- file1, StorageType.DISK, 2, 30000, dfs);
+ dfs.satisfyStoragePolicy(new Path(file1));
+ hdfsCluster.triggerHeartbeats();
+
+ // No block movement will be scheduled as there is no target node
+ // available with the required storage type.
+ waitForAttemptedItems(1, 30000);
+ waitForBlocksMovementResult(1, 30000);
+ DFSTestUtil.waitExpectedStorageType(
+ file1, StorageType.ARCHIVE, 1, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(
+ file1, StorageType.DISK, 2, 30000, dfs);
+ } finally {
+ if (hdfsCluster != null) {
+ hdfsCluster.shutdown();
+ }
+ }
}
/**
@@ -682,10 +668,8 @@ public class TestStoragePolicySatisfier {
// Change policy to COLD
dfs.setStoragePolicy(new Path(file), COLD);
- FSNamesystem namesystem = hdfsCluster.getNamesystem();
- INode inode = namesystem.getFSDirectory().getINode(file);
- namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier identified that block to move to
// ARCHIVE area.
@@ -723,10 +707,8 @@ public class TestStoragePolicySatisfier {
// Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(file), ONE_SSD);
- FSNamesystem namesystem = hdfsCluster.getNamesystem();
- INode inode = namesystem.getFSDirectory().getINode(file);
- namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats();
DFSTestUtil.waitExpectedStorageType(
file, StorageType.SSD, 1, 30000, dfs);
@@ -764,10 +746,7 @@ public class TestStoragePolicySatisfier {
// Change policy to WARM
dfs.setStoragePolicy(new Path(file), "WARM");
- FSNamesystem namesystem = hdfsCluster.getNamesystem();
- INode inode = namesystem.getFSDirectory().getINode(file);
-
- namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ dfs.satisfyStoragePolicy(new Path(file));
hdfsCluster.triggerHeartbeats();
DFSTestUtil.waitExpectedStorageType(
@@ -848,8 +827,6 @@ public class TestStoragePolicySatisfier {
// Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(file), ONE_SSD);
- FSNamesystem namesystem = hdfsCluster.getNamesystem();
- INode inode = namesystem.getFSDirectory().getINode(file);
Path filePath = new Path("/testChooseInSameDatanode");
final FSDataOutputStream out =
dfs.create(filePath, false, 100, (short) 1, 2 * DEFAULT_BLOCK_SIZE);
@@ -872,7 +849,7 @@ public class TestStoragePolicySatisfier {
for (DataNode dataNode : dataNodes) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
}
- namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ dfs.satisfyStoragePolicy(new Path(file));
// Wait for items to be processed
waitForAttemptedItems(1, 30000);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org