Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2017/08/27 07:16:17 UTC

[50/50] [abbrv] hadoop git commit: HDFS-12225: [SPS]: Optimize extended attributes for tracking SPS movements. Contributed by Surendra Singh Lilhore.

HDFS-12225: [SPS]: Optimize extended attributes for tracking SPS movements. Contributed by Surendra Singh Lilhore.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4a2c50b0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4a2c50b0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4a2c50b0

Branch: refs/heads/HDFS-10285
Commit: 4a2c50b09880d3b06e96d0d21b63cff838b2b0fa
Parents: f570493
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Wed Aug 23 15:37:03 2017 -0700
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Sun Aug 27 12:03:44 2017 +0530

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    |  21 +-
 .../server/blockmanagement/DatanodeManager.java |  14 +-
 .../hdfs/server/datanode/BPOfferService.java    |   1 +
 .../BlockStorageMovementAttemptedItems.java     |  95 +++++---
 .../namenode/BlockStorageMovementNeeded.java    | 233 ++++++++++++++++++-
 .../namenode/FSDirSatisfyStoragePolicyOp.java   |  91 +++-----
 .../hdfs/server/namenode/FSDirXAttrOp.java      |  11 +-
 .../hdfs/server/namenode/FSDirectory.java       |   2 +-
 .../hdfs/server/namenode/FSNamesystem.java      |   2 +-
 .../server/namenode/StoragePolicySatisfier.java | 108 ++++++---
 .../TestStoragePolicySatisfyWorker.java         |   5 +-
 .../TestBlockStorageMovementAttemptedItems.java |  34 +--
 .../TestPersistentStoragePolicySatisfier.java   | 104 +++++++++
 .../namenode/TestStoragePolicySatisfier.java    | 127 +++++-----
 14 files changed, 589 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a2c50b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 3b185cd..419ca80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -89,7 +89,6 @@ import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementNeeded;
 import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
@@ -421,9 +420,6 @@ public class BlockManager implements BlockStatsMXBean {
   private final StoragePolicySatisfier sps;
   private final boolean storagePolicyEnabled;
   private boolean spsEnabled;
-  private final BlockStorageMovementNeeded storageMovementNeeded =
-      new BlockStorageMovementNeeded();
-
   /** Minimum live replicas needed for the datanode to be transitioned
    * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
    */
@@ -466,8 +462,7 @@ public class BlockManager implements BlockStatsMXBean {
         conf.getBoolean(
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT);
-    sps = new StoragePolicySatisfier(namesystem, storageMovementNeeded, this,
-        conf);
+    sps = new StoragePolicySatisfier(namesystem, this, conf);
     blockTokenSecretManager = createBlockTokenSecretManager(conf);
 
     this.maxCorruptFilesReturned = conf.getInt(
@@ -4869,20 +4864,6 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
-   * Set file block collection for which storage movement needed for its blocks.
-   *
-   * @param id
-   *          - file block collection id.
-   */
-  public void satisfyStoragePolicy(long id) {
-    storageMovementNeeded.add(id);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Added block collection id {} to block "
-          + "storageMovementNeeded queue", id);
-    }
-  }
-
-  /**
    * Gets the storage policy satisfier instance.
    *
    * @return sps

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a2c50b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index a298843..3504cb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1749,6 +1749,13 @@ public class DatanodeManager {
       }
     }
 
+    if (nodeinfo.shouldDropSPSWork()) {
+      cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
+      // Set back to false to indicate that the new value has been sent to the
+      // datanode.
+      nodeinfo.setDropSPSWork(false);
+    }
+
     // check pending block storage movement tasks
     BlockStorageMovementInfosBatch blkStorageMovementInfosBatch = nodeinfo
         .getBlocksToMoveStorages();
@@ -1760,13 +1767,6 @@ public class DatanodeManager {
           blkStorageMovementInfosBatch.getBlockMovingInfo()));
     }
 
-    if (nodeinfo.shouldDropSPSWork()) {
-      cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
-      // Set back to false to indicate that the new value has been sent to the
-      // datanode.
-      nodeinfo.setDropSPSWork(false);
-    }
-
     if (!cmds.isEmpty()) {
       return cmds.toArray(new DatanodeCommand[cmds.size()]);
     }

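For orientation: the hunk above moves the drop-SPS-work check ahead of the
pending block-movement check, so that within a single heartbeat response the
datanode receives the "drop all SPS work" command before any newly scheduled
movement tasks. A minimal standalone sketch of that ordering, using
illustrative names rather than the Hadoop API:

import java.util.ArrayList;
import java.util.List;

class HeartbeatCommandOrdering {
  enum Command { DROP_SPS_WORK, MOVE_BLOCKS }

  // Assemble commands so stale SPS work is dropped before fresh work arrives.
  static List<Command> buildCommands(boolean shouldDropSpsWork,
      boolean hasPendingMoves) {
    List<Command> cmds = new ArrayList<>();
    if (shouldDropSpsWork) {
      cmds.add(Command.DROP_SPS_WORK); // queued first, as in the hunk above
    }
    if (hasPendingMoves) {
      cmds.add(Command.MOVE_BLOCKS);
    }
    return cmds;
  }

  public static void main(String[] args) {
    // Prints [DROP_SPS_WORK, MOVE_BLOCKS]: the drop precedes the new task.
    System.out.println(buildCommands(true, true));
  }
}
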
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a2c50b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 39ff4b9..d60fb6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -823,6 +823,7 @@ class BPOfferService {
     case DatanodeProtocol.DNA_UNCACHE:
     case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
     case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
+    case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
       LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
       break;
     default:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a2c50b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index 37833e2..278b62b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
 import org.apache.hadoop.util.Daemon;
@@ -54,7 +55,7 @@ public class BlockStorageMovementAttemptedItems {
    * A map holds the items which are already taken for blocks movements
    * processing and sent to DNs.
    */
-  private final Map<Long, ItemInfo> storageMovementAttemptedItems;
+  private final Map<Long, AttemptedItemInfo> storageMovementAttemptedItems;
   private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
   private volatile boolean monitorRunning = true;
   private Daemon timerThread = null;
@@ -91,18 +92,19 @@ public class BlockStorageMovementAttemptedItems {
    * Add item to block storage movement attempted items map which holds the
    * tracking/blockCollection id versus time stamp.
    *
-   * @param blockCollectionID
-   *          - tracking id / block collection id
+   * @param itemInfo
+   *          - tracking info
    * @param allBlockLocsAttemptedToSatisfy
-   *          - failed to find matching target nodes to satisfy storage type for
-   *          all the block locations of the given blockCollectionID
+   *          - failed to find matching target nodes to satisfy storage type
+   *          for all the block locations of the given blockCollectionID
    */
-  public void add(Long blockCollectionID,
-      boolean allBlockLocsAttemptedToSatisfy) {
+  public void add(ItemInfo itemInfo, boolean allBlockLocsAttemptedToSatisfy) {
     synchronized (storageMovementAttemptedItems) {
-      ItemInfo itemInfo = new ItemInfo(monotonicNow(),
+      AttemptedItemInfo attemptedItemInfo = new AttemptedItemInfo(
+          itemInfo.getRootId(), itemInfo.getTrackId(), monotonicNow(),
           allBlockLocsAttemptedToSatisfy);
-      storageMovementAttemptedItems.put(blockCollectionID, itemInfo);
+      storageMovementAttemptedItems.put(itemInfo.getTrackId(),
+          attemptedItemInfo);
     }
   }
 
@@ -167,21 +169,27 @@ public class BlockStorageMovementAttemptedItems {
    * satisfy storage policy. This is used by
    * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
    */
-  private final static class ItemInfo {
+  private final static class AttemptedItemInfo extends ItemInfo {
     private long lastAttemptedOrReportedTime;
     private final boolean allBlockLocsAttemptedToSatisfy;
 
     /**
-     * ItemInfo constructor.
+     * AttemptedItemInfo constructor.
      *
+     * @param rootId
+     *          rootId for trackId
+     * @param trackId
+     *          trackId for file.
      * @param lastAttemptedOrReportedTime
      *          last attempted or reported time
      * @param allBlockLocsAttemptedToSatisfy
      *          whether all the blocks in the trackID were attempted and blocks
      *          movement has been scheduled to satisfy storage policy
      */
-    private ItemInfo(long lastAttemptedOrReportedTime,
+    private AttemptedItemInfo(long rootId, long trackId,
+        long lastAttemptedOrReportedTime,
         boolean allBlockLocsAttemptedToSatisfy) {
+      super(rootId, trackId);
       this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
       this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy;
     }
@@ -211,6 +219,7 @@ public class BlockStorageMovementAttemptedItems {
     private void touchLastReportedTimeStamp() {
       this.lastAttemptedOrReportedTime = monotonicNow();
     }
+
   }
 
   /**
@@ -239,18 +248,20 @@ public class BlockStorageMovementAttemptedItems {
   @VisibleForTesting
   void blocksStorageMovementUnReportedItemsCheck() {
     synchronized (storageMovementAttemptedItems) {
-      Iterator<Entry<Long, ItemInfo>> iter = storageMovementAttemptedItems
-          .entrySet().iterator();
+      Iterator<Entry<Long, AttemptedItemInfo>> iter =
+          storageMovementAttemptedItems.entrySet().iterator();
       long now = monotonicNow();
       while (iter.hasNext()) {
-        Entry<Long, ItemInfo> entry = iter.next();
-        ItemInfo itemInfo = entry.getValue();
+        Entry<Long, AttemptedItemInfo> entry = iter.next();
+        AttemptedItemInfo itemInfo = entry.getValue();
         if (now > itemInfo.getLastAttemptedOrReportedTime()
             + selfRetryTimeout) {
           Long blockCollectionID = entry.getKey();
           synchronized (storageMovementAttemptedResults) {
             if (!isExistInResult(blockCollectionID)) {
-              blockStorageMovementNeeded.add(blockCollectionID);
+              ItemInfo candidate = new ItemInfo(
+                  itemInfo.getRootId(), blockCollectionID);
+              blockStorageMovementNeeded.add(candidate);
               iter.remove();
               LOG.info("TrackID: {} becomes timed out and moved to needed "
                   + "retries queue for next iteration.", blockCollectionID);
@@ -297,17 +308,30 @@ public class BlockStorageMovementAttemptedItems {
         synchronized (storageMovementAttemptedItems) {
           Status status = storageMovementAttemptedResult.getStatus();
           long trackId = storageMovementAttemptedResult.getTrackId();
-          ItemInfo itemInfo;
+          AttemptedItemInfo attemptedItemInfo = storageMovementAttemptedItems
+              .get(trackId);
+          // If attemptedItemInfo is null, there is no root for this trackId;
+          // use trackId itself as the root and let
+          // blockStorageMovementNeeded#removeItemTrackInfo() clean up the
+          // xAttr.
+          ItemInfo itemInfo = new ItemInfo((attemptedItemInfo != null)
+              ? attemptedItemInfo.getRootId() : trackId, trackId);
           switch (status) {
           case FAILURE:
-            blockStorageMovementNeeded.add(trackId);
-            LOG.warn("Blocks storage movement results for the tracking id: {}"
-                + " is reported from co-ordinating datanode, but result"
-                + " status is FAILURE. So, added for retry", trackId);
+            if (attemptedItemInfo != null) {
+              blockStorageMovementNeeded.add(itemInfo);
+              LOG.warn("Blocks storage movement results for the tracking id:"
+                  + "{} is reported from co-ordinating datanode, but result"
+                  + " status is FAILURE. So, added for retry", trackId);
+            } else {
+              LOG.info("Blocks storage movement is FAILURE for the track"
+                  + " id {}. But the trackID doesn't exists in"
+                  + " storageMovementAttemptedItems list.", trackId);
+              blockStorageMovementNeeded
+                  .removeItemTrackInfo(itemInfo);
+            }
             break;
           case SUCCESS:
-            itemInfo = storageMovementAttemptedItems.get(trackId);
-
             // ItemInfo could be null. One case is, before the blocks movements
             // result arrives the attempted trackID became timed out and then
             // removed the trackID from the storageMovementAttemptedItems list.
@@ -318,33 +342,32 @@ public class BlockStorageMovementAttemptedItems {
             // storage policy in previous SPS iteration.
             String msg = "Blocks storage movement is SUCCESS for the track id: "
                 + trackId + " reported from co-ordinating datanode.";
-            if (itemInfo != null) {
-              if (!itemInfo.isAllBlockLocsAttemptedToSatisfy()) {
-                blockStorageMovementNeeded.add(trackId);
+            if (attemptedItemInfo != null) {
+              if (!attemptedItemInfo.isAllBlockLocsAttemptedToSatisfy()) {
+                blockStorageMovementNeeded
+                    .add(new ItemInfo(attemptedItemInfo.getRootId(), trackId));
                 LOG.warn("{} But adding trackID back to retry queue as some of"
                     + " the blocks couldn't find matching target nodes in"
                     + " previous SPS iteration.", msg);
               } else {
                 LOG.info(msg);
-                // Remove xattr for the track id.
-                this.sps.postBlkStorageMovementCleanup(
-                    storageMovementAttemptedResult.getTrackId());
+                blockStorageMovementNeeded
+                    .removeItemTrackInfo(itemInfo);
               }
             } else {
               LOG.info("{} But the trackID doesn't exists in "
                   + "storageMovementAttemptedItems list", msg);
-              // Remove xattr for the track id.
-              this.sps.postBlkStorageMovementCleanup(
-                  storageMovementAttemptedResult.getTrackId());
+              blockStorageMovementNeeded
+              .removeItemTrackInfo(itemInfo);
             }
             break;
           case IN_PROGRESS:
             isInprogress = true;
-            itemInfo = storageMovementAttemptedItems
+            attemptedItemInfo = storageMovementAttemptedItems
                 .get(storageMovementAttemptedResult.getTrackId());
-            if(itemInfo != null){
+            if(attemptedItemInfo != null){
               // update the attempted expiration time to next cycle.
-              itemInfo.touchLastReportedTimeStamp();
+              attemptedItemInfo.touchLastReportedTimeStamp();
             }
             break;
           default:

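The reworked result handling above hinges on AttemptedItemInfo carrying the
rootId alongside the trackId, and on a fallback when a result arrives for a
trackId that has already aged out of storageMovementAttemptedItems. A small
self-contained sketch of that fallback, with simplified stand-in classes
rather than the Hadoop ones:

import java.util.HashMap;
import java.util.Map;

class ResultRootResolution {
  static class ItemInfo {
    final long rootId;
    final long trackId;
    ItemInfo(long rootId, long trackId) {
      this.rootId = rootId;
      this.trackId = trackId;
    }
    @Override
    public String toString() {
      return "root=" + rootId + ", track=" + trackId;
    }
  }

  // If no attempted entry survives, use trackId itself as the root so the
  // cleanup path can still locate and remove the file's SPS xAttr.
  static ItemInfo resolve(Map<Long, ItemInfo> attempted, long trackId) {
    ItemInfo known = attempted.get(trackId);
    return new ItemInfo(known != null ? known.rootId : trackId, trackId);
  }

  public static void main(String[] args) {
    Map<Long, ItemInfo> attempted = new HashMap<>();
    attempted.put(7L, new ItemInfo(3L, 7L));
    System.out.println(resolve(attempted, 7L)); // root=3, track=7 (known)
    System.out.println(resolve(attempted, 9L)); // root=9, track=9 (fallback)
  }
}
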
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a2c50b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
index 3241e6d..41a3a6c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
@@ -17,28 +17,86 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * A Class to track the block collection IDs for which physical storage movement
- * needed as per the Namespace and StorageReports from DN.
+ * A class to track the block collection IDs (inode IDs) for which physical
+ * storage movement is needed, as per the namespace and storage reports from
+ * DNs. It scans the pending directories for which storage movement is
+ * required, schedules their block collection IDs for movement, tracks the
+ * scheduled items, and removes the SPS xAttr from the file/directory once
+ * the movement succeeds.
  */
 @InterfaceAudience.Private
 public class BlockStorageMovementNeeded {
-  private final Queue<Long> storageMovementNeeded = new LinkedList<Long>();
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockStorageMovementNeeded.class);
+
+  private final Queue<ItemInfo> storageMovementNeeded =
+      new LinkedList<ItemInfo>();
 
   /**
-   * Add the block collection id to tracking list for which storage movement
+   * Map of rootId to the number of children. The child count indicates the
+   * number of files still pending to satisfy the policy.
+   */
+  private final Map<Long, Integer> pendingWorkForDirectory =
+      new HashMap<Long, Integer>();
+
+  private final Namesystem namesystem;
+
+  // List of pending directories to satisfy the policy
+  private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
+
+  private final StoragePolicySatisfier sps;
+
+  private Daemon fileInodeIdCollector;
+
+  public BlockStorageMovementNeeded(Namesystem namesystem,
+      StoragePolicySatisfier sps) {
+    this.namesystem = namesystem;
+    this.sps = sps;
+  }
+
+  /**
+   * Add the candidate to the tracking list for which storage movement is
    * expected if necessary.
    *
-   * @param blockCollectionID
-   *          - block collection id, which is nothing but inode id.
+   * @param trackInfo
+   *          - track info for satisfying the policy
    */
-  public synchronized void add(Long blockCollectionID) {
-    storageMovementNeeded.add(blockCollectionID);
+  public synchronized void add(ItemInfo trackInfo) {
+    storageMovementNeeded.add(trackInfo);
+  }
+
+  /**
+   * Add the itemInfo list to the tracking list for which storage movement is
+   * expected if necessary.
+   * @param rootId
+   *            - root inode id
+   * @param itemInfoList
+   *            - list of children in the directory
+   */
+  private synchronized void addAll(Long rootId,
+      List<ItemInfo> itemInfoList) {
+    storageMovementNeeded.addAll(itemInfoList);
+    pendingWorkForDirectory.put(rootId, itemInfoList.size());
   }
 
   /**
@@ -47,11 +105,168 @@ public class BlockStorageMovementNeeded {
    *
    * @return block collection ID
    */
-  public synchronized Long get() {
+  public synchronized ItemInfo get() {
     return storageMovementNeeded.poll();
   }
 
+  public synchronized void addToPendingDirQueue(long id) {
+    spsDirsToBeTraveresed.add(id);
+    // Notify waiting FileInodeIdCollector thread about the newly
+    // added SPS path.
+    synchronized (spsDirsToBeTraveresed) {
+      spsDirsToBeTraveresed.notify();
+    }
+  }
+
   public synchronized void clearAll() {
+    spsDirsToBeTraveresed.clear();
     storageMovementNeeded.clear();
+    pendingWorkForDirectory.clear();
+  }
+
+  /**
+   * Decrease the pending child count for a directory once one file's blocks
+   * have moved successfully. Remove the SPS xAttr if the pending child count
+   * reaches zero.
+   */
+  public synchronized void removeItemTrackInfo(ItemInfo trackInfo)
+      throws IOException {
+    if (trackInfo.isDir()) {
+      // If track is part of some root then reduce the pending directory work
+      // count.
+      long rootId = trackInfo.getRootId();
+      INode inode = namesystem.getFSDirectory().getInode(rootId);
+      if (inode == null) {
+        // Directory deleted; just remove it.
+        this.pendingWorkForDirectory.remove(rootId);
+      } else {
+        if (pendingWorkForDirectory.get(rootId) != null) {
+          Integer pendingWork = pendingWorkForDirectory.get(rootId) - 1;
+          pendingWorkForDirectory.put(rootId, pendingWork);
+          if (pendingWork <= 0) {
+            namesystem.removeXattr(rootId, XATTR_SATISFY_STORAGE_POLICY);
+            pendingWorkForDirectory.remove(rootId);
+          }
+        }
+      }
+    } else {
+      // Remove the xAttr if the trackID doesn't exist in
+      // storageMovementAttemptedItems or the file's policy is satisfied.
+      namesystem.removeXattr(trackInfo.getTrackId(),
+          XATTR_SATISFY_STORAGE_POLICY);
+    }
+  }
+
+  public synchronized void clearQueue(long trackId) {
+    spsDirsToBeTraveresed.remove(trackId);
+    Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
+    while (iterator.hasNext()) {
+      ItemInfo next = iterator.next();
+      if (next.getRootId() == trackId) {
+        iterator.remove();
+      }
+    }
+    pendingWorkForDirectory.remove(trackId);
+  }
+
+  /**
+   * Clean all the movements in spsDirsToBeTraveresed/storageMovementNeeded
+   * and trigger cleanup of the required resources.
+   */
+  public synchronized void clearQueuesWithNotification() {
+    // Remove xAttr from directories
+    Long trackId;
+    while ((trackId = spsDirsToBeTraveresed.poll()) != null) {
+      try {
+        // Remove xAttr for file
+        namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY);
+      } catch (IOException ie) {
+        LOG.warn("Failed to remove SPS xattr for track id " + trackId, ie);
+      }
+    }
+
+    // Files are added directly to storageMovementNeeded, so try to remove
+    // the xAttr for each file
+    ItemInfo itemInfo;
+    while ((itemInfo = storageMovementNeeded.poll()) != null) {
+      try {
+        // Remove xAttr for file
+        if (!itemInfo.isDir()) {
+          namesystem.removeXattr(itemInfo.getTrackId(),
+              XATTR_SATISFY_STORAGE_POLICY);
+        }
+      } catch (IOException ie) {
+        LOG.warn(
+            "Failed to remove SPS xattr for track id "
+                + itemInfo.getTrackId(), ie);
+      }
+    }
+    this.clearAll();
+  }
+
+  /**
+   * Take a directory track ID from the spsDirsToBeTraveresed queue and
+   * collect child IDs to process for satisfying the policy.
+   */
+  private class FileInodeIdCollector implements Runnable {
+    @Override
+    public void run() {
+      LOG.info("Starting FileInodeIdCollector!.");
+      while (namesystem.isRunning() && sps.isRunning()) {
+        try {
+          if (!namesystem.isInSafeMode()) {
+            FSDirectory fsd = namesystem.getFSDirectory();
+            Long rootINodeId = spsDirsToBeTraveresed.poll();
+            if (rootINodeId == null) {
+              // Waiting for SPS path
+              synchronized (spsDirsToBeTraveresed) {
+                spsDirsToBeTraveresed.wait(5000);
+              }
+            } else {
+              INode rootInode = fsd.getInode(rootINodeId);
+              if (rootInode != null) {
+                // TODO : HDFS-12291
+                // 1. Implement an efficient recursive directory iteration
+                // mechanism and satisfies storage policy for all the files
+                // under the given directory.
+                // 2. Process files in batches, so the datanodes' workload can
+                // be handled.
+                List<ItemInfo> itemInfoList =
+                    new ArrayList<>();
+                for (INode childInode : rootInode.asDirectory()
+                    .getChildrenList(Snapshot.CURRENT_STATE_ID)) {
+                  if (childInode.isFile()
+                      && childInode.asFile().numBlocks() != 0) {
+                    itemInfoList.add(
+                        new ItemInfo(rootINodeId, childInode.getId()));
+                  }
+                }
+                if (itemInfoList.isEmpty()) {
+                  // No track info to satisfy, so remove the xAttr from the
+                  // directory
+                  namesystem.removeXattr(rootINodeId,
+                      XATTR_SATISFY_STORAGE_POLICY);
+                }
+                addAll(rootINodeId, itemInfoList);
+              }
+            }
+          }
+        } catch (Throwable t) {
+          LOG.warn("Exception while loading inodes to satisfy the policy", t);
+        }
+      }
+    }
+  }
+
+  public void start() {
+    fileInodeIdCollector = new Daemon(new FileInodeIdCollector());
+    fileInodeIdCollector.setName("FileInodeIdCollector");
+    fileInodeIdCollector.start();
+  }
+
+  public void stop() {
+    if (fileInodeIdCollector != null) {
+      fileInodeIdCollector.interrupt();
+    }
   }
 }

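The core of the optimization above is that a directory keeps a single SPS
xAttr plus a pending-child counter, instead of one xAttr per child file. The
counter is decremented as each child's movement completes, and the directory
xAttr is removed only when it reaches zero. A minimal sketch of that
bookkeeping, with illustrative names rather than the Hadoop classes:

import java.util.HashMap;
import java.util.Map;

class PendingDirectoryWork {
  private final Map<Long, Integer> pendingWorkForDirectory = new HashMap<>();

  void addDirectory(long rootId, int childCount) {
    pendingWorkForDirectory.put(rootId, childCount);
  }

  // Returns true when the last child finished, i.e. the caller may now
  // remove XATTR_SATISFY_STORAGE_POLICY from the directory.
  boolean childCompleted(long rootId) {
    Integer pending = pendingWorkForDirectory.get(rootId);
    if (pending == null) {
      return false; // directory deleted or never tracked
    }
    int remaining = pending - 1;
    if (remaining <= 0) {
      pendingWorkForDirectory.remove(rootId);
      return true;
    }
    pendingWorkForDirectory.put(rootId, remaining);
    return false;
  }

  public static void main(String[] args) {
    PendingDirectoryWork work = new PendingDirectoryWork();
    work.addDirectory(100L, 2);
    System.out.println(work.childCompleted(100L)); // false: one child left
    System.out.println(work.childCompleted(100L)); // true: remove dir xAttr
  }
}
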
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a2c50b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
index bd4e5ed..fb6eec9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.namenode;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 
@@ -31,6 +30,7 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 import com.google.common.collect.Lists;
 
@@ -60,10 +60,24 @@ final class FSDirSatisfyStoragePolicyOp {
       if (fsd.isPermissionEnabled()) {
         fsd.checkPathAccess(pc, iip, FsAction.WRITE);
       }
-      XAttr satisfyXAttr = unprotectedSatisfyStoragePolicy(iip, bm, fsd);
-      if (satisfyXAttr != null) {
+      INode inode = FSDirectory.resolveLastINode(iip);
+      if (inodeHasSatisfyXAttr(inode)) {
+        throw new IOException(
+            "Cannot request to call satisfy storage policy on path "
+                + inode.getFullPathName()
+                + ", as this file/dir was already called for satisfying "
+                + "storage policy.");
+      }
+      if (unprotectedSatisfyStoragePolicy(inode, fsd)) {
+        XAttr satisfyXAttr = XAttrHelper
+            .buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
         List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
         xAttrs.add(satisfyXAttr);
+        List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
+        List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsd, existingXAttrs,
+            xAttrs, EnumSet.of(XAttrSetFlag.CREATE));
+        XAttrStorage.updateINodeXAttrs(inode, newXAttrs,
+            iip.getLatestSnapshotId());
         fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
       }
     } finally {
@@ -72,62 +86,29 @@ final class FSDirSatisfyStoragePolicyOp {
     return fsd.getAuditFileInfo(iip);
   }
 
-  static XAttr unprotectedSatisfyStoragePolicy(INodesInPath iip,
-      BlockManager bm, FSDirectory fsd) throws IOException {
-
-    final INode inode = FSDirectory.resolveLastINode(iip);
-    final int snapshotId = iip.getLatestSnapshotId();
-    final List<INode> candidateNodes = new ArrayList<>();
-
-    // TODO: think about optimization here, label the dir instead
-    // of the sub-files of the dir.
+  static boolean unprotectedSatisfyStoragePolicy(INode inode, FSDirectory fsd) {
     if (inode.isFile() && inode.asFile().numBlocks() != 0) {
-      candidateNodes.add(inode);
-    } else if (inode.isDirectory()) {
-      for (INode node : inode.asDirectory().getChildrenList(snapshotId)) {
-        if (node.isFile() && node.asFile().numBlocks() != 0) {
-          candidateNodes.add(node);
-        }
-      }
-    }
-
-    if (candidateNodes.isEmpty()) {
-      return null;
+      // Add directly to the storageMovementNeeded queue, so it gets
+      // higher priority compared to a directory.
+      fsd.getBlockManager().getStoragePolicySatisfier()
+          .satisfyStoragePolicy(inode.getId());
+      return true;
+    } else if (inode.isDirectory()
+        && inode.asDirectory().getChildrenNum(Snapshot.CURRENT_STATE_ID) > 0) {
+      // Add the directory to the pending queue, so FileInodeIdCollector can
+      // process the directory's children in batches and recursively
+      fsd.getBlockManager().getStoragePolicySatisfier()
+          .addInodeToPendingDirQueue(inode.getId());
+      return true;
     }
-    // If node has satisfy xattr, then stop adding it
-    // to satisfy movement queue.
-    if (inodeHasSatisfyXAttr(candidateNodes)) {
-      throw new IOException(
-          "Cannot request to call satisfy storage policy on path "
-              + iip.getPath()
-              + ", as this file/dir was already called for satisfying "
-              + "storage policy.");
-    }
-
-    final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
-    final XAttr satisfyXAttr = XAttrHelper
-        .buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
-    xattrs.add(satisfyXAttr);
-
-    for (INode node : candidateNodes) {
-      bm.satisfyStoragePolicy(node.getId());
-      List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(node);
-      List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsd, existingXAttrs,
-          xattrs, EnumSet.of(XAttrSetFlag.CREATE));
-      XAttrStorage.updateINodeXAttrs(node, newXAttrs, snapshotId);
-    }
-    return satisfyXAttr;
+    return false;
   }
 
-  private static boolean inodeHasSatisfyXAttr(List<INode> candidateNodes) {
-    // If the node is a directory and one of the child files
-    // has satisfy xattr, then return true for this directory.
-    for (INode inode : candidateNodes) {
-      final XAttrFeature f = inode.getXAttrFeature();
-      if (inode.isFile() && f != null
-          && f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null) {
-        return true;
-      }
+  private static boolean inodeHasSatisfyXAttr(INode inode) {
+    final XAttrFeature f = inode.getXAttrFeature();
+    if (inode.isFile() && f != null
+        && f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null) {
+      return true;
     }
     return false;
   }

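The rewritten unprotectedSatisfyStoragePolicy above routes work two ways:
files with blocks go straight into the storageMovementNeeded queue (higher
priority), while non-empty directories are parked in the pending-dir queue
for the FileInodeIdCollector to expand later. A compact sketch of that
routing decision, using placeholder queues rather than the HDFS internals:

import java.util.ArrayDeque;
import java.util.Queue;

class SatisfyRouting {
  final Queue<Long> movementNeeded = new ArrayDeque<>(); // files: immediate
  final Queue<Long> pendingDirs = new ArrayDeque<>();    // dirs: expanded later

  // Mirrors the file/directory split above; returns true if anything was
  // scheduled (and hence the xAttr should be set on the inode).
  boolean satisfy(long inodeId, boolean isFile, int blockOrChildCount) {
    if (isFile && blockOrChildCount != 0) {
      movementNeeded.add(inodeId);
      return true;
    } else if (!isFile && blockOrChildCount > 0) {
      pendingDirs.add(inodeId);
      return true;
    }
    return false; // empty file or empty directory: nothing to schedule
  }

  public static void main(String[] args) {
    SatisfyRouting r = new SatisfyRouting();
    System.out.println(r.satisfy(10L, true, 3));  // true: file queued
    System.out.println(r.satisfy(20L, false, 0)); // false: empty directory
  }
}
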
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a2c50b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
index b73b1ab..e964a23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -196,6 +196,14 @@ class FSDirXAttrOp {
     List<XAttr> newXAttrs = filterINodeXAttrs(existingXAttrs, toRemove,
                                               removedXAttrs);
     if (existingXAttrs.size() != newXAttrs.size()) {
+      for (XAttr xattr : toRemove) {
+        if (XATTR_SATISFY_STORAGE_POLICY
+            .equals(XAttrHelper.getPrefixedName(xattr))) {
+          fsd.getBlockManager().getStoragePolicySatisfier()
+              .clearQueue(inode.getId());
+          break;
+        }
+      }
       XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
       return removedXAttrs;
     }
@@ -287,8 +295,7 @@ class FSDirXAttrOp {
 
       // Add inode id to movement queue if xattrs contain satisfy xattr.
       if (XATTR_SATISFY_STORAGE_POLICY.equals(xaName)) {
-        FSDirSatisfyStoragePolicyOp.unprotectedSatisfyStoragePolicy(iip,
-            fsd.getBlockManager(), fsd);
+        FSDirSatisfyStoragePolicyOp.unprotectedSatisfyStoragePolicy(inode, fsd);
         continue;
       }
 

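The FSDirXAttrOp change above wires explicit xAttr removal into SPS
cancellation: when a user removes the satisfy-storage-policy xAttr, every
queued item rooted at that inode is purged. A small sketch of that purge,
with simplified stand-ins for the queues:

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

class ClearQueueOnXattrRemoval {
  static class ItemInfo {
    final long rootId;
    final long trackId;
    ItemInfo(long rootId, long trackId) {
      this.rootId = rootId;
      this.trackId = trackId;
    }
  }

  final Queue<Long> pendingDirs = new ArrayDeque<>();
  final Queue<ItemInfo> movementNeeded = new ArrayDeque<>();

  // Drop the directory itself plus all children queued on its behalf.
  void clearQueue(long trackId) {
    pendingDirs.remove(trackId);
    Iterator<ItemInfo> it = movementNeeded.iterator();
    while (it.hasNext()) {
      if (it.next().rootId == trackId) {
        it.remove();
      }
    }
  }

  public static void main(String[] args) {
    ClearQueueOnXattrRemoval q = new ClearQueueOnXattrRemoval();
    q.pendingDirs.add(7L);
    q.movementNeeded.add(new ItemInfo(7L, 43L));
    q.movementNeeded.add(new ItemInfo(5L, 5L));
    q.clearQueue(7L);
    System.out.println(q.movementNeeded.size()); // 1: only the unrelated item
  }
}
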
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a2c50b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index b8d90f9..648232e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -1359,7 +1359,7 @@ public class FSDirectory implements Closeable {
     if (xattr == null) {
       return;
     }
-    getBlockManager().satisfyStoragePolicy(inode.getId());
+    FSDirSatisfyStoragePolicyOp.unprotectedSatisfyStoragePolicy(inode, this);
   }
 
   private void addEncryptionZone(INodeWithAdditionalFields inode,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a2c50b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 57bfa2f..2d05090 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1306,7 +1306,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     writeLock();
     try {
       if (blockManager != null) {
-        blockManager.stopSPS(true);
+        blockManager.stopSPS(false);
       }
       stopSecretManager();
       leaseManager.stopMonitor();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a2c50b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 3165813..48d0598 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
-
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -106,10 +103,10 @@ public class StoragePolicySatisfier implements Runnable {
   }
 
   public StoragePolicySatisfier(final Namesystem namesystem,
-      final BlockStorageMovementNeeded storageMovementNeeded,
       final BlockManager blkManager, Configuration conf) {
     this.namesystem = namesystem;
-    this.storageMovementNeeded = storageMovementNeeded;
+    this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem,
+        this);
     this.blockManager = blkManager;
     this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
         conf.getLong(
@@ -146,7 +143,7 @@ public class StoragePolicySatisfier implements Runnable {
     // Ensure that all the previously submitted block movements(if any) have to
     // be stopped in all datanodes.
     addDropSPSWorkCommandsToAllDNs();
-
+    storageMovementNeeded.start();
     storagePolicySatisfierThread = new Daemon(this);
     storagePolicySatisfierThread.setName("StoragePolicySatisfier");
     storagePolicySatisfierThread.start();
@@ -162,14 +159,17 @@ public class StoragePolicySatisfier implements Runnable {
    */
   public synchronized void disable(boolean forceStop) {
     isRunning = false;
+
     if (storagePolicySatisfierThread == null) {
       return;
     }
 
+    storageMovementNeeded.stop();
+
     storagePolicySatisfierThread.interrupt();
     this.storageMovementsMonitor.stop();
     if (forceStop) {
-      this.clearQueuesWithNotification();
+      storageMovementNeeded.clearQueuesWithNotification();
       addDropSPSWorkCommandsToAllDNs();
     } else {
       LOG.info("Stopping StoragePolicySatisfier.");
@@ -184,6 +184,7 @@ public class StoragePolicySatisfier implements Runnable {
       disable(true);
     }
     this.storageMovementsMonitor.stopGracefully();
+
     if (storagePolicySatisfierThread == null) {
       return;
     }
@@ -220,10 +221,11 @@ public class StoragePolicySatisfier implements Runnable {
     while (namesystem.isRunning() && isRunning) {
       try {
         if (!namesystem.isInSafeMode()) {
-          Long blockCollectionID = storageMovementNeeded.get();
-          if (blockCollectionID != null) {
+          ItemInfo itemInfo = storageMovementNeeded.get();
+          if (itemInfo != null) {
+            long trackId = itemInfo.getTrackId();
             BlockCollection blockCollection =
-                namesystem.getBlockCollection(blockCollectionID);
+                namesystem.getBlockCollection(trackId);
             // Check blockCollectionId existence.
             if (blockCollection != null) {
               BlocksMovingAnalysisStatus status =
@@ -234,21 +236,21 @@ public class StoragePolicySatisfier implements Runnable {
                 // Just add to monitor, so it will be tracked for result and
                 // be removed on successful storage movement result.
               case ALL_BLOCKS_TARGETS_PAIRED:
-                this.storageMovementsMonitor.add(blockCollectionID, true);
+                this.storageMovementsMonitor.add(itemInfo, true);
                 break;
               // Add to monitor with allBlcoksAttemptedToSatisfy flag false, so
               // that it will be tracked and still it will be consider for retry
               // as analysis was not found targets for storage movement blocks.
               case FEW_BLOCKS_TARGETS_PAIRED:
-                this.storageMovementsMonitor.add(blockCollectionID, false);
+                this.storageMovementsMonitor.add(itemInfo, false);
                 break;
               case FEW_LOW_REDUNDANCY_BLOCKS:
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("Adding trackID " + blockCollectionID
+                  LOG.debug("Adding trackID " + trackId
                       + " back to retry queue as some of the blocks"
                       + " are low redundant.");
                 }
-                this.storageMovementNeeded.add(blockCollectionID);
+                this.storageMovementNeeded.add(itemInfo);
                 break;
               // Just clean Xattrs
               case BLOCKS_TARGET_PAIRING_SKIPPED:
@@ -256,9 +258,13 @@ public class StoragePolicySatisfier implements Runnable {
               default:
                 LOG.info("Block analysis skipped or blocks already satisfied"
                     + " with storages. So, Cleaning up the Xattrs.");
-                postBlkStorageMovementCleanup(blockCollectionID);
+                storageMovementNeeded.removeItemTrackInfo(itemInfo);
                 break;
               }
+            } else {
+              // File doesn't exist (maybe it was deleted); remove trackId from
+              // the queue
+              storageMovementNeeded.removeItemTrackInfo(itemInfo);
             }
           }
         }
@@ -828,31 +834,63 @@ public class StoragePolicySatisfier implements Runnable {
   }
 
   /**
-   * Clean all the movements in storageMovementNeeded and notify
-   * to clean up required resources.
-   * @throws IOException
+   * Queue the file inode for which storage movement is needed for its blocks.
+   *
+   * @param inodeId
+   *          - file inode/blockcollection id.
    */
-  private void clearQueuesWithNotification() {
-    Long id;
-    while ((id = storageMovementNeeded.get()) != null) {
-      try {
-        postBlkStorageMovementCleanup(id);
-      } catch (IOException ie) {
-        LOG.warn("Failed to remove SPS "
-            + "xattr for collection id " + id, ie);
-      }
+  public void satisfyStoragePolicy(Long inodeId) {
+    // For a file, rootId and trackId are the same
+    storageMovementNeeded.add(new ItemInfo(inodeId, inodeId));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Added track info for inode {} to block "
+          + "storageMovementNeeded queue", inodeId);
     }
   }
 
+  public void addInodeToPendingDirQueue(long id) {
+    storageMovementNeeded.addToPendingDirQueue(id);
+  }
+
+  /**
+   * Clear queues for given track id.
+   */
+  public void clearQueue(long trackId) {
+    storageMovementNeeded.clearQueue(trackId);
+  }
+
   /**
-   * When block movement has been finished successfully, some additional
-   * operations should be notified, for example, SPS xattr should be
-   * removed.
-   * @param trackId track id i.e., block collection id.
-   * @throws IOException
+   * ItemInfo is a file info object for which the storage policy needs
+   * to be satisfied.
    */
-  public void postBlkStorageMovementCleanup(long trackId)
-      throws IOException {
-    this.namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY);
+  public static class ItemInfo {
+    private long rootId;
+    private long trackId;
+
+    public ItemInfo(long rootId, long trackId) {
+      this.rootId = rootId;
+      this.trackId = trackId;
+    }
+
+    /**
+     * Return the root of the current track Id.
+     */
+    public long getRootId() {
+      return rootId;
+    }
+
+    /**
+     * Return the file inode id for which the policy needs to be satisfied.
+     */
+    public long getTrackId() {
+      return trackId;
+    }
+
+    /**
+     * Returns true if this track id is being satisfied as part of a
+     * directory (i.e. rootId differs from trackId), false otherwise.
+     */
+    public boolean isDir() {
+      return (rootId != trackId);
+    }
   }
 }

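The ItemInfo class above encodes directory membership purely in the
(rootId, trackId) pair: a standalone file is queued with its own inode id in
both fields, while a child queued on behalf of a directory carries the
directory's inode id as rootId. A short usage sketch (a standalone copy of
that convention, not the Hadoop class):

class ItemInfoDemo {
  static class ItemInfo {
    final long rootId;
    final long trackId;
    ItemInfo(long rootId, long trackId) {
      this.rootId = rootId;
      this.trackId = trackId;
    }
    boolean isDir() {
      return rootId != trackId; // tracked as part of a directory
    }
  }

  public static void main(String[] args) {
    ItemInfo standaloneFile = new ItemInfo(42L, 42L); // file queued directly
    ItemInfo childOfDir = new ItemInfo(7L, 43L);      // child queued for dir 7
    System.out.println(standaloneFile.isDir()); // false: remove the file's
                                                // own xAttr on completion
    System.out.println(childOfDir.isDir());     // true: decrement the parent
                                                // directory's pending count
  }
}
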
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a2c50b0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index 402d4d1..b84b1d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
@@ -115,9 +114,7 @@ public class TestStoragePolicySatisfyWorker {
     // move to ARCHIVE
     dfs.setStoragePolicy(new Path(file), "COLD");
 
-    FSNamesystem namesystem = cluster.getNamesystem();
-    INode inode = namesystem.getFSDirectory().getINode(file);
-    namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+    dfs.satisfyStoragePolicy(new Path(file));
 
     cluster.triggerHeartbeats();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a2c50b0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
index 04a63ac..55ebf9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import static org.apache.hadoop.util.Time.monotonicNow;
 import static org.junit.Assert.*;
 
+import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.junit.After;
 import org.junit.Before;
@@ -38,7 +39,9 @@ public class TestBlockStorageMovementAttemptedItems {
 
   @Before
   public void setup() throws Exception {
-    unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded();
+    unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
+        Mockito.mock(Namesystem.class),
+        Mockito.mock(StoragePolicySatisfier.class));
     StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
     bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
         selfRetryTimeout, unsatisfiedStorageMovementFiles, sps);
@@ -57,9 +60,9 @@ public class TestBlockStorageMovementAttemptedItems {
     long stopTime = monotonicNow() + (retryTimeout * 2);
     boolean isItemFound = false;
     while (monotonicNow() < (stopTime)) {
-      Long ele = null;
+      ItemInfo ele = null;
       while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
-        if (item.longValue() == ele.longValue()) {
+        if (item == ele.getTrackId()) {
           isItemFound = true;
           break;
         }
@@ -77,7 +80,7 @@ public class TestBlockStorageMovementAttemptedItems {
   public void testAddResultWithFailureResult() throws Exception {
     bsmAttemptedItems.start(); // start block movement result monitor thread
     Long item = new Long(1234);
-    bsmAttemptedItems.add(item, true);
+    bsmAttemptedItems.add(new ItemInfo(0L, item), true);
     bsmAttemptedItems.addResults(
         new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
             item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
@@ -88,7 +91,7 @@ public class TestBlockStorageMovementAttemptedItems {
   public void testAddResultWithSucessResult() throws Exception {
     bsmAttemptedItems.start(); // start block movement result monitor thread
     Long item = new Long(1234);
-    bsmAttemptedItems.add(item, true);
+    bsmAttemptedItems.add(new ItemInfo(0L, item), true);
     bsmAttemptedItems.addResults(
         new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
             item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
@@ -99,7 +102,7 @@ public class TestBlockStorageMovementAttemptedItems {
   public void testNoResultAdded() throws Exception {
     bsmAttemptedItems.start(); // start block movement result monitor thread
     Long item = new Long(1234);
-    bsmAttemptedItems.add(item, true);
+    bsmAttemptedItems.add(new ItemInfo(0L, item), true);
     // After self retry timeout, it should be added back for retry
     assertTrue("Failed to add to the retry list",
         checkItemMovedForRetry(item, 600));
@@ -115,7 +118,7 @@ public class TestBlockStorageMovementAttemptedItems {
   @Test(timeout = 30000)
   public void testPartialBlockMovementShouldBeRetried1() throws Exception {
     Long item = new Long(1234);
-    bsmAttemptedItems.add(item, false);
+    bsmAttemptedItems.add(new ItemInfo(0L, item), false);
     bsmAttemptedItems.addResults(
         new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
             item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
@@ -136,7 +139,7 @@ public class TestBlockStorageMovementAttemptedItems {
   @Test(timeout = 30000)
   public void testPartialBlockMovementShouldBeRetried2() throws Exception {
     Long item = new Long(1234);
-    bsmAttemptedItems.add(item, false);
+    bsmAttemptedItems.add(new ItemInfo(0L, item), false);
     bsmAttemptedItems.addResults(
         new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
             item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
@@ -153,17 +156,20 @@ public class TestBlockStorageMovementAttemptedItems {
   }
 
   /**
-   * Partial block movement with only BlocksStorageMovementResult#FAILURE result
-   * and storageMovementAttemptedItems list is empty.
+   * Partial block movement with only BlocksStorageMovementResult#FAILURE
+   * result and storageMovementAttemptedItems list is empty.
    */
   @Test(timeout = 30000)
-  public void testPartialBlockMovementShouldBeRetried3() throws Exception {
+  public void testPartialBlockMovementWithEmptyAttemptedQueue()
+      throws Exception {
     Long item = new Long(1234);
     bsmAttemptedItems.addResults(
         new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
+            item, BlocksStorageMovementResult.Status.FAILURE)});
     bsmAttemptedItems.blockStorageMovementResultCheck();
-    assertTrue("Failed to add to the retry list",
+    assertFalse(
+        "Should not add in queue again if it is not there in"
+            + " storageMovementAttemptedItems",
         checkItemMovedForRetry(item, 5000));
     assertEquals("Failed to remove from the attempted list", 0,
         bsmAttemptedItems.getAttemptedItemsCount());
@@ -176,7 +182,7 @@ public class TestBlockStorageMovementAttemptedItems {
   @Test(timeout = 30000)
   public void testPartialBlockMovementShouldBeRetried4() throws Exception {
     Long item = new Long(1234);
-    bsmAttemptedItems.add(item, false);
+    bsmAttemptedItems.add(new ItemInfo(0L, item), false);
     bsmAttemptedItems.addResults(
         new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
             item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a2c50b0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
index 8516ea0..e7b9148 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
@@ -20,16 +20,22 @@ package org.apache.hadoop.hdfs.server.namenode;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
 
+import com.google.common.base.Supplier;
+
 import java.io.IOException;
+import java.util.List;
 
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
 import static org.junit.Assert.*;
@@ -482,6 +488,104 @@ public class TestPersistentStoragePolicySatisfier {
   }
 
   /**
+   * Test SPS xAttr on a directory. The xAttr should be removed from the
+   * directory once all the files' blocks have moved to the specified storage.
+   */
+  @Test(timeout = 300000)
+  public void testSPSxAttrWhenSpsCalledForDir() throws Exception {
+    try {
+      clusterSetUp();
+      Path parent = new Path("/parent");
+      // create parent dir
+      fs.mkdirs(parent);
+
+      // create 5 child files
+      for (int i = 0; i < 5; i++) {
+        DFSTestUtil.createFile(fs, new Path(parent, "f" + i), 1024, (short) 3,
+            0);
+      }
+
+      // Set storage policy for parent directory
+      fs.setStoragePolicy(parent, "COLD");
+
+      // Stop one DN so we can check the SPS xAttr for the directory.
+      DataNodeProperties stopDataNode = cluster.stopDataNode(0);
+
+      fs.satisfyStoragePolicy(parent);
+
+      // Check xAttr for parent directory
+      FSNamesystem namesystem = cluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode("/parent");
+      XAttrFeature f = inode.getXAttrFeature();
+      assertTrue("SPS xAttr should be exist",
+          f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null);
+
+      // check the child files; the SPS xAttr should not be set on them
+      for (int i = 0; i < 5; i++) {
+        inode = namesystem.getFSDirectory().getINode("/parent/f" + i);
+        f = inode.getXAttrFeature();
+        assertNull("SPS xAttr should not be set on child files", f);
+      }
+
+      cluster.restartDataNode(stopDataNode, false);
+
+      // wait and check that all the file blocks have moved to ARCHIVE
+      for (int i = 0; i < 5; i++) {
+        DFSTestUtil.waitExpectedStorageType("/parent/f" + i,
+            StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem());
+      }
+      DFSTestUtil.waitForXattrRemoved("/parent", XATTR_SATISFY_STORAGE_POLICY,
+          namesystem, 10000);
+    } finally {
+      clusterShutdown();
+    }
+
+  }
+
+  /**
+   * Test SPS xAttr on a file. The xAttr should be removed from the file
+   * once all of its blocks have moved to the target storage.
+   */
+  @Test(timeout = 300000)
+  public void testSPSxAttrWhenSpsCalledForFile() throws Exception {
+    try {
+      clusterSetUp();
+      Path file = new Path("/file");
+      DFSTestUtil.createFile(fs, file, 1024, (short) 3, 0);
+
+      // Set storage policy for file
+      fs.setStoragePolicy(file, "COLD");
+
+      // Stop one DN so we can check the SPS xAttr for the file.
+      DataNodeProperties stopDataNode = cluster.stopDataNode(0);
+
+      fs.satisfyStoragePolicy(file);
+
+      // Check xAttr for the file
+      FSNamesystem namesystem = cluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode("/file");
+      XAttrFeature f = inode.getXAttrFeature();
+      assertTrue("SPS xAttr should be exist",
+          f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null);
+
+      cluster.restartDataNode(stopDataNode, false);
+
+      // wait and check that all the file blocks have moved to ARCHIVE
+      DFSTestUtil.waitExpectedStorageType("/file", StorageType.ARCHIVE, 3,
+          30000, cluster.getFileSystem());
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
+          return !existingXAttrs.contains(XATTR_SATISFY_STORAGE_POLICY);
+        }
+      }, 100, 10000);
+    } finally {
+      clusterShutdown();
+    }
+  }
+
+  /**
    * Restart the whole environment and trigger the DataNodes' heartbeats.
    * @throws Exception
    */

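The recurring change in the TestStoragePolicySatisfier hunks that follow is
the switch from the internal namesystem.getBlockManager()
.satisfyStoragePolicy(inode.getId()) call to the public
dfs.satisfyStoragePolicy(path) API. Below is a condensed sketch of that
client-side flow; it assumes a running MiniDFSCluster with ARCHIVE-capable
datanodes, and the class and method names are illustrative only.

// Sketch only: set a policy, ask the namenode to satisfy it, then wait for
// the replicas to land on the expected storage type.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class SpsFlowSketch {
  static void satisfyToCold(DistributedFileSystem dfs, Path file)
      throws Exception {
    // Retarget the file to the COLD (ARCHIVE) policy.
    dfs.setStoragePolicy(file, "COLD");
    // Ask the namenode to reconcile existing replicas with the new policy;
    // this persists the SPS xattr until the moves complete.
    dfs.satisfyStoragePolicy(file);
    // Block until all three replicas report ARCHIVE storage.
    DFSTestUtil.waitExpectedStorageType(file.toString(), StorageType.ARCHIVE,
        3, 30000, dfs);
  }
}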
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a2c50b0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 2536834..3375590 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -119,8 +119,6 @@ public class TestStoragePolicySatisfier {
   private void doTestWhenStoragePolicySetToCOLD() throws Exception {
     // Change policy to COLD
     dfs.setStoragePolicy(new Path(file), COLD);
-    FSNamesystem namesystem = hdfsCluster.getNamesystem();
-    INode inode = namesystem.getFSDirectory().getINode(file);
 
     StorageType[][] newtypes =
         new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
@@ -129,7 +127,7 @@ public class TestStoragePolicySatisfier {
     startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
         storagesPerDatanode, capacity, hdfsCluster);
 
-    namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+    dfs.satisfyStoragePolicy(new Path(file));
 
     hdfsCluster.triggerHeartbeats();
     // Wait till the namenode is notified about the block location details
@@ -144,8 +142,6 @@ public class TestStoragePolicySatisfier {
       createCluster();
       // Change policy to ALL_SSD
       dfs.setStoragePolicy(new Path(file), "ALL_SSD");
-      FSNamesystem namesystem = hdfsCluster.getNamesystem();
-      INode inode = namesystem.getFSDirectory().getINode(file);
 
       StorageType[][] newtypes =
           new StorageType[][]{{StorageType.SSD, StorageType.DISK},
@@ -156,7 +152,7 @@ public class TestStoragePolicySatisfier {
       // datanodes.
       startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
           storagesPerDatanode, capacity, hdfsCluster);
-      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      dfs.satisfyStoragePolicy(new Path(file));
       hdfsCluster.triggerHeartbeats();
       // Wait till StoragePolicySatisfier identified that block to move to SSD
       // areas
@@ -174,8 +170,6 @@ public class TestStoragePolicySatisfier {
       createCluster();
       // Change policy to ONE_SSD
       dfs.setStoragePolicy(new Path(file), ONE_SSD);
-      FSNamesystem namesystem = hdfsCluster.getNamesystem();
-      INode inode = namesystem.getFSDirectory().getINode(file);
 
       StorageType[][] newtypes =
           new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
@@ -184,7 +178,7 @@ public class TestStoragePolicySatisfier {
       // datanodes.
       startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
           storagesPerDatanode, capacity, hdfsCluster);
-      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      dfs.satisfyStoragePolicy(new Path(file));
       hdfsCluster.triggerHeartbeats();
       // Wait till StoragePolicySatisfier identified that block to move to SSD
       // areas
@@ -207,8 +201,6 @@ public class TestStoragePolicySatisfier {
       createCluster();
       // Change policy to ONE_SSD
       dfs.setStoragePolicy(new Path(file), ONE_SSD);
-      FSNamesystem namesystem = hdfsCluster.getNamesystem();
-      INode inode = namesystem.getFSDirectory().getINode(file);
 
       StorageType[][] newtypes =
           new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
@@ -217,7 +209,7 @@ public class TestStoragePolicySatisfier {
       // datanodes.
       startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
           storagesPerDatanode, capacity, hdfsCluster);
-      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      dfs.satisfyStoragePolicy(new Path(file));
       hdfsCluster.triggerHeartbeats();
 
       // Wait till the block is moved to SSD areas
@@ -250,13 +242,10 @@ public class TestStoragePolicySatisfier {
         files.add(file1);
         writeContent(file1);
       }
-      FSNamesystem namesystem = hdfsCluster.getNamesystem();
-      List<Long> blockCollectionIds = new ArrayList<>();
       // Change policy to ONE_SSD
       for (String fileName : files) {
         dfs.setStoragePolicy(new Path(fileName), ONE_SSD);
-        INode inode = namesystem.getFSDirectory().getINode(fileName);
-        blockCollectionIds.add(inode.getId());
+        dfs.satisfyStoragePolicy(new Path(fileName));
       }
 
       StorageType[][] newtypes =
@@ -266,9 +255,6 @@ public class TestStoragePolicySatisfier {
       // datanodes.
       startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
           storagesPerDatanode, capacity, hdfsCluster);
-      for (long inodeId : blockCollectionIds) {
-        namesystem.getBlockManager().satisfyStoragePolicy(inodeId);
-      }
       hdfsCluster.triggerHeartbeats();
 
       for (String fileName : files) {
@@ -279,7 +265,7 @@ public class TestStoragePolicySatisfier {
             fileName, StorageType.DISK, 2, 30000, dfs);
       }
 
-      waitForBlocksMovementResult(blockCollectionIds.size(), 30000);
+      waitForBlocksMovementResult(files.size(), 30000);
     } finally {
       shutdownCluster();
     }
@@ -441,8 +427,6 @@ public class TestStoragePolicySatisfier {
       createCluster();
       // Change policy to COLD
       dfs.setStoragePolicy(new Path(file), COLD);
-      FSNamesystem namesystem = hdfsCluster.getNamesystem();
-      INode inode = namesystem.getFSDirectory().getINode(file);
 
       StorageType[][] newtypes =
           new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}};
@@ -451,7 +435,7 @@ public class TestStoragePolicySatisfier {
       startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
           storagesPerDatanode, capacity, hdfsCluster);
 
-      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      dfs.satisfyStoragePolicy(new Path(file));
       hdfsCluster.triggerHeartbeats();
       // Wait till StoragePolicySatisfier identified that block to move to
       // ARCHIVE area.
@@ -486,8 +470,6 @@ public class TestStoragePolicySatisfier {
       createCluster();
       // Change policy to COLD
       dfs.setStoragePolicy(new Path(file), COLD);
-      FSNamesystem namesystem = hdfsCluster.getNamesystem();
-      INode inode = namesystem.getFSDirectory().getINode(file);
 
       StorageType[][] newtypes =
           new StorageType[][]{{StorageType.DISK, StorageType.DISK}};
@@ -495,7 +477,7 @@ public class TestStoragePolicySatisfier {
       startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
           storagesPerDatanode, capacity, hdfsCluster);
 
-      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      dfs.satisfyStoragePolicy(new Path(file));
       hdfsCluster.triggerHeartbeats();
 
       // No block movement will be scheduled as there is no target node
@@ -600,47 +582,51 @@ public class TestStoragePolicySatisfier {
    */
   @Test(timeout = 120000)
   public void testMoveWithBlockPinning() throws Exception {
-    config.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
-    config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        true);
-    hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(3)
-        .storageTypes(
-            new StorageType[][] {{StorageType.DISK, StorageType.DISK},
-                {StorageType.DISK, StorageType.DISK},
-                {StorageType.DISK, StorageType.DISK}})
-        .build();
-
-    hdfsCluster.waitActive();
-    dfs = hdfsCluster.getFileSystem();
+    try {
+      config.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
+      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+          true);
+      hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(3)
+          .storageTypes(
+              new StorageType[][] {{StorageType.DISK, StorageType.DISK},
+                  {StorageType.DISK, StorageType.DISK},
+                  {StorageType.DISK, StorageType.DISK}})
+          .build();
 
-    // create a file with replication factor 3 and mark 2 pinned block
-    // locations.
-    final String file1 = createFileAndSimulateFavoredNodes(2);
+      hdfsCluster.waitActive();
+      dfs = hdfsCluster.getFileSystem();
 
-    // Change policy to COLD
-    dfs.setStoragePolicy(new Path(file1), COLD);
-    FSNamesystem namesystem = hdfsCluster.getNamesystem();
-    INode inode = namesystem.getFSDirectory().getINode(file1);
+      // create a file with replication factor 3 and mark 2 pinned block
+      // locations.
+      final String file1 = createFileAndSimulateFavoredNodes(2);
 
-    StorageType[][] newtypes =
-        new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
-            {StorageType.ARCHIVE, StorageType.ARCHIVE},
-            {StorageType.ARCHIVE, StorageType.ARCHIVE}};
-    // Adding DISK based datanodes
-    startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
-        storagesPerDatanode, capacity, hdfsCluster);
+      // Change policy to COLD
+      dfs.setStoragePolicy(new Path(file1), COLD);
 
-    namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
-    hdfsCluster.triggerHeartbeats();
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE}};
+      // Adding ARCHIVE-based datanodes
+      startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
+          storagesPerDatanode, capacity, hdfsCluster);
 
-    // No block movement will be scheduled as there is no target node available
-    // with the required storage type.
-    waitForAttemptedItems(1, 30000);
-    waitForBlocksMovementResult(1, 30000);
-    DFSTestUtil.waitExpectedStorageType(
-        file1, StorageType.ARCHIVE, 1, 30000, dfs);
-    DFSTestUtil.waitExpectedStorageType(
-        file1, StorageType.DISK, 2, 30000, dfs);
+      dfs.satisfyStoragePolicy(new Path(file1));
+      hdfsCluster.triggerHeartbeats();
+
+      // Only the unpinned replica is expected to move to ARCHIVE; the two
+      // pinned replicas must remain on DISK.
+      waitForAttemptedItems(1, 30000);
+      waitForBlocksMovementResult(1, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file1, StorageType.ARCHIVE, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          file1, StorageType.DISK, 2, 30000, dfs);
+    } finally {
+      if (hdfsCluster != null) {
+        hdfsCluster.shutdown();
+      }
+    }
   }
 
   /**
@@ -682,10 +668,8 @@ public class TestStoragePolicySatisfier {
 
       // Change policy to COLD
       dfs.setStoragePolicy(new Path(file), COLD);
-      FSNamesystem namesystem = hdfsCluster.getNamesystem();
-      INode inode = namesystem.getFSDirectory().getINode(file);
 
-      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      dfs.satisfyStoragePolicy(new Path(file));
       hdfsCluster.triggerHeartbeats();
       // Wait till StoragePolicySatisfier identified that block to move to
       // ARCHIVE area.
@@ -723,10 +707,8 @@ public class TestStoragePolicySatisfier {
 
       // Change policy to ONE_SSD
       dfs.setStoragePolicy(new Path(file), ONE_SSD);
-      FSNamesystem namesystem = hdfsCluster.getNamesystem();
-      INode inode = namesystem.getFSDirectory().getINode(file);
 
-      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      dfs.satisfyStoragePolicy(new Path(file));
       hdfsCluster.triggerHeartbeats();
       DFSTestUtil.waitExpectedStorageType(
           file, StorageType.SSD, 1, 30000, dfs);
@@ -764,10 +746,7 @@ public class TestStoragePolicySatisfier {
 
       // Change policy to WARM
       dfs.setStoragePolicy(new Path(file), "WARM");
-      FSNamesystem namesystem = hdfsCluster.getNamesystem();
-      INode inode = namesystem.getFSDirectory().getINode(file);
-
-      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      dfs.satisfyStoragePolicy(new Path(file));
       hdfsCluster.triggerHeartbeats();
 
       DFSTestUtil.waitExpectedStorageType(
@@ -848,8 +827,6 @@ public class TestStoragePolicySatisfier {
 
       // Change policy to ONE_SSD
       dfs.setStoragePolicy(new Path(file), ONE_SSD);
-      FSNamesystem namesystem = hdfsCluster.getNamesystem();
-      INode inode = namesystem.getFSDirectory().getINode(file);
       Path filePath = new Path("/testChooseInSameDatanode");
       final FSDataOutputStream out =
           dfs.create(filePath, false, 100, (short) 1, 2 * DEFAULT_BLOCK_SIZE);
@@ -872,7 +849,7 @@ public class TestStoragePolicySatisfier {
       for (DataNode dataNode : dataNodes) {
         DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
       }
-      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      dfs.satisfyStoragePolicy(new Path(file));
 
       // Wait for items to be processed
       waitForAttemptedItems(1, 30000);


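The net effect the tests above verify is the optimization named in the
commit title: a single SPS xattr on the inode passed to
satisfyStoragePolicy(), expanded lazily into per-file work items instead of
one xattr per descendant file. Below is a standalone model of that
expansion; the names and types are invented for illustration and this is
not the namenode code.

// Sketch only: expand a start inode (file or directory) into per-file work
// items, each remembering the start inode so the single xattr can be
// removed once every descendant file has been satisfied.
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class SpsQueueModel {
  static Queue<long[]> expand(long startId,
      Map<Long, List<Long>> dirChildren) {
    Queue<long[]> work = new ArrayDeque<>();
    Queue<Long> pending = new ArrayDeque<>();
    pending.add(startId);
    while (!pending.isEmpty()) {
      long id = pending.remove();
      List<Long> children = dirChildren.get(id);
      if (children == null) {
        // A file: pair {startId, fileId} so completion of all pairs with
        // the same startId can trigger the xattr removal.
        work.add(new long[] {startId, id});
      } else {
        // A directory: traverse its children instead of tagging each one.
        pending.addAll(children);
      }
    }
    return work;
  }
}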
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org