You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/01/29 04:12:33 UTC

[28/50] [abbrv] hadoop git commit: HDFS-12291: [SPS]: Provide a mechanism to recursively iterate and satisfy storage policy of all the files under the given dir. Contributed by Surendra Singh Lilhore.

HDFS-12291: [SPS]: Provide a mechanism to recursively iterate and satisfy storage policy of all the files under the given dir. Contributed by Surendra Singh Lilhore.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e94f1deb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e94f1deb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e94f1deb

Branch: refs/heads/HDFS-10285
Commit: e94f1deb35c19c93fa8cf3dd3721860bca4b0310
Parents: 8d29166
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Sat Sep 30 06:31:52 2017 -0700
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Mon Jan 29 09:21:08 2018 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   8 +
 .../java/org/apache/hadoop/hdfs/DFSUtil.java    |  22 +-
 .../BlockStorageMovementAttemptedItems.java     |   8 +-
 .../namenode/BlockStorageMovementNeeded.java    | 277 +++++++--
 .../hdfs/server/namenode/FSTreeTraverser.java   | 313 ++++++++++
 .../server/namenode/ReencryptionHandler.java    | 618 ++++++++-----------
 .../server/namenode/ReencryptionUpdater.java    |   2 +-
 .../server/namenode/StoragePolicySatisfier.java |  43 +-
 .../src/main/resources/hdfs-default.xml         |  23 +
 .../src/site/markdown/ArchivalStorage.md        |   3 +-
 .../TestBlockStorageMovementAttemptedItems.java |   2 +-
 .../TestPersistentStoragePolicySatisfier.java   |   8 +-
 .../hdfs/server/namenode/TestReencryption.java  |   3 -
 .../namenode/TestReencryptionHandler.java       |  10 +-
 .../namenode/TestStoragePolicySatisfier.java    | 377 ++++++++++-
 15 files changed, 1260 insertions(+), 457 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8c4fa69..c435739 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -602,6 +602,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.storage.policy.satisfier.enabled";
   public static final boolean DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT =
       false;
+  public static final String  DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY =
+      "dfs.storage.policy.satisfier.queue.limit";
+  public static final int  DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT =
+      1000;
+  public static final String DFS_SPS_WORK_MULTIPLIER_PER_ITERATION =
+      "dfs.storage.policy.satisfier.work.multiplier.per.iteration";
+  public static final int DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT =
+      1;
   public static final String DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY =
       "dfs.storage.policy.satisfier.recheck.timeout.millis";
   public static final int DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 7465853..570b85d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -1422,7 +1422,27 @@ public class DFSUtil {
         "It should be a positive, non-zero integer value.");
     return blocksReplWorkMultiplier;
   }
-  
+
+  /**
+   * Reads the storage policy satisfier work multiplier, i.e. the value of
+   * {@link DFSConfigKeys#DFS_SPS_WORK_MULTIPLIER_PER_ITERATION}, from the
+   * given configuration and validates that it is greater than zero.
+   *
+   * @param conf Configuration to read the value from
+   * @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION
+   */
+  public static int getSPSWorkMultiplier(Configuration conf) {
+    final String key = DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION;
+    final int multiplier = conf.getInt(key,
+        DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
+    final String errorMessage = key + " = '" + multiplier + "' is invalid. "
+        + "It should be a positive, non-zero integer value.";
+    Preconditions.checkArgument(multiplier > 0, errorMessage);
+    return multiplier;
+  }
+
   /**
    * Get SPNEGO keytab Key from configuration
    * 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index 278b62b..549819f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -101,7 +101,7 @@ public class BlockStorageMovementAttemptedItems {
   public void add(ItemInfo itemInfo, boolean allBlockLocsAttemptedToSatisfy) {
     synchronized (storageMovementAttemptedItems) {
       AttemptedItemInfo attemptedItemInfo = new AttemptedItemInfo(
-          itemInfo.getRootId(), itemInfo.getTrackId(), monotonicNow(),
+          itemInfo.getStartId(), itemInfo.getTrackId(), monotonicNow(),
           allBlockLocsAttemptedToSatisfy);
       storageMovementAttemptedItems.put(itemInfo.getTrackId(),
           attemptedItemInfo);
@@ -260,7 +260,7 @@ public class BlockStorageMovementAttemptedItems {
           synchronized (storageMovementAttemptedResults) {
             if (!isExistInResult(blockCollectionID)) {
               ItemInfo candidate = new ItemInfo(
-                  itemInfo.getRootId(), blockCollectionID);
+                  itemInfo.getStartId(), blockCollectionID);
               blockStorageMovementNeeded.add(candidate);
               iter.remove();
               LOG.info("TrackID: {} becomes timed out and moved to needed "
@@ -315,7 +315,7 @@ public class BlockStorageMovementAttemptedItems {
           // blockStorageMovementNeeded#removeIteamTrackInfo() for cleaning
           // the xAttr
           ItemInfo itemInfo = new ItemInfo((attemptedItemInfo != null)
-              ? attemptedItemInfo.getRootId() : trackId, trackId);
+              ? attemptedItemInfo.getStartId() : trackId, trackId);
           switch (status) {
           case FAILURE:
             if (attemptedItemInfo != null) {
@@ -345,7 +345,7 @@ public class BlockStorageMovementAttemptedItems {
             if (attemptedItemInfo != null) {
               if (!attemptedItemInfo.isAllBlockLocsAttemptedToSatisfy()) {
                 blockStorageMovementNeeded
-                    .add(new ItemInfo(attemptedItemInfo.getRootId(), trackId));
+                    .add(new ItemInfo(attemptedItemInfo.getStartId(), trackId));
                 LOG.warn("{} But adding trackID back to retry queue as some of"
                     + " the blocks couldn't find matching target nodes in"
                     + " previous SPS iteration.", msg);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
index 41a3a6c..788a98b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
@@ -29,12 +29,15 @@ import java.util.Map;
 import java.util.Queue;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
 import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * A Class to track the block collection IDs (Inode's ID) for which physical
  * storage movement needed as per the Namespace and StorageReports from DN.
@@ -53,11 +56,11 @@ public class BlockStorageMovementNeeded {
       new LinkedList<ItemInfo>();
 
   /**
-   * Map of rootId and number of child's. Number of child's indicate the number
-   * of files pending to satisfy the policy.
+   * Map of startId and number of child's. Number of child's indicate the
+   * number of files pending to satisfy the policy.
    */
-  private final Map<Long, Integer> pendingWorkForDirectory =
-      new HashMap<Long, Integer>();
+  private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
+      new HashMap<Long, DirPendingWorkInfo>();
 
   private final Namesystem namesystem;
 
@@ -66,12 +69,15 @@ public class BlockStorageMovementNeeded {
 
   private final StoragePolicySatisfier sps;
 
-  private Daemon fileInodeIdCollector;
+  private Daemon inodeIdCollector;
+
+  private final int maxQueuedItem;
 
   public BlockStorageMovementNeeded(Namesystem namesystem,
-      StoragePolicySatisfier sps) {
+      StoragePolicySatisfier sps, int queueLimit) {
     this.namesystem = namesystem;
     this.sps = sps;
+    this.maxQueuedItem = queueLimit;
   }
 
   /**
@@ -88,15 +94,24 @@ public class BlockStorageMovementNeeded {
   /**
    * Add the itemInfo to tracking list for which storage movement
    * expected if necessary.
-   * @param rootId
-   *            - root inode id
+   * @param startId
+   *            - start id
    * @param itemInfoList
    *            - List of child in the directory
    */
-  private synchronized void addAll(Long rootId,
-      List<ItemInfo> itemInfoList) {
+  @VisibleForTesting
+  public synchronized void addAll(long startId,
+      List<ItemInfo> itemInfoList, boolean scanCompleted) {
     storageMovementNeeded.addAll(itemInfoList);
-    pendingWorkForDirectory.put(rootId, itemInfoList.size());
+    DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
+    if (pendingWork == null) {
+      pendingWork = new DirPendingWorkInfo();
+      pendingWorkForDirectory.put(startId, pendingWork);
+    }
+    pendingWork.addPendingWorkCount(itemInfoList.size());
+    if (scanCompleted) {
+      pendingWork.markScanCompleted();
+    }
   }
 
   /**
@@ -118,6 +133,25 @@ public class BlockStorageMovementNeeded {
     }
   }
 
+  /**
+   * Computes how many more items the storageMovementNeeded queue can take
+   * before it reaches maxQueuedItem.
+   *
+   * @return number of free slots, or zero when the queue already holds
+   *         maxQueuedItem items or more
+   */
+  public synchronized int remainingCapacity() {
+    final int queued = storageMovementNeeded.size();
+    return queued < maxQueuedItem ? maxQueuedItem - queued : 0;
+  }
+
+  /**
+   * Reports the number of items currently held in the storageMovementNeeded
+   * queue.
+   *
+   * @return current queue size
+   */
+  public synchronized int size() {
+    final int queued = storageMovementNeeded.size();
+    return queued;
+  }
+
   public synchronized void clearAll() {
     spsDirsToBeTraveresed.clear();
     storageMovementNeeded.clear();
@@ -131,20 +165,20 @@ public class BlockStorageMovementNeeded {
   public synchronized void removeItemTrackInfo(ItemInfo trackInfo)
       throws IOException {
     if (trackInfo.isDir()) {
-      // If track is part of some root then reduce the pending directory work
-      // count.
-      long rootId = trackInfo.getRootId();
-      INode inode = namesystem.getFSDirectory().getInode(rootId);
+      // If track is part of some start inode then reduce the pending
+      // directory work count.
+      long startId = trackInfo.getStartId();
+      INode inode = namesystem.getFSDirectory().getInode(startId);
       if (inode == null) {
         // directory deleted just remove it.
-        this.pendingWorkForDirectory.remove(rootId);
+        this.pendingWorkForDirectory.remove(startId);
       } else {
-        if (pendingWorkForDirectory.get(rootId) != null) {
-          Integer pendingWork = pendingWorkForDirectory.get(rootId) - 1;
-          pendingWorkForDirectory.put(rootId, pendingWork);
-          if (pendingWork <= 0) {
-            namesystem.removeXattr(rootId, XATTR_SATISFY_STORAGE_POLICY);
-            pendingWorkForDirectory.remove(rootId);
+        DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
+        if (pendingWork != null) {
+          pendingWork.decrementPendingWorkCount();
+          if (pendingWork.isDirWorkDone()) {
+            namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY);
+            pendingWorkForDirectory.remove(startId);
           }
         }
       }
@@ -161,7 +195,7 @@ public class BlockStorageMovementNeeded {
     Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
     while (iterator.hasNext()) {
       ItemInfo next = iterator.next();
-      if (next.getRootId() == trackId) {
+      if (next.getStartId() == trackId) {
         iterator.remove();
       }
     }
@@ -208,7 +242,17 @@ public class BlockStorageMovementNeeded {
    * Take dir tack ID from the spsDirsToBeTraveresed queue and collect child
    * ID's to process for satisfy the policy.
    */
-  private class FileInodeIdCollector implements Runnable {
+  private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser
+      implements Runnable {
+
+    private int remainingCapacity = 0;
+
+    private List<ItemInfo> currentBatch = new ArrayList<>(maxQueuedItem);
+
+    StorageMovementPendingInodeIdCollector(FSDirectory dir) {
+      super(dir);
+    }
+
     @Override
     public void run() {
       LOG.info("Starting FileInodeIdCollector!.");
@@ -216,38 +260,36 @@ public class BlockStorageMovementNeeded {
         try {
           if (!namesystem.isInSafeMode()) {
             FSDirectory fsd = namesystem.getFSDirectory();
-            Long rootINodeId = spsDirsToBeTraveresed.poll();
-            if (rootINodeId == null) {
+            Long startINodeId = spsDirsToBeTraveresed.poll();
+            if (startINodeId == null) {
               // Waiting for SPS path
               synchronized (spsDirsToBeTraveresed) {
                 spsDirsToBeTraveresed.wait(5000);
               }
             } else {
-              INode rootInode = fsd.getInode(rootINodeId);
-              if (rootInode != null) {
-                // TODO : HDFS-12291
-                // 1. Implement an efficient recursive directory iteration
-                // mechanism and satisfies storage policy for all the files
-                // under the given directory.
-                // 2. Process files in batches,so datanodes workload can be
-                // handled.
-                List<ItemInfo> itemInfoList =
-                    new ArrayList<>();
-                for (INode childInode : rootInode.asDirectory()
-                    .getChildrenList(Snapshot.CURRENT_STATE_ID)) {
-                  if (childInode.isFile()
-                      && childInode.asFile().numBlocks() != 0) {
-                    itemInfoList.add(
-                        new ItemInfo(rootINodeId, childInode.getId()));
-                  }
+              INode startInode = fsd.getInode(startINodeId);
+              if (startInode != null) {
+                try {
+                  remainingCapacity = remainingCapacity();
+                  readLock();
+                  traverseDir(startInode.asDirectory(), startINodeId,
+                      HdfsFileStatus.EMPTY_NAME,
+                      new SPSTraverseInfo(startINodeId));
+                } finally {
+                  readUnlock();
                 }
-                if (itemInfoList.isEmpty()) {
-                  // satisfy track info is empty, so remove the xAttr from the
-                  // directory
-                  namesystem.removeXattr(rootINodeId,
+                // Mark startInode traverse is done
+                addAll(startInode.getId(), currentBatch, true);
+                currentBatch.clear();
+
+                // check if directory was empty and no child added to queue
+                DirPendingWorkInfo dirPendingWorkInfo =
+                    pendingWorkForDirectory.get(startInode.getId());
+                if (dirPendingWorkInfo.isDirWorkDone()) {
+                  namesystem.removeXattr(startInode.getId(),
                       XATTR_SATISFY_STORAGE_POLICY);
+                  pendingWorkForDirectory.remove(startInode.getId());
                 }
-                addAll(rootINodeId, itemInfoList);
               }
             }
           }
@@ -256,17 +298,140 @@ public class BlockStorageMovementNeeded {
         }
       }
     }
+
+    /**
+     * Test hook of the traverser; intentionally a no-op for the storage
+     * policy satisfier collector.
+     */
+    @Override
+    protected void checkPauseForTesting() throws InterruptedException {
+      // TODO implement if needed
+    }
+
+    /**
+     * Adds a file inode to the current batch so that its storage policy can
+     * be satisfied. Files without blocks are accepted but not queued.
+     *
+     * @param inode
+     *          the inode visited by the traverser
+     * @param traverseInfo
+     *          must be an {@link SPSTraverseInfo}; supplies the start id the
+     *          queued item is tracked under
+     * @return false if the inode is not a file, true otherwise (including
+     *         files that have no blocks and are therefore not queued)
+     */
+    @Override
+    protected boolean processFileInode(INode inode, TraverseInfo traverseInfo)
+        throws IOException, InterruptedException {
+      assert getFSDirectory().hasReadLock();
+      // Guarded because building the full path name is only worth it when
+      // trace logging is actually enabled.
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Processing {} to satisfy the policy",
+            inode.getFullPathName());
+      }
+      if (!inode.isFile()) {
+        return false;
+      }
+      // Only files that own blocks have anything to move.
+      if (inode.asFile().numBlocks() != 0) {
+        currentBatch.add(new ItemInfo(
+            ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
+        remainingCapacity--;
+      }
+      return true;
+    }
+
+    /**
+     * The batch is handed over once the capacity budget tracked in
+     * remainingCapacity has been used up.
+     *
+     * @return true when remainingCapacity is zero or below
+     */
+    @Override
+    protected boolean canSubmitCurrentBatch() {
+      final boolean budgetExhausted = remainingCapacity <= 0;
+      return budgetExhausted;
+    }
+
+    /**
+     * Verifies that the namesystem may currently do storage policy satisfier
+     * work: it must not be in safe mode and must allow WRITE operations.
+     *
+     * @param startId
+     *          Id of the start inode (not used by this implementation).
+     * @throws IOException
+     *           if one of the namesystem checks fails
+     */
+    @Override
+    protected void checkINodeReady(long startId) throws IOException {
+      FSNamesystem fsn = ((FSNamesystem) namesystem);
+      fsn.checkNameNodeSafeMode("NN is in safe mode, "
+          + "cannot satisfy the policy.");
+      // SPS work should be cancelled when NN goes to standby. Just
+      // double checking for sanity.
+      fsn.checkOperation(NameNode.OperationCategory.WRITE);
+    }
+
+    /**
+     * Hands the files collected so far over to the storage movement queue
+     * without marking the directory scan as completed, then empties the
+     * local batch so that it can be refilled.
+     *
+     * @param startId
+     *          Id of the start inode the batch belongs to.
+     */
+    @Override
+    protected void submitCurrentBatch(long startId)
+        throws IOException, InterruptedException {
+      // Add current child's to queue
+      addAll(startId, currentBatch, false);
+      currentBatch.clear();
+    }
+
+    /**
+     * Blocks the traverse until the storage movement queue has free slots
+     * again, polling the queue capacity every five seconds. Must be called
+     * without holding the directory or namesystem read lock.
+     *
+     * @throws InterruptedException
+     *           if the thread is interrupted while sleeping
+     */
+    @Override
+    protected void throttle() throws InterruptedException {
+      assert !getFSDirectory().hasReadLock();
+      assert !namesystem.hasReadLock();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("StorageMovementNeeded queue remaining capacity is zero,"
+            + " waiting for some free slots.");
+      }
+      // Refresh the budget first, then keep sleeping and refreshing until
+      // the queue reports at least one free slot.
+      for (remainingCapacity = remainingCapacity(); remainingCapacity <= 0;
+          remainingCapacity = remainingCapacity()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Waiting for storageMovementNeeded queue to be free!");
+        }
+        Thread.sleep(5000);
+      }
+    }
+
+    /**
+     * Every directory is traversable for the storage policy satisfier.
+     *
+     * @param inode
+     *          Dir inode (not inspected).
+     * @return always true
+     */
+    @Override
+    protected boolean canTraverseDir(INode inode) throws IOException {
+      return true;
+    }
   }
 
-  public void start() {
-    fileInodeIdCollector = new Daemon(new FileInodeIdCollector());
-    fileInodeIdCollector.setName("FileInodeIdCollector");
-    fileInodeIdCollector.start();
+  /**
+   * Book-keeping for one directory that is being scanned recursively: how
+   * many queued items are still outstanding and whether the scan itself has
+   * finished.
+   */
+  public static class DirPendingWorkInfo {
+
+    private int pendingWorkCount;
+    private boolean fullyScanned;
+
+    /**
+     * Adds newly queued items to the outstanding work of the directory.
+     *
+     * @param count number of items to add
+     */
+    public synchronized void addPendingWorkCount(int count) {
+      pendingWorkCount += count;
+    }
+
+    /**
+     * Records that one tracked item of the directory has been completed.
+     */
+    public synchronized void decrementPendingWorkCount() {
+      pendingWorkCount -= 1;
+    }
+
+    /**
+     * Tells whether the directory is finished.
+     *
+     * @return true only when the scan has been marked completed and no
+     *         outstanding work is left, otherwise false
+     */
+    public synchronized boolean isDirWorkDone() {
+      if (!fullyScanned) {
+        return false;
+      }
+      return pendingWorkCount <= 0;
+    }
+
+    /**
+     * Records that the directory scan has been completed.
+     */
+    public synchronized void markScanCompleted() {
+      fullyScanned = true;
+    }
   }
 
-  public void stop() {
-    if (fileInodeIdCollector != null) {
-      fileInodeIdCollector.interrupt();
+  /**
+   * Creates the inode id collector daemon for this namesystem's directory
+   * tree and starts it.
+   */
+  public void init() {
+    final Runnable collector = new StorageMovementPendingInodeIdCollector(
+        namesystem.getFSDirectory());
+    inodeIdCollector = new Daemon(collector);
+    inodeIdCollector.setName("FileInodeIdCollector");
+    inodeIdCollector.start();
+  }
+
+  /**
+   * Interrupts the inode id collector daemon, if one has been created.
+   */
+  public void close() {
+    if (inodeIdCollector == null) {
+      return;
+    }
+    inodeIdCollector.interrupt();
+  }
+
+  /**
+   * Traverse info handed to the tree traverser by the storage policy
+   * satisfier; it carries the id of the inode the traverse started from.
+   */
+  class SPSTraverseInfo extends TraverseInfo {
+    // Set once at construction and never changed afterwards.
+    private final long startId;
+
+    /**
+     * @param startId
+     *          Id of the start inode of the traverse.
+     */
+    SPSTraverseInfo(long startId) {
+      this.startId = startId;
+    }
+
+    /**
+     * @return Id of the start inode of the traverse.
+     */
+    public long getStartId() {
+      return startId;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
new file mode 100644
index 0000000..acc23e5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * FSTreeTraverser traverse directory recursively and process files
+ * in batches.
+ */
+@InterfaceAudience.Private
+public abstract class FSTreeTraverser {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(FSTreeTraverser.class);
+
+  // The directory tree being traversed; fixed for the traverser's lifetime.
+  private final FSDirectory dir;
+
+  /**
+   * Creates a traverser over the given directory tree.
+   *
+   * @param dir
+   *          the FSDirectory whose inodes are traversed and whose locks are
+   *          taken by {@link #readLock()} / {@link #readUnlock()}
+   */
+  public FSTreeTraverser(FSDirectory dir) {
+    this.dir = dir;
+  }
+
+  /**
+   * @return the FSDirectory this traverser works on
+   */
+  public FSDirectory getFSDirectory() {
+    return dir;
+  }
+
+  /**
+   * Iterate through all files directly inside parent, and recurse down
+   * directories. The listing is done in batch, and can optionally start after
+   * a position. The iteration of the inode tree is done in a depth-first
+   * fashion. But instead of holding all {@link INodeDirectory}'s in memory
+   * on the fly, only the path components to the current inode are held (the
+   * startAfters cursor). This is to reduce memory consumption.
+   * <p>
+   * The traverse is finished once the cursor becomes empty.
+   *
+   * @param parent
+   *          The directory inode the listing starts in. Nothing is done when
+   *          it is null.
+   * @param startId
+   *          Id of the start inode. The parent chain of {@code parent} is
+   *          walked upwards until an inode with this id is reached.
+   * @param startAfter
+   *          Name of the child of {@code parent} the traverse should start
+   *          after; it is used as the lowest level of the cursor.
+   * @param traverseInfo
+   *          info which may be required for processing the children.
+   * @throws IOException
+   *           propagated from the traverse callbacks or from re-resolving
+   *           the cursor after the lock was released
+   * @throws InterruptedException
+   *           propagated from the traverse callbacks
+   */
+  protected void traverseDir(final INodeDirectory parent, final long startId,
+      byte[] startAfter, final TraverseInfo traverseInfo)
+      throws IOException, InterruptedException {
+    List<byte[]> startAfters = new ArrayList<>();
+    if (parent == null) {
+      return;
+    }
+    INode curr = parent;
+    // construct startAfters all the way up to the start inode.
+    startAfters.add(startAfter);
+    while (curr.getId() != startId) {
+      startAfters.add(0, curr.getLocalNameBytes());
+      curr = curr.getParent();
+    }
+    curr = traverseDirInt(startId, parent, startAfters, traverseInfo);
+    while (!startAfters.isEmpty()) {
+      if (curr == null) {
+        // lock was released and reacquired, re-resolve the cursor path.
+        curr = resolvePaths(startId, startAfters);
+      }
+      curr = traverseDirInt(startId, curr, startAfters, traverseInfo);
+    }
+  }
+
+  /**
+   * Iterates the directory denoted by curr (curr itself if it is a
+   * directory, its parent otherwise) and offers its direct children to
+   * {@link #processFileInode}. Whenever {@link #canSubmitCurrentBatch()}
+   * returns true, the current batch is submitted for processing.
+   * <p>
+   * Locks are released while a batch is submitted and throttled, and are
+   * reacquired afterwards. The directory is re-resolved by path after that,
+   * and the iteration resumes after the last handled child.
+   * <p>
+   * The startAfters cursor is updated in place: one level is appended when
+   * the traverse descends into a child directory, and the lowest level is
+   * removed when the directory has been finished.
+   *
+   * @param startId
+   *          Id of the start inode.
+   * @param curr
+   *          The inode to continue the traverse from; must not be null.
+   * @param startAfters
+   *          The cursor; its last element is the child name to start after
+   *          in the directory being listed.
+   * @param traverseInfo
+   *          info passed through to {@link #processFileInode}.
+   * @return The inode to continue the traverse from (the child directory to
+   *         descend into, or the parent of curr once this directory is
+   *         finished), if lock is held in the entire process. Null if lock
+   *         is released, or if the directory was deleted or recreated while
+   *         the lock was released.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  protected INode traverseDirInt(final long startId, INode curr,
+      List<byte[]> startAfters, final TraverseInfo traverseInfo)
+      throws IOException, InterruptedException {
+    assert dir.hasReadLock();
+    assert dir.getFSNamesystem().hasReadLock();
+    Preconditions.checkNotNull(curr, "Current inode can't be null");
+    checkINodeReady(startId);
+    final INodeDirectory parent = curr.isDirectory() ? curr.asDirectory()
+        : curr.getParent();
+    ReadOnlyList<INode> children = parent
+        .getChildrenList(Snapshot.CURRENT_STATE_ID);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Traversing directory {}", parent.getFullPathName());
+    }
+
+    final byte[] startAfter = startAfters.get(startAfters.size() - 1);
+    boolean lockReleased = false;
+    for (int i = INodeDirectory.nextChild(children, startAfter); i < children
+        .size(); ++i) {
+      final INode inode = children.get(i);
+      if (!processFileInode(inode, traverseInfo)) {
+        // inode wasn't processed. Recurse down if it's a dir,
+        // skip otherwise.
+        if (!inode.isDirectory()) {
+          continue;
+        }
+
+        if (!canTraverseDir(inode)) {
+          continue;
+        }
+        // add 1 level to the depth-first search.
+        curr = inode;
+        if (!startAfters.isEmpty()) {
+          startAfters.remove(startAfters.size() - 1);
+          startAfters.add(curr.getLocalNameBytes());
+        }
+        startAfters.add(HdfsFileStatus.EMPTY_NAME);
+        return lockReleased ? null : curr;
+      }
+      if (canSubmitCurrentBatch()) {
+        final byte[] currentStartAfter = inode.getLocalNameBytes();
+        final String parentPath = parent.getFullPathName();
+        lockReleased = true;
+        readUnlock();
+        submitCurrentBatch(startId);
+        try {
+          throttle();
+          checkPauseForTesting();
+        } finally {
+          readLock();
+        }
+        checkINodeReady(startId);
+
+        // Things could have changed when the lock was released.
+        // Re-resolve the parent inode.
+        FSPermissionChecker pc = dir.getPermissionChecker();
+        INode newParent = dir
+            .resolvePath(pc, parentPath, FSDirectory.DirOp.READ)
+            .getLastINode();
+        if (newParent == null || !newParent.equals(parent)) {
+          // parent dir is deleted or recreated. We're done.
+          return null;
+        }
+        children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
+        // -1 to counter the ++ on the for loop
+        i = INodeDirectory.nextChild(children, currentStartAfter) - 1;
+      }
+    }
+    // Successfully finished this dir, adjust pointers to 1 level up, and
+    // startAfter this dir.
+    startAfters.remove(startAfters.size() - 1);
+    if (!startAfters.isEmpty()) {
+      startAfters.remove(startAfters.size() - 1);
+      startAfters.add(curr.getLocalNameBytes());
+    }
+    curr = curr.getParent();
+    return lockReleased ? null : curr;
+  }
+
+  /**
+   * Resolve the cursor of traverse to an inode.
+   * <p>
+   * The parent of the lowest level startAfter is returned. If somewhere in the
+   * middle of startAfters changed, the parent of the lowest unchanged level is
+   * returned, and startAfters is truncated in place so that its last element
+   * is the name that could no longer be resolved; the traverse then resumes
+   * with the next child after that name.
+   *
+   * @param startId
+   *          Id of the start inode.
+   * @param startAfters
+   *          the cursor, represented by a list of path bytes. May be
+   *          shortened by this method.
+   * @return the directory inode the traverse should continue in.
+   * @throws IOException
+   *           a FileNotFoundException if the start inode itself is deleted.
+   */
+  private INode resolvePaths(final long startId, List<byte[]> startAfters)
+      throws IOException {
+    // If the readlock was reacquired, we need to resolve the paths again
+    // in case things have changed. If our cursor file/dir is changed,
+    // continue from the next one.
+    INode zoneNode = dir.getInode(startId);
+    if (zoneNode == null) {
+      throw new FileNotFoundException("Zone " + startId + " is deleted.");
+    }
+    INodeDirectory parent = zoneNode.asDirectory();
+    // The last startAfter does not need to be resolved, since the search for
+    // nextChild will cover that automatically.
+    for (int i = 0; i < startAfters.size() - 1; ++i) {
+      INode curr = parent.getChild(startAfters.get(i),
+          Snapshot.CURRENT_STATE_ID);
+      if (curr == null || !curr.isDirectory()) {
+        // inode at this level has changed (deleted, or no longer a
+        // directory). Keep the cursor up to and including this level, so
+        // the traverse continues with the next child at the parent level,
+        // and drop every startAfter at lower levels. Truncating in one step
+        // avoids the index racing the shrinking list, which would leave
+        // stale lower-level entries behind.
+        startAfters.subList(i + 1, startAfters.size()).clear();
+        break;
+      }
+      parent = curr.asDirectory();
+    }
+    return parent;
+  }
+
+  /**
+   * Takes the namesystem read lock first and the directory read lock second.
+   */
+  protected void readLock() {
+    final FSNamesystem fsn = dir.getFSNamesystem();
+    fsn.readLock();
+    dir.readLock();
+  }
+
+  /**
+   * Releases the directory read lock first and the namesystem read lock
+   * second, i.e. in the reverse order of {@link #readLock()}.
+   */
+  protected void readUnlock() {
+    final FSNamesystem fsn = dir.getFSNamesystem();
+    dir.readUnlock();
+    fsn.readUnlock("FSTreeTraverser");
+  }
+
+
+  /**
+   * Hook for tests to pause the traverse. It is called after
+   * {@link #throttle()}, while the read locks are released.
+   *
+   * @throws InterruptedException
+   */
+  protected abstract void checkPauseForTesting() throws InterruptedException;
+
+  /**
+   * Process an Inode. Add to current batch if it's a file, no-op otherwise.
+   *
+   * @param inode
+   *          the inode
+   * @param traverseInfo
+   *          the additional info that was passed to the traverse.
+   * @return true if inode is added to currentBatch and should be processed
+   *         for next operation. false otherwise: could be inode is not a
+   *         file. When false is returned for a directory, the traverse
+   *         descends into it if {@link #canTraverseDir} allows it.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  protected abstract boolean processFileInode(INode inode,
+      TraverseInfo traverseInfo) throws IOException, InterruptedException;
+
+  /**
+   * Check whether current batch can be submitted for the processing.
+   *
+   * @return true if the batch meets the condition for submission, otherwise
+   *         false.
+   */
+  protected abstract boolean canSubmitCurrentBatch();
+
+  /**
+   * Check whether inode is ready for traverse. Throws IOE if it's not.
+   *
+   * @param startId
+   *          Id of the start inode.
+   * @throws IOException
+   */
+  protected abstract void checkINodeReady(long startId) throws IOException;
+
+  /**
+   * Submit the current batch for processing. It is called with the read
+   * locks released.
+   *
+   * @param startId
+   *          Id of the start inode.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  protected abstract void submitCurrentBatch(long startId)
+      throws IOException, InterruptedException;
+
+  /**
+   * Throttles the FSTreeTraverser. It is called after a batch was submitted,
+   * with the read locks released.
+   *
+   * @throws InterruptedException
+   */
+  protected abstract void throttle() throws InterruptedException;
+
+  /**
+   * Check whether dir is traversable or not.
+   *
+   * @param inode
+   *          Dir inode
+   * @return true if dir is traversable otherwise false.
+   * @throws IOException
+   */
+  protected abstract boolean canTraverseDir(INode inode) throws IOException;
+
+  /**
+   * Class will represent the additional info required for traverse. It is
+   * empty by itself; subclasses add whatever their
+   * {@link #processFileInode} implementation needs.
+   */
+  public static class TraverseInfo {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
index 01c2038..9b00519 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
@@ -30,18 +31,16 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.ReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
 import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo;
 import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ReencryptionTask;
 import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ZoneSubmissionTracker;
-import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
@@ -118,6 +117,8 @@ public class ReencryptionHandler implements Runnable {
   // be single-threaded, see class javadoc for more details.
   private ReencryptionBatch currentBatch;
 
+  private ReencryptionPendingInodeIdCollector traverser;
+
   private final ReencryptionUpdater reencryptionUpdater;
   private ExecutorService updaterExecutor;
 
@@ -186,16 +187,6 @@ public class ReencryptionHandler implements Runnable {
     reencryptionUpdater.pauseForTestingAfterNthCheckpoint(zoneId, count);
   }
 
-  private synchronized void checkPauseForTesting() throws InterruptedException {
-    assert !dir.hasReadLock();
-    assert !dir.getFSNamesystem().hasReadLock();
-    while (shouldPauseForTesting) {
-      LOG.info("Sleeping in the re-encrypt handler for unit test.");
-      wait();
-      LOG.info("Continuing re-encrypt handler after pausing.");
-    }
-  }
-
   ReencryptionHandler(final EncryptionZoneManager ezMgr,
       final Configuration conf) {
     this.ezManager = ezMgr;
@@ -256,6 +247,7 @@ public class ReencryptionHandler implements Runnable {
     reencryptionUpdater =
         new ReencryptionUpdater(dir, batchService, this, conf);
     currentBatch = new ReencryptionBatch(reencryptBatchSize);
+    traverser = new ReencryptionPendingInodeIdCollector(dir, this);
   }
 
   ReencryptionStatus getReencryptionStatus() {
@@ -339,7 +331,7 @@ public class ReencryptionHandler implements Runnable {
         synchronized (this) {
           wait(interval);
         }
-        checkPauseForTesting();
+        traverser.checkPauseForTesting();
       } catch (InterruptedException ie) {
         LOG.info("Re-encrypt handler interrupted. Exiting");
         Thread.currentThread().interrupt();
@@ -397,7 +389,7 @@ public class ReencryptionHandler implements Runnable {
     final INode zoneNode;
     final ZoneReencryptionStatus zs;
 
-    readLock();
+    traverser.readLock();
     try {
       zoneNode = dir.getInode(zoneId);
       // start re-encrypting the zone from the beginning
@@ -419,18 +411,19 @@ public class ReencryptionHandler implements Runnable {
           zoneId);
       if (zs.getLastCheckpointFile() == null) {
         // new re-encryption
-        reencryptDir(zoneNode.asDirectory(), zoneId, HdfsFileStatus.EMPTY_NAME,
-            zs.getEzKeyVersionName());
+        traverser.traverseDir(zoneNode.asDirectory(), zoneId,
+            HdfsFileStatus.EMPTY_NAME,
+            new ZoneTraverseInfo(zs.getEzKeyVersionName()));
       } else {
         // resuming from a past re-encryption
         restoreFromLastProcessedFile(zoneId, zs);
       }
       // save the last batch and mark complete
-      submitCurrentBatch(zoneId);
+      traverser.submitCurrentBatch(zoneId);
       LOG.info("Submission completed of zone {} for re-encryption.", zoneId);
       reencryptionUpdater.markZoneSubmissionDone(zoneId);
     } finally {
-      readUnlock();
+      traverser.readUnlock();
     }
   }
 
@@ -479,131 +472,8 @@ public class ReencryptionHandler implements Runnable {
         dir.getINodesInPath(zs.getLastCheckpointFile(), FSDirectory.DirOp.READ);
     parent = lpfIIP.getLastINode().getParent();
     startAfter = lpfIIP.getLastINode().getLocalNameBytes();
-    reencryptDir(parent, zoneId, startAfter, zs.getEzKeyVersionName());
-  }
-
-  /**
-   * Iterate through all files directly inside parent, and recurse down
-   * directories. The listing is done in batch, and can optionally start after
-   * a position.
-   * <p>
-   * Each batch is then send to the threadpool, where KMS will be contacted and
-   * edek re-encrypted. {@link ReencryptionUpdater} handles the tasks completed
-   * from the threadpool.
-   * <p>
-   * The iteration of the inode tree is done in a depth-first fashion. But
-   * instead of holding all INodeDirectory's in memory on the fly, only the
-   * path components to the current inode is held. This is to reduce memory
-   * consumption.
-   *
-   * @param parent     The inode id of parent directory
-   * @param zoneId     Id of the EZ inode
-   * @param startAfter Full path of a file the re-encrypt should start after.
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private void reencryptDir(final INodeDirectory parent, final long zoneId,
-      byte[] startAfter, final String ezKeyVerName)
-      throws IOException, InterruptedException {
-    List<byte[]> startAfters = new ArrayList<>();
-    if (parent == null) {
-      return;
-    }
-    INode curr = parent;
-    // construct startAfters all the way up to the zone inode.
-    startAfters.add(startAfter);
-    while (curr.getId() != zoneId) {
-      startAfters.add(0, curr.getLocalNameBytes());
-      curr = curr.getParent();
-    }
-    curr = reencryptDirInt(zoneId, parent, startAfters, ezKeyVerName);
-    while (!startAfters.isEmpty()) {
-      if (curr == null) {
-        // lock was reacquired, re-resolve path.
-        curr = resolvePaths(zoneId, startAfters);
-      }
-      curr = reencryptDirInt(zoneId, curr, startAfters, ezKeyVerName);
-    }
-  }
-
-  /**
-   * Resolve the cursor of re-encryption to an inode.
-   * <p>
-   * The parent of the lowest level startAfter is returned. If somewhere in the
-   * middle of startAfters changed, the parent of the lowest unchanged level is
-   * returned.
-   *
-   * @param zoneId      Id of the EZ inode.
-   * @param startAfters the cursor, represented by a list of path bytes.
-   * @return the parent inode corresponding to the startAfters, or null if
-   * the EZ node (furthest parent) is deleted.
-   */
-  private INode resolvePaths(final long zoneId, List<byte[]> startAfters)
-      throws IOException {
-    // If the readlock was reacquired, we need to resolve the paths again
-    // in case things have changed. If our cursor file/dir is changed,
-    // continue from the next one.
-    INode zoneNode = dir.getInode(zoneId);
-    if (zoneNode == null) {
-      throw new FileNotFoundException("Zone " + zoneId + " is deleted.");
-    }
-    INodeDirectory parent = zoneNode.asDirectory();
-    for (int i = 0; i < startAfters.size(); ++i) {
-      if (i == startAfters.size() - 1) {
-        // last startAfter does not need to be resolved, since search for
-        // nextChild will cover that automatically.
-        break;
-      }
-      INode curr =
-          parent.getChild(startAfters.get(i), Snapshot.CURRENT_STATE_ID);
-      if (curr == null) {
-        // inode at this level has changed. Update startAfters to point to
-        // the next dir at the parent level (and dropping any startAfters
-        // at lower levels).
-        for (; i < startAfters.size(); ++i) {
-          startAfters.remove(startAfters.size() - 1);
-        }
-        break;
-      }
-      parent = curr.asDirectory();
-    }
-    return parent;
-  }
-
-  /**
-   * Submit the current batch to the thread pool.
-   *
-   * @param zoneId Id of the EZ INode
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private void submitCurrentBatch(final long zoneId)
-      throws IOException, InterruptedException {
-    assert dir.hasReadLock();
-    if (currentBatch.isEmpty()) {
-      return;
-    }
-    ZoneSubmissionTracker zst;
-    synchronized (this) {
-      zst = submissions.get(zoneId);
-      if (zst == null) {
-        zst = new ZoneSubmissionTracker();
-        submissions.put(zoneId, zst);
-      }
-    }
-    Future future = batchService
-        .submit(new EDEKReencryptCallable(zoneId, currentBatch, this));
-    zst.addTask(future);
-    LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
-        currentBatch.getFirstFilePath(), currentBatch.size(), zoneId);
-    currentBatch = new ReencryptionBatch(reencryptBatchSize);
-    // flip the pause flag if this is nth submission.
-    // The actual pause need to happen outside of the lock.
-    if (pauseAfterNthSubmission > 0) {
-      if (--pauseAfterNthSubmission == 0) {
-        shouldPauseForTesting = true;
-      }
-    }
+    traverser.traverseDir(parent, zoneId, startAfter,
+        new ZoneTraverseInfo(zs.getEzKeyVersionName()));
   }
 
   final class ReencryptionBatch {
@@ -711,256 +581,270 @@ public class ReencryptionHandler implements Runnable {
     }
   }
 
+
   /**
-   * Iterates the parent directory, and add direct children files to
-   * current batch. If batch size meets configured threshold, a Callable
-   * is created and sent to the thread pool, which will communicate to the KMS
-   * to get new edeks.
-   * <p>
-   * Locks could be released and reacquired when a Callable is created.
-   *
-   * @param zoneId Id of the EZ INode
-   * @return The inode which was just processed, if lock is held in the entire
-   * process. Null if lock is released.
-   * @throws IOException
-   * @throws InterruptedException
+   * Called when a new zone is submitted for re-encryption. This will wake up
+   * the background thread if it's waiting for the next
+   * DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY.
    */
-  private INode reencryptDirInt(final long zoneId, INode curr,
-      List<byte[]> startAfters, final String ezKeyVerName)
-      throws IOException, InterruptedException {
-    assert dir.hasReadLock();
-    assert dir.getFSNamesystem().hasReadLock();
-    Preconditions.checkNotNull(curr, "Current inode can't be null");
-    checkZoneReady(zoneId);
-    final INodeDirectory parent =
-        curr.isDirectory() ? curr.asDirectory() : curr.getParent();
-    ReadOnlyList<INode> children =
-        parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Re-encrypting directory {}", parent.getFullPathName());
-    }
-
-    final byte[] startAfter = startAfters.get(startAfters.size() - 1);
-    boolean lockReleased = false;
-    for (int i = INodeDirectory.nextChild(children, startAfter);
-         i < children.size(); ++i) {
-      final INode inode = children.get(i);
-      if (!reencryptINode(inode, ezKeyVerName)) {
-        // inode wasn't added for re-encryption. Recurse down if it's a dir,
-        // skip otherwise.
-        if (!inode.isDirectory()) {
-          continue;
-        }
-        if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) {
-          // nested EZ, ignore.
-          LOG.info("{}({}) is a nested EZ, skipping for re-encryption",
-              inode.getFullPathName(), inode.getId());
-          continue;
+  synchronized void notifyNewSubmission() {
+    LOG.debug("Notifying handler for new re-encryption command.");
+    this.notify();
+  }
+
+  public ReencryptionPendingInodeIdCollector getTraverser() {
+    return traverser;
+  }
+
+  /**
+   * ReencryptionPendingInodeIdCollector which throttles based on the
+   * configured throttle ratio.
+   */
+  class ReencryptionPendingInodeIdCollector extends FSTreeTraverser {
+
+    private ReencryptionHandler reencryptionHandler;
+
+    ReencryptionPendingInodeIdCollector(FSDirectory dir,
+        ReencryptionHandler rHandler) {
+      super(dir);
+      this.reencryptionHandler = rHandler;
+    }
+
+    @Override
+    protected void checkPauseForTesting()
+        throws InterruptedException {
+      assert !dir.hasReadLock();
+      assert !dir.getFSNamesystem().hasReadLock();
+      while (shouldPauseForTesting) {
+        LOG.info("Sleeping in the re-encrypt handler for unit test.");
+        synchronized (reencryptionHandler) {
+          reencryptionHandler.wait(30000);
         }
-        // add 1 level to the depth-first search.
-        curr = inode;
-        if (!startAfters.isEmpty()) {
-          startAfters.remove(startAfters.size() - 1);
-          startAfters.add(curr.getLocalNameBytes());
+        LOG.info("Continuing re-encrypt handler after pausing.");
+      }
+    }
+
+    /**
+     * Process an Inode for re-encryption. Add to current batch if it's a file,
+     * no-op otherwise.
+     *
+     * @param inode
+     *          the inode
+     * @return true if inode is added to currentBatch and should be
+     *         re-encrypted. false otherwise: could be inode is not a file, or
+     *         inode's edek's key version is not changed.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    public boolean processFileInode(INode inode, TraverseInfo traverseInfo)
+        throws IOException, InterruptedException {
+      assert dir.hasReadLock();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
+      }
+      if (!inode.isFile()) {
+        return false;
+      }
+      FileEncryptionInfo feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(
+          dir, INodesInPath.fromINode(inode));
+      if (feInfo == null) {
+        LOG.warn("File {} skipped re-encryption because it is not encrypted! "
+            + "This is very likely a bug.", inode.getId());
+        return false;
+      }
+      if (traverseInfo instanceof ZoneTraverseInfo
+          && ((ZoneTraverseInfo) traverseInfo).getEzKeyVerName().equals(
+              feInfo.getEzKeyVersionName())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("File {} skipped re-encryption because edek's key version"
+              + " name is not changed.", inode.getFullPathName());
         }
-        startAfters.add(HdfsFileStatus.EMPTY_NAME);
-        return lockReleased ? null : curr;
+        return false;
+      }
+      currentBatch.add(inode.asFile());
+      return true;
+    }
+
+    /**
+     * Check whether zone is ready for re-encryption. Throws IOE if: (1) the EZ
+     * is deleted; (2) the re-encryption is canceled; or (3) the NN is not
+     * active or is in safe mode.
+     *
+     * @throws IOException
+     *           if zone does not exist / is cancelled, or if NN is not ready
+     *           for write.
+     */
+    @Override
+    protected void checkINodeReady(long zoneId) throws IOException {
+      final ZoneReencryptionStatus zs = getReencryptionStatus().getZoneStatus(
+          zoneId);
+      if (zs == null) {
+        throw new IOException("Zone " + zoneId + " status cannot be found.");
+      }
+      if (zs.isCanceled()) {
+        throw new IOException("Re-encryption is canceled for zone " + zoneId);
       }
-      if (currentBatch.size() >= reencryptBatchSize) {
-        final byte[] currentStartAfter = inode.getLocalNameBytes();
-        final String parentPath = parent.getFullPathName();
-        submitCurrentBatch(zoneId);
-        lockReleased = true;
-        readUnlock();
-        try {
-          throttle();
-          checkPauseForTesting();
-        } finally {
-          readLock();
+      dir.getFSNamesystem().checkNameNodeSafeMode(
+          "NN is in safe mode, cannot re-encrypt.");
+      // re-encryption should be cancelled when NN goes to standby. Just
+      // double checking for sanity.
+      dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE);
+    }
+
+    /**
+     * Submit the current batch to the thread pool.
+     *
+     * @param zoneId
+     *          Id of the EZ INode
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    protected void submitCurrentBatch(final long zoneId) throws IOException,
+        InterruptedException {
+      if (currentBatch.isEmpty()) {
+        return;
+      }
+      ZoneSubmissionTracker zst;
+      synchronized (ReencryptionHandler.this) {
+        zst = submissions.get(zoneId);
+        if (zst == null) {
+          zst = new ZoneSubmissionTracker();
+          submissions.put(zoneId, zst);
         }
-        checkZoneReady(zoneId);
-
-        // Things could have changed when the lock was released.
-        // Re-resolve the parent inode.
-        FSPermissionChecker pc = dir.getPermissionChecker();
-        INode newParent =
-            dir.resolvePath(pc, parentPath, FSDirectory.DirOp.READ)
-                .getLastINode();
-        if (newParent == null || !newParent.equals(parent)) {
-          // parent dir is deleted or recreated. We're done.
-          return null;
+      }
+      Future future = batchService.submit(new EDEKReencryptCallable(zoneId,
+          currentBatch, reencryptionHandler));
+      zst.addTask(future);
+      LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
+          currentBatch.getFirstFilePath(), currentBatch.size(), zoneId);
+      currentBatch = new ReencryptionBatch(reencryptBatchSize);
+      // flip the pause flag if this is nth submission.
+      // The actual pause needs to happen outside of the lock.
+      if (pauseAfterNthSubmission > 0) {
+        if (--pauseAfterNthSubmission == 0) {
+          shouldPauseForTesting = true;
         }
-        children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
-        // -1 to counter the ++ on the for loop
-        i = INodeDirectory.nextChild(children, currentStartAfter) - 1;
       }
     }
-    // Successfully finished this dir, adjust pointers to 1 level up, and
-    // startAfter this dir.
-    startAfters.remove(startAfters.size() - 1);
-    if (!startAfters.isEmpty()) {
-      startAfters.remove(startAfters.size() - 1);
-      startAfters.add(curr.getLocalNameBytes());
-    }
-    curr = curr.getParent();
-    return lockReleased ? null : curr;
-  }
 
-  private void readLock() {
-    dir.getFSNamesystem().readLock();
-    dir.readLock();
-    throttleTimerLocked.start();
-  }
+    /**
+     * Throttles the ReencryptionHandler in 3 aspects:
+     * 1. Prevents generating more Callables than the CPU could possibly
+     * handle.
+     * 2. Prevents generating more Callables than the ReencryptionUpdater
+     * can handle, under its own throttling.
+     * 3. Prevents contending FSN/FSD read locks. This is done based
+     * on the DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration.
+     * <p>
+     * Items 1 and 2 are to control NN heap usage.
+     *
+     * @throws InterruptedException
+     */
+    @VisibleForTesting
+    @Override
+    protected void throttle() throws InterruptedException {
+      assert !dir.hasReadLock();
+      assert !dir.getFSNamesystem().hasReadLock();
+      final int numCores = Runtime.getRuntime().availableProcessors();
+      if (taskQueue.size() >= numCores) {
+        LOG.debug("Re-encryption handler throttling because queue size {} is"
+            + "larger than number of cores {}", taskQueue.size(), numCores);
+        while (taskQueue.size() >= numCores) {
+          Thread.sleep(100);
+        }
+      }
 
-  private void readUnlock() {
-    dir.readUnlock();
-    dir.getFSNamesystem().readUnlock("reencryptHandler");
-    throttleTimerLocked.stop();
-  }
+      // 2. if tasks are piling up on the updater, don't create new callables
+      // until the queue size goes down.
+      final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2;
+      int numTasks = numTasksSubmitted();
+      if (numTasks >= maxTasksPiled) {
+        LOG.debug("Re-encryption handler throttling because total tasks pending"
+            + " re-encryption updater is {}", numTasks);
+        while (numTasks >= maxTasksPiled) {
+          Thread.sleep(500);
+          numTasks = numTasksSubmitted();
+        }
+      }
 
-  /**
-   * Throttles the ReencryptionHandler in 3 aspects:
-   * 1. Prevents generating more Callables than the CPU could possibly handle.
-   * 2. Prevents generating more Callables than the ReencryptionUpdater can
-   *   handle, under its own throttling
-   * 3. Prevents contending FSN/FSD read locks. This is done based on the
-   *   DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration.
-   * <p>
-   * Item 1 and 2 are to control NN heap usage.
-   *
-   * @throws InterruptedException
-   */
-  @VisibleForTesting
-  void throttle() throws InterruptedException {
-    // 1.
-    final int numCores = Runtime.getRuntime().availableProcessors();
-    if (taskQueue.size() >= numCores) {
-      LOG.debug("Re-encryption handler throttling because queue size {} is"
-          + "larger than number of cores {}", taskQueue.size(), numCores);
-      while (taskQueue.size() >= numCores) {
-        Thread.sleep(100);
+      // 3.
+      if (throttleLimitHandlerRatio >= 1.0) {
+        return;
+      }
+      final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
+          * throttleLimitHandlerRatio);
+      final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Re-encryption handler throttling expect: {}, actual: {},"
+            + " throttleTimerAll:{}", expect, actual,
+            throttleTimerAll.now(TimeUnit.MILLISECONDS));
       }
+      if (expect - actual < 0) {
+        // in case throttleLimitHandlerRatio is very small, expect will be 0.
+        // so sleepMs should not be calculated from expect, to really meet the
+        // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
+        // should be 1000 - throttleTimerAll.now()
+        final long sleepMs = (long) (actual / throttleLimitHandlerRatio)
+            - throttleTimerAll.now(TimeUnit.MILLISECONDS);
+        LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
+        Thread.sleep(sleepMs);
+      }
+      throttleTimerAll.reset().start();
+      throttleTimerLocked.reset();
     }
 
-    // 2. if tasks are piling up on the updater, don't create new callables
-    // until the queue size goes down.
-    final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2;
-    int numTasks = numTasksSubmitted();
-    if (numTasks >= maxTasksPiled) {
-      LOG.debug("Re-encryption handler throttling because total tasks pending"
-          + " re-encryption updater is {}", numTasks);
-      while (numTasks >= maxTasksPiled) {
-        Thread.sleep(500);
-        numTasks = numTasksSubmitted();
+    private int numTasksSubmitted() {
+      int ret = 0;
+      synchronized (ReencryptionHandler.this) {
+        for (ZoneSubmissionTracker zst : submissions.values()) {
+          ret += zst.getTasks().size();
+        }
       }
+      return ret;
     }
 
-    // 3.
-    if (throttleLimitHandlerRatio >= 1.0) {
-      return;
-    }
-    final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
-        * throttleLimitHandlerRatio);
-    final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Re-encryption handler throttling expect: {}, actual: {},"
-              + " throttleTimerAll:{}", expect, actual,
-          throttleTimerAll.now(TimeUnit.MILLISECONDS));
-    }
-    if (expect - actual < 0) {
-      // in case throttleLimitHandlerRatio is very small, expect will be 0.
-      // so sleepMs should not be calculated from expect, to really meet the
-      // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
-      // should be 1000 - throttleTimerAll.now()
-      final long sleepMs =
-          (long) (actual / throttleLimitHandlerRatio) - throttleTimerAll
-              .now(TimeUnit.MILLISECONDS);
-      LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
-      Thread.sleep(sleepMs);
+    @Override
+    public boolean canSubmitCurrentBatch() {
+      return currentBatch.size() >= reencryptBatchSize;
     }
-    throttleTimerAll.reset().start();
-    throttleTimerLocked.reset();
-  }
 
-  private synchronized int numTasksSubmitted() {
-    int ret = 0;
-    for (ZoneSubmissionTracker zst : submissions.values()) {
-      ret += zst.getTasks().size();
+    @Override
+    public boolean canTraverseDir(INode inode) throws IOException {
+      if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) {
+        // nested EZ, ignore.
+        LOG.info("{}({}) is a nested EZ, skipping for re-encryption",
+            inode.getFullPathName(), inode.getId());
+        return false;
+      }
+      return true;
     }
-    return ret;
-  }
 
-  /**
-   * Process an Inode for re-encryption. Add to current batch if it's a file,
-   * no-op otherwise.
-   *
-   * @param inode the inode
-   * @return true if inode is added to currentBatch and should be re-encrypted.
-   * false otherwise: could be inode is not a file, or inode's edek's
-   * key version is not changed.
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private boolean reencryptINode(final INode inode, final String ezKeyVerName)
-      throws IOException, InterruptedException {
-    assert dir.hasReadLock();
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
-    }
-    if (!inode.isFile()) {
-      return false;
-    }
-    FileEncryptionInfo feInfo = FSDirEncryptionZoneOp
-        .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode));
-    if (feInfo == null) {
-      LOG.warn("File {} skipped re-encryption because it is not encrypted! "
-          + "This is very likely a bug.", inode.getId());
-      return false;
+    @Override
+    protected void readLock() {
+      dir.getFSNamesystem().readLock();
+      dir.readLock();
+      throttleTimerLocked.start();
     }
-    if (ezKeyVerName.equals(feInfo.getEzKeyVersionName())) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("File {} skipped re-encryption because edek's key version"
-            + " name is not changed.", inode.getFullPathName());
-      }
-      return false;
+
+    @Override
+    protected void readUnlock() {
+      dir.readUnlock();
+      dir.getFSNamesystem().readUnlock("reencryptHandler");
+      throttleTimerLocked.stop();
     }
-    currentBatch.add(inode.asFile());
-    return true;
   }
 
-  /**
-   * Check whether zone is ready for re-encryption. Throws IOE if it's not.
-   * 1. If EZ is deleted.
-   * 2. if the re-encryption is canceled.
-   * 3. If NN is not active or is in safe mode.
-   *
-   * @throws IOException if zone does not exist / is cancelled, or if NN is not
-   *                     ready for write.
-   */
-  void checkZoneReady(final long zoneId)
-      throws RetriableException, SafeModeException, IOException {
-    final ZoneReencryptionStatus zs =
-        getReencryptionStatus().getZoneStatus(zoneId);
-    if (zs == null) {
-      throw new IOException("Zone " + zoneId + " status cannot be found.");
-    }
-    if (zs.isCanceled()) {
-      throw new IOException("Re-encryption is canceled for zone " + zoneId);
+  class ZoneTraverseInfo extends TraverseInfo {
+    private String ezKeyVerName;
+
+    ZoneTraverseInfo(String ezKeyVerName) {
+      this.ezKeyVerName = ezKeyVerName;
     }
-    dir.getFSNamesystem()
-        .checkNameNodeSafeMode("NN is in safe mode, cannot re-encrypt.");
-    // re-encryption should be cancelled when NN goes to standby. Just
-    // double checking for sanity.
-    dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE);
-  }
 
-  /**
-   * Called when a new zone is submitted for re-encryption. This will interrupt
-   * the background thread if it's waiting for the next
-   * DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY.
-   */
-  synchronized void notifyNewSubmission() {
-    LOG.debug("Notifying handler for new re-encryption command.");
-    this.notify();
+    public String getEzKeyVerName() {
+      return ezKeyVerName;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
index 3b7badb..a5923a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
@@ -464,7 +464,7 @@ public final class ReencryptionUpdater implements Runnable {
     final String zonePath;
     dir.writeLock();
     try {
-      handler.checkZoneReady(task.zoneId);
+      handler.getTraverser().checkINodeReady(task.zoneId);
       final INode zoneNode = dir.getInode(task.zoneId);
       if (zoneNode == null) {
         // ez removed.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 48d0598..a4372d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -77,7 +77,8 @@ public class StoragePolicySatisfier implements Runnable {
   private final BlockStorageMovementNeeded storageMovementNeeded;
   private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
   private volatile boolean isRunning = false;
-
+  private int spsWorkMultiplier;
+  private long blockCount = 0L;
   /**
    * Represents the collective analysis status for all blocks.
    */
@@ -106,7 +107,9 @@ public class StoragePolicySatisfier implements Runnable {
       final BlockManager blkManager, Configuration conf) {
     this.namesystem = namesystem;
     this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem,
-        this);
+        this, conf.getInt(
+            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
+            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT));
     this.blockManager = blkManager;
     this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
         conf.getLong(
@@ -117,6 +120,7 @@ public class StoragePolicySatisfier implements Runnable {
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
         storageMovementNeeded,
         this);
+    this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf);
   }
 
   /**
@@ -143,7 +147,7 @@ public class StoragePolicySatisfier implements Runnable {
     // Ensure that all the previously submitted block movements(if any) have to
     // be stopped in all datanodes.
     addDropSPSWorkCommandsToAllDNs();
-    storageMovementNeeded.start();
+    storageMovementNeeded.init();
     storagePolicySatisfierThread = new Daemon(this);
     storagePolicySatisfierThread.setName("StoragePolicySatisfier");
     storagePolicySatisfierThread.start();
@@ -164,7 +168,7 @@ public class StoragePolicySatisfier implements Runnable {
       return;
     }
 
-    storageMovementNeeded.stop();
+    storageMovementNeeded.close();
 
     storagePolicySatisfierThread.interrupt();
     this.storageMovementsMonitor.stop();
@@ -268,9 +272,13 @@ public class StoragePolicySatisfier implements Runnable {
             }
           }
         }
-        // TODO: We can think to make this as configurable later, how frequently
-        // we want to check block movements.
-        Thread.sleep(3000);
+        int numLiveDn = namesystem.getFSDirectory().getBlockManager()
+            .getDatanodeManager().getNumLiveDataNodes();
+        if (storageMovementNeeded.size() == 0
+            || blockCount > (numLiveDn * spsWorkMultiplier)) {
+          Thread.sleep(3000);
+          blockCount = 0L;
+        }
       } catch (Throwable t) {
         handleException(t);
       }
@@ -380,6 +388,11 @@ public class StoragePolicySatisfier implements Runnable {
 
     assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(),
         blockMovingInfos, coordinatorNode);
+    int count = 0;
+    for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
+      count = count + blkMovingInfo.getSources().length;
+    }
+    blockCount = blockCount + count;
     return status;
   }
 
@@ -840,7 +853,7 @@ public class StoragePolicySatisfier implements Runnable {
    *          - file inode/blockcollection id.
    */
   public void satisfyStoragePolicy(Long inodeId) {
-    //For file rootId and trackId is same
+    // For a file, startId and trackId are the same.
     storageMovementNeeded.add(new ItemInfo(inodeId, inodeId));
     if (LOG.isDebugEnabled()) {
       LOG.debug("Added track info for inode {} to block "
@@ -864,19 +877,19 @@ public class StoragePolicySatisfier implements Runnable {
    * policy.
    */
   public static class ItemInfo {
-    private long rootId;
+    private long startId;
     private long trackId;
 
-    public ItemInfo(long rootId, long trackId) {
-      this.rootId = rootId;
+    public ItemInfo(long startId, long trackId) {
+      this.startId = startId;
       this.trackId = trackId;
     }
 
     /**
-     * Return the root of the current track Id.
+     * Return the start inode id of the current track Id.
      */
-    public long getRootId() {
-      return rootId;
+    public long getStartId() {
+      return startId;
     }
 
     /**
@@ -890,7 +903,7 @@ public class StoragePolicySatisfier implements Runnable {
      * Returns true if the tracking path is a directory, false otherwise.
      */
     public boolean isDir() {
-      return (rootId != trackId);
+      return (startId != trackId);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 48799ea..0298997 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4367,6 +4367,29 @@
 </property>
 
 <property>
+  <name>dfs.storage.policy.satisfier.queue.limit</name>
+  <value>1000</value>
+  <description>
+    Storage policy satisfier queue size. This queue contains the inode IDs of
+    the files currently scheduled for satisfying the policy.
+    Default value is 1000.
+  </description>
+</property>
+
+<property>
+  <name>dfs.storage.policy.satisfier.work.multiplier.per.iteration</name>
+  <value>1</value>
+  <description>
+    *Note*: Advanced property. Change with caution.
+    This determines the total number of block transfers to begin in
+    one iteration, for satisfying the policy. The actual number is obtained by
+    multiplying this multiplier with the total number of live nodes in the
+    cluster. The resulting number is the number of blocks to begin transferring
+    immediately. This number can be any positive, non-zero integer.
+  </description>
+</property>
+
+<property>
   <name>dfs.storage.policy.satisfier.recheck.timeout.millis</name>
   <value>300000</value>
   <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
index 87817cf..da61842 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
@@ -110,7 +110,7 @@ SPS can be enabled and disabled dynamically without restarting the Namenode.
 
 Detailed design documentation can be found at [Storage Policy Satisfier(SPS) (HDFS-10285)](https://issues.apache.org/jira/browse/HDFS-10285)
 
-* **Note**: When user invokes `satisfyStoragePolicy()` API on a directory, SPS will consider the files which are immediate to that directory. Sub-directories won't be considered for satisfying the policy. Its user responsibility to call this API on directories recursively, to track all files under the sub tree.
+* **Note**: When user invokes `satisfyStoragePolicy()` API on a directory, SPS will scan all sub-directories and consider all the files under them for satisfying the policy.
 
 * HdfsAdmin API :
         `public void satisfyStoragePolicy(final Path path) throws IOException`
@@ -212,7 +212,6 @@ Get the storage policy of a file or a directory.
 ### Satisfy Storage Policy
 
 Schedule blocks to move based on file's/directory's current storage policy.
-Note: For directory case, it will consider immediate files under that directory and it will not consider sub directories recursively.
 
 * Command:
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
index 55ebf9c..7918821 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -41,7 +41,7 @@ public class TestBlockStorageMovementAttemptedItems {
   public void setup() throws Exception {
     unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
         Mockito.mock(Namesystem.class),
-        Mockito.mock(StoragePolicySatisfier.class));
+        Mockito.mock(StoragePolicySatisfier.class), 100);
     StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
     bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
         selfRetryTimeout, unsatisfiedStorageMovementFiles, sps);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
index e7b9148..5bce296 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
@@ -191,7 +191,7 @@ public class TestPersistentStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
       DFSTestUtil.waitExpectedStorageType(
-          childFileName, StorageType.DEFAULT, 3, timeout, fs);
+          childFileName, StorageType.ARCHIVE, 3, timeout, fs);
 
     } finally {
       clusterShutdown();
@@ -232,7 +232,9 @@ public class TestPersistentStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           parentFileName, StorageType.ARCHIVE, 2, timeout, fs);
       DFSTestUtil.waitExpectedStorageType(
-          childFileName, StorageType.DEFAULT, 3, timeout, fs);
+          childFileName, StorageType.DISK, 1, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          childFileName, StorageType.ARCHIVE, 2, timeout, fs);
     } finally {
       clusterShutdown();
     }
@@ -269,7 +271,7 @@ public class TestPersistentStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
       DFSTestUtil.waitExpectedStorageType(
-          childFileName, StorageType.DEFAULT, 3, timeout, fs);
+          childFileName, StorageType.ARCHIVE, 3, timeout, fs);
     } finally {
       clusterShutdown();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
index f0d6834..7685f31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Supplier;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -64,7 +63,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-
 import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -72,7 +70,6 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import org.junit.rules.Timeout;
 import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java
index e2035ed..3481b42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java
@@ -75,6 +75,10 @@ public class TestReencryptionHandler {
         CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
     Mockito.when(ezm.getProvider()).thenReturn(
         KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp));
+    FSDirectory fsd = Mockito.mock(FSDirectory.class);
+    FSNamesystem fns = Mockito.mock(FSNamesystem.class);
+    Mockito.when(fsd.getFSNamesystem()).thenReturn(fns);
+    Mockito.when(ezm.getFSDirectory()).thenReturn(fsd);
     return new ReencryptionHandler(ezm, conf);
   }
 
@@ -99,7 +103,7 @@ public class TestReencryptionHandler {
     Whitebox.setInternalState(rh, "throttleTimerLocked", mockLocked);
     Whitebox.setInternalState(rh, "taskQueue", queue);
     final StopWatch sw = new StopWatch().start();
-    rh.throttle();
+    rh.getTraverser().throttle();
     sw.stop();
     assertTrue("should have throttled for at least 8 second",
         sw.now(TimeUnit.MILLISECONDS) > 8000);
@@ -130,7 +134,7 @@ public class TestReencryptionHandler {
         submissions = new HashMap<>();
     Whitebox.setInternalState(rh, "submissions", submissions);
     StopWatch sw = new StopWatch().start();
-    rh.throttle();
+    rh.getTraverser().throttle();
     sw.stop();
     assertTrue("should not have throttled",
         sw.now(TimeUnit.MILLISECONDS) < 1000);
@@ -189,7 +193,7 @@ public class TestReencryptionHandler {
     Whitebox.setInternalState(rh, "submissions", submissions);
     final StopWatch sw = new StopWatch().start();
     removeTaskThread.start();
-    rh.throttle();
+    rh.getTraverser().throttle();
     sw.stop();
     LOG.info("Throttle completed, consumed {}", sw.now(TimeUnit.MILLISECONDS));
     assertTrue("should have throttled for at least 3 second",


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org