Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/01/26 16:58:38 UTC

[2/2] hadoop git commit: HDFS-13057: [SPS]: Revisit configurations to make SPS service modes internal/external/none. Contributed by Rakesh R.

HDFS-13057: [SPS]: Revisit configurations to make SPS service modes internal/external/none. Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7309b369
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7309b369
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7309b369

Branch: refs/heads/HDFS-10285
Commit: 7309b3691c3d7ebf924054a602785ba3d5e32969
Parents: 55ddb07
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Fri Jan 26 08:57:29 2018 -0800
Committer: Uma Maheswara Rao G <um...@intel.com>
Committed: Fri Jan 26 08:57:29 2018 -0800

----------------------------------------------------------------------
 .../hadoop/hdfs/protocol/HdfsConstants.java     |  39 ++++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   9 +-
 .../server/blockmanagement/BlockManager.java    | 105 +++++++---
 .../hdfs/server/namenode/FSNamesystem.java      |   6 +-
 .../hdfs/server/namenode/FSTreeTraverser.java   |   2 +-
 .../hadoop/hdfs/server/namenode/NameNode.java   |  34 ++--
 .../sps/BlockStorageMovementNeeded.java         |   2 +-
 .../namenode/sps/IntraSPSNameNodeContext.java   |   3 +
 .../hdfs/server/namenode/sps/SPSService.java    |   4 +-
 .../namenode/sps/StoragePolicySatisfier.java    |  17 +-
 .../server/sps/ExternalSPSFileIDCollector.java  |  32 ++-
 .../hadoop/hdfs/tools/StoragePolicyAdmin.java   |  16 +-
 .../src/main/resources/hdfs-default.xml         |  13 +-
 .../src/site/markdown/ArchivalStorage.md        |  17 +-
 .../TestStoragePolicySatisfyWorker.java         |   5 +-
 .../hadoop/hdfs/server/mover/TestMover.java     |  45 +++--
 .../hdfs/server/mover/TestStorageMover.java     |   4 +-
 .../namenode/TestNameNodeReconfigure.java       | 105 +++++-----
 .../TestPersistentStoragePolicySatisfier.java   |   9 +-
 .../TestStoragePolicySatisfierWithHA.java       |  12 +-
 .../sps/TestStoragePolicySatisfier.java         | 202 +++++++------------
 ...stStoragePolicySatisfierWithStripedFile.java |  17 +-
 .../sps/TestExternalStoragePolicySatisfier.java | 112 +++++++---
 .../hdfs/tools/TestStoragePolicyCommands.java   |   5 +-
 .../TestStoragePolicySatisfyAdminCommands.java  |  14 +-
 25 files changed, 500 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index d9b5b9b..9641365 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -129,6 +129,45 @@ public final class HdfsConstants {
   }
 
   /**
+   * Storage policy satisfier service modes.
+   */
+  public enum StoragePolicySatisfierMode {
+
+    /**
+     * This mode represents that the SPS service is running inside the
+     * Namenode and can accept any SPS call request.
+     */
+    INTERNAL,
+
+    /**
+     * This mode represents that the SPS service is running outside the
+     * Namenode as an external service and can accept any SPS call request.
+     */
+    EXTERNAL,
+
+    /**
+     * This mode represents that the SPS service is disabled and cannot
+     * accept any SPS call request.
+     */
+    NONE;
+
+    private static final Map<String, StoragePolicySatisfierMode> MAP =
+        new HashMap<>();
+
+    static {
+      for (StoragePolicySatisfierMode a : values()) {
+        MAP.put(a.name(), a);
+      }
+    }
+
+    /** Convert the given String to a StoragePolicySatisfierMode. */
+    public static StoragePolicySatisfierMode fromString(String s) {
+      return MAP.get(StringUtils.toUpperCase(s));
+    }
+  }
+
+
+  /**
    * Storage policy satisfy path status.
    */
   public enum StoragePolicySatisfyPathStatus {
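
As a quick illustration (not part of this patch; the class name below is hypothetical), the new enum offers a case-insensitive, null-on-unknown lookup, which the NameNode reconfiguration code later in this patch relies on to reject invalid values:

    import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;

    public class SpsModeParseExample {
      public static void main(String[] args) {
        // Lookup is case-insensitive because fromString() upper-cases its input.
        StoragePolicySatisfierMode internal =
            StoragePolicySatisfierMode.fromString("internal"); // INTERNAL
        // Unknown strings map to null rather than throwing an exception.
        StoragePolicySatisfierMode unknown =
            StoragePolicySatisfierMode.fromString("bogus");    // null
        System.out.println(internal + ", " + unknown);
      }
    }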

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 4666cde..bc78ce3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
@@ -598,10 +599,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int    DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
 
   // SPS related configurations
-  public static final String  DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY =
-      "dfs.storage.policy.satisfier.enabled";
-  public static final boolean DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT =
-      false;
+  public static final String  DFS_STORAGE_POLICY_SATISFIER_MODE_KEY =
+      "dfs.storage.policy.satisfier.mode";
+  public static final String DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT =
+      StoragePolicySatisfierMode.NONE.toString();
   public static final String  DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY =
       "dfs.storage.policy.satisfier.queue.limit";
   public static final int  DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT =
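
For operators tracking this change, a hedged before/after sketch of hdfs-site.xml: the removed boolean key roughly maps to mode "internal" when it was true and to the new default "none" when false; "external" has no old equivalent:

    <!-- Before this patch -->
    <property>
      <name>dfs.storage.policy.satisfier.enabled</name>
      <value>true</value>
    </property>

    <!-- After this patch (closest equivalent) -->
    <property>
      <name>dfs.storage.policy.satisfier.mode</name>
      <value>internal</value>
    </property>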

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index b8f49cb..03d355e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -440,8 +441,8 @@ public class BlockManager implements BlockStatsMXBean {
   /** For satisfying block storage policies. */
   private final StoragePolicySatisfier sps;
   private final boolean storagePolicyEnabled;
-  private boolean spsEnabled;
-  private final SPSPathIds spsPaths;
+  private StoragePolicySatisfierMode spsMode;
+  private SPSPathIds spsPaths;
 
   /** Minimum live replicas needed for the datanode to be transitioned
    * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
@@ -484,12 +485,13 @@ public class BlockManager implements BlockStatsMXBean {
     storagePolicyEnabled =
         conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
             DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT);
-    spsEnabled =
-        conf.getBoolean(
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT);
-    sps = new StoragePolicySatisfier(conf);
+    String spsModeVal =
+        conf.get(
+            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
+    spsMode = StoragePolicySatisfierMode.fromString(spsModeVal);
     spsPaths = new SPSPathIds();
+    sps = new StoragePolicySatisfier(conf);
     blockTokenSecretManager = createBlockTokenSecretManager(conf);
 
     providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
@@ -5024,18 +5026,22 @@ public class BlockManager implements BlockStatsMXBean {
    * Start storage policy satisfier service.
    */
   public void startSPS() {
-    if (!(storagePolicyEnabled && spsEnabled)) {
+    if (!(storagePolicyEnabled && spsMode != StoragePolicySatisfierMode.NONE)) {
       LOG.info(
           "Failed to start StoragePolicySatisfier "
               + " as {} set to {} and {} set to {}.",
           DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled,
-          DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, spsEnabled);
+          DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, spsMode);
       return;
     } else if (sps.isRunning()) {
-      LOG.info("Storage policy satisfier is already running.");
+      LOG.info("Storage policy satisfier is already running"
+          + " as internal service.");
       return;
     }
-    sps.start(false);
+    // starting internal SPS service
+    if (spsMode == StoragePolicySatisfierMode.INTERNAL) {
+      sps.start(false, spsMode);
+    }
   }
 
   /**
@@ -5046,11 +5052,13 @@ public class BlockManager implements BlockStatsMXBean {
    *          pending SPS work
    */
   public void stopSPS(boolean forceStop) {
-    if (!(storagePolicyEnabled && spsEnabled)) {
+    if (!(storagePolicyEnabled
+        && (spsMode != StoragePolicySatisfierMode.NONE))) {
       LOG.info("Storage policy satisfier is not enabled.");
       return;
     } else if (!sps.isRunning()) {
-      LOG.info("Storage policy satisfier is already stopped.");
+      removeAllSPSPathIds();
+      LOG.info("Storage policy satisfier is not running.");
       return;
     }
 
@@ -5060,39 +5068,75 @@ public class BlockManager implements BlockStatsMXBean {
   /**
    * Enable storage policy satisfier by starting its service.
    */
-  public void enableSPS() {
+  public void enableInternalSPS() {
     if (!storagePolicyEnabled){
       LOG.info("Failed to start StoragePolicySatisfier as {} set to {}.",
           DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled);
       return;
     }
-    spsEnabled = true;
     if (sps.isRunning()) {
-      LOG.info("Storage policy satisfier is already running.");
+      LOG.info("Storage policy satisfier is already running as SPS mode:{}.",
+          spsMode);
       return;
     }
-    sps.start(true);
+    updateSPSMode(StoragePolicySatisfierMode.INTERNAL);
+    sps.start(true, spsMode);
   }
 
   /**
-   * Disable the storage policy satisfier by stopping its services.
+   * Enable the storage policy satisfier as an external service outside the
+   * Namenode.
    */
-  public void disableSPS() {
-    spsEnabled = false;
-    if (!sps.isRunning()) {
-      LOG.info("Storage policy satisfier is already stopped.");
+  public void enableExternalSPS() {
+    if (!storagePolicyEnabled){
+      LOG.info("Failed to start StoragePolicySatisfier as {} set to {}.",
+          DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled);
+      return;
+    }
+    if (spsMode == StoragePolicySatisfierMode.EXTERNAL) {
+      LOG.info("Storage policy satisfier is already enabled as SPS mode:{}.",
+          spsMode);
       return;
     }
+    updateSPSMode(StoragePolicySatisfierMode.EXTERNAL);
+    sps.stopGracefully();
+  }
 
-    LOG.info("Stopping StoragePolicySatisfier, as admin requested to "
-        + "stop it.");
-    sps.disable(true);
+  private void updateSPSMode(StoragePolicySatisfierMode newSpsMode) {
+    LOG.debug("Updating SPS service status, current mode:{}, new mode:{}",
+        spsMode, newSpsMode);
+    spsMode = newSpsMode;
+  }
+
+  /**
+   * Disable the storage policy satisfier by stopping its services.
+   */
+  public void disableSPS() {
+    switch (spsMode) {
+    case NONE:
+      break;
+    case INTERNAL:
+    case EXTERNAL:
+      if (!sps.isRunning()) {
+        LOG.info("Storage policy satisfier is already stopped.");
+      } else {
+        LOG.info("Stopping StoragePolicySatisfier mode {}, as admin "
+            + "requested to stop it.", spsMode);
+        sps.disable(true);
+      }
+      removeAllSPSPathIds();
+      break;
+    default:
+      // nothing
+      break;
+    }
+    updateSPSMode(StoragePolicySatisfierMode.NONE);
   }
 
   /**
    * Timed wait to stop storage policy satisfier daemon threads.
    */
   public void stopSPSGracefully() {
+    removeAllSPSPathIds();
     sps.stopGracefully();
   }
   /**
@@ -5149,10 +5193,17 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
-   * @return true if sps enabled.
+   * @return true if sps is running as an internal service or external service.
    */
   public boolean isSPSEnabled() {
-    return spsEnabled;
+    return spsMode == StoragePolicySatisfierMode.INTERNAL
+        || spsMode == StoragePolicySatisfierMode.EXTERNAL;
   }
 
+  /**
+   * @return sps service mode.
+   */
+  public StoragePolicySatisfierMode getSPSMode() {
+    return spsMode;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 58ac89b..5871a7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -209,6 +209,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -2246,8 +2247,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             DFS_STORAGE_POLICY_ENABLED_KEY));
       }
 
-      if (blockManager.getStoragePolicySatisfier() == null
-          || !blockManager.getStoragePolicySatisfier().isRunning()) {
+      if (!blockManager.isSPSEnabled()
+          || (blockManager.getSPSMode() == StoragePolicySatisfierMode.INTERNAL
+              && !blockManager.getStoragePolicySatisfier().isRunning())) {
         throw new UnsupportedActionException(
             "Cannot request to satisfy storage policy "
                 + "when storage policy satisfier feature has been disabled"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
index acc23e5..abf6a4c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
@@ -263,7 +263,7 @@ public abstract class FSTreeTraverser {
   /**
    * Check whether current batch can be submitted for the processing.
    *
-   * @return true if batch size meets meet the condition, otherwise false.
+   * @return true if batch size meets the condition, otherwise false.
    */
   protected abstract boolean canSubmitCurrentBatch();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 6a8efe5..58b8c1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
 import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -160,7 +161,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAUL
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.FS_PROTECTED_DIRECTORIES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
@@ -295,7 +296,7 @@ public class NameNode extends ReconfigurableBase implements
           DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
           FS_PROTECTED_DIRECTORIES,
           HADOOP_CALLER_CONTEXT_ENABLED_KEY,
-          DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY));
+          DFS_STORAGE_POLICY_SATISFIER_MODE_KEY));
 
   private static final String USAGE = "Usage: hdfs namenode ["
       + StartupOption.BACKUP.getName() + "] | \n\t["
@@ -2073,7 +2074,7 @@ public class NameNode extends ReconfigurableBase implements
       return reconfCallerContextEnabled(newVal);
     } else if (property.equals(ipcClientRPCBackoffEnable)) {
       return reconfigureIPCBackoffEnabled(newVal);
-    } else if (property.equals(DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY)) {
+    } else if (property.equals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY)) {
       return reconfigureSPSEnabled(newVal, property);
     } else {
       throw new ReconfigurationException(property, newVal, getConf().get(
@@ -2160,13 +2161,13 @@ public class NameNode extends ReconfigurableBase implements
 
   String reconfigureSPSEnabled(String newVal, String property)
       throws ReconfigurationException {
-    if (newVal == null || !(newVal.equalsIgnoreCase(Boolean.TRUE.toString())
-        || newVal.equalsIgnoreCase(Boolean.FALSE.toString()))) {
+    if (newVal == null
+        || StoragePolicySatisfierMode.fromString(newVal) == null) {
       throw new ReconfigurationException(property, newVal,
           getConf().get(property),
           new HadoopIllegalArgumentException(
-              "For enabling or disabling storage policy satisfier, "
-                  + "we must pass true/false only"));
+              "For enabling or disabling storage policy satisfier, we must "
+                  + "pass either none/internal/external string value only"));
     }
 
     if (!isActiveState()) {
@@ -2175,12 +2176,21 @@ public class NameNode extends ReconfigurableBase implements
           "Enabling or disabling storage policy satisfier service on "
               + state + " NameNode is not allowed"));
     }
-
-    boolean enableSPS = Boolean.parseBoolean(newVal);
-    if (enableSPS) {
-      namesystem.getBlockManager().enableSPS();
-    } else {
+    StoragePolicySatisfierMode mode = StoragePolicySatisfierMode
+        .fromString(newVal);
+    switch(mode){
+    case NONE:
       namesystem.getBlockManager().disableSPS();
+      break;
+    case INTERNAL:
+      namesystem.getBlockManager().enableInternalSPS();
+      break;
+    case EXTERNAL:
+      namesystem.getBlockManager().enableExternalSPS();
+      break;
+    default:
+      // nothing
+      break;
     }
     return newVal;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
index b141502..39c50a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
@@ -257,7 +257,7 @@ public class BlockStorageMovementNeeded {
     // File's directly added to storageMovementNeeded, So try to remove
     // xAttr for file
     ItemInfo itemInfo;
-    while ((itemInfo = storageMovementNeeded.poll()) != null) {
+    while ((itemInfo = get()) != null) {
       try {
         // Remove xAttr for file
         if (!itemInfo.isDir()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
index aed684a..c658812 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
@@ -102,6 +102,9 @@ public class IntraSPSNameNodeContext implements Context {
     namesystem.readLock();
     try {
       BlockCollection bc = namesystem.getBlockCollection(inodeID);
+      if (bc == null) {
+        return false;
+      }
       return blockManager.hasLowRedundancyBlocks(bc);
     } finally {
       namesystem.readUnlock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
index ecc6ceb..ceec3f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 
 /**
@@ -55,8 +56,9 @@ public interface SPSService {
    * @param reconfigStart
    *          - to indicate whether the SPS startup requested from
    *          reconfiguration service
+   * @param spsMode sps service mode
    */
-  void start(boolean reconfigStart);
+  void start(boolean reconfigStart, StoragePolicySatisfierMode spsMode);
 
   /**
    * Stops the SPS service gracefully. Timed wait to stop storage policy
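
A minimal caller sketch under the new contract (hypothetical helper, not from this patch): callers must now pass the service mode along with the start request, and StoragePolicySatisfier#start rejects NONE with an error log:

    import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
    import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;

    class SpsStartExample {
      // Start the satisfier as the internal service. Passing NONE would be
      // logged as an error and ignored, per StoragePolicySatisfier below.
      static void startInternal(SPSService spsService) {
        spsService.start(/* reconfigStart */ true,
            StoragePolicySatisfierMode.INTERNAL);
      }
    }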

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 9ba8af7..33ad6f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
@@ -82,6 +83,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
   private BlockStorageMovementNeeded storageMovementNeeded;
   private BlockStorageMovementAttemptedItems storageMovementsMonitor;
   private volatile boolean isRunning = false;
+  private volatile StoragePolicySatisfierMode spsMode =
+      StoragePolicySatisfierMode.NONE;
   private int spsWorkMultiplier;
   private long blockCount = 0L;
   private int blockMovementMaxRetry;
@@ -152,9 +155,17 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
    * movements monitor for retry the attempts if needed.
    */
   @Override
-  public synchronized void start(boolean reconfigStart) {
+  public synchronized void start(boolean reconfigStart,
+      StoragePolicySatisfierMode serviceMode) {
+    if (serviceMode == StoragePolicySatisfierMode.NONE) {
+      LOG.error("Can't start StoragePolicySatisfier for the given mode:{}",
+          serviceMode);
+      return;
+    }
     isRunning = true;
-    if (ctxt.isMoverRunning()) {
+    this.spsMode = serviceMode;
+    if (spsMode == StoragePolicySatisfierMode.INTERNAL
+        && ctxt.isMoverRunning()) {
       isRunning = false;
       LOG.error(
           "Stopping StoragePolicySatisfier thread " + "as Mover ID file "
@@ -928,7 +939,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
 
   @Override
   public void addFileIdToProcess(ItemInfo trackInfo, boolean scanCompleted) {
-    storageMovementNeeded.add(trackInfo);
+    storageMovementNeeded.add(trackInfo, scanCompleted);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
index 597a7d3..964ee8c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.sps;
  */
 
 import java.io.IOException;
+import java.util.ArrayList;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -48,8 +49,7 @@ public class ExternalSPSFileIDCollector implements FileIdCollector {
   private SPSService service;
   private int maxQueueLimitToScan;
 
-  public ExternalSPSFileIDCollector(Context cxt, SPSService service,
-      int batchSize) {
+  public ExternalSPSFileIDCollector(Context cxt, SPSService service) {
     this.cxt = cxt;
     this.service = service;
     this.maxQueueLimitToScan = service.getConf().getInt(
@@ -74,7 +74,8 @@ public class ExternalSPSFileIDCollector implements FileIdCollector {
    * Recursively scan the given path and add the file info to SPS service for
    * processing.
    */
-  private void processPath(long startID, String fullPath) {
+  private long processPath(long startID, String fullPath) {
+    long pendingWorkCount = 0; // count of files yet to be satisfied
     for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
       final DirectoryListing children;
       try {
@@ -82,14 +83,14 @@ public class ExternalSPSFileIDCollector implements FileIdCollector {
       } catch (IOException e) {
         LOG.warn("Failed to list directory " + fullPath
             + ". Ignore the directory and continue.", e);
-        return;
+        return pendingWorkCount;
       }
       if (children == null) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("The scanning start dir/sub dir " + fullPath
               + " does not have childrens.");
         }
-        return;
+        return pendingWorkCount;
       }
 
       for (HdfsFileStatus child : children.getPartialListing()) {
@@ -97,13 +98,14 @@ public class ExternalSPSFileIDCollector implements FileIdCollector {
           service.addFileIdToProcess(new ItemInfo(startID, child.getFileId()),
               false);
           checkProcessingQueuesFree();
+          pendingWorkCount++; // one more file yet to be satisfied
         } else {
           String fullPathStr = child.getFullName(fullPath);
           if (child.isDirectory()) {
             if (!fullPathStr.endsWith(Path.SEPARATOR)) {
               fullPathStr = fullPathStr + Path.SEPARATOR;
             }
-            processPath(startID, fullPathStr);
+            pendingWorkCount += processPath(startID, fullPathStr);
           }
         }
       }
@@ -111,7 +113,7 @@ public class ExternalSPSFileIDCollector implements FileIdCollector {
       if (children.hasMore()) {
         lastReturnedName = children.getLastName();
       } else {
-        return;
+        return pendingWorkCount;
       }
     }
   }
@@ -149,8 +151,20 @@ public class ExternalSPSFileIDCollector implements FileIdCollector {
     if (dfs == null) {
       dfs = getFS(service.getConf());
     }
-    processPath(inodeId, cxt.getFilePath(inodeId));
-    service.markScanCompletedForPath(inodeId);
+    long pendingSatisfyItemsCount = processPath(inodeId,
+        cxt.getFilePath(inodeId));
+    // Check whether the given path contains any items to be tracked or has
+    // no paths left to satisfy. If the list is empty, add the given inodeId
+    // to 'pendingWorkForDirectory' with an empty list so that
+    // SPSPathIdProcessor#run will later remove the SPS hint, considering that
+    // this path already satisfies the storage policy.
+    if (pendingSatisfyItemsCount <= 0) {
+      LOG.debug("There is no pending items to satisfy the given path "
+          + "inodeId:{}", inodeId);
+      service.addAllFileIdsToProcess(inodeId, new ArrayList<>(), true);
+    } else {
+      service.markScanCompletedForPath(inodeId);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java
index ee90eff..3a2ad48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java
@@ -341,12 +341,15 @@ public class StoragePolicyAdmin extends Configured implements Tool {
     }
   }
 
-  /** Command to check storage policy satisfier status. */
-  private static class IsSatisfierRunningCommand
+  /**
+   * Command to check the status of the storage policy satisfier running
+   * internally (inside the Namenode).
+   */
+  private static class IsInternalSatisfierRunningCommand
       implements AdminHelper.Command {
     @Override
     public String getName() {
-      return "-isSatisfierRunning";
+      return "-isInternalSatisfierRunning";
     }
 
     @Override
@@ -356,8 +359,9 @@ public class StoragePolicyAdmin extends Configured implements Tool {
 
     @Override
     public String getLongUsage() {
-      return getShortUsage() + "\n" +
-          "Check the status of Storage Policy Statisfier.\n\n";
+      return getShortUsage() + "\n"
+          + "Check the status of Storage Policy Statisfier"
+          + " running inside Namenode.\n\n";
     }
 
     @Override
@@ -435,6 +439,6 @@ public class StoragePolicyAdmin extends Configured implements Tool {
       new GetStoragePolicyCommand(),
       new UnsetStoragePolicyCommand(),
       new SatisfyStoragePolicyCommand(),
-      new IsSatisfierRunningCommand()
+      new IsInternalSatisfierRunningCommand()
   };
 }
\ No newline at end of file
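
For reference, the renamed admin command in use; per the documentation below it prints 'yes' when the internal satisfier is running and 'no' otherwise:

    $ hdfs storagepolicies -isInternalSatisfierRunning
    yes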

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index a8fb3aa..729642a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4353,13 +4353,16 @@
 </property>
 
 <property>
-  <name>dfs.storage.policy.satisfier.enabled</name>
-  <value>false</value>
+  <name>dfs.storage.policy.satisfier.mode</name>
+  <value>none</value>
   <description>
-    If true, StoragePolicySatisfier will be started along with active namenode.
+    The following values are supported - internal, external, none.
+    If internal, StoragePolicySatisfier will be enabled and started along with the active namenode.
+    If external, StoragePolicySatisfier will be enabled and started as an independent service outside the namenode.
+    If none, StoragePolicySatisfier is disabled.
     By default, StoragePolicySatisfier is disabled.
-    Administrator can dynamically enable or disable StoragePolicySatisfier by using reconfiguration option.
-    Dynamic enabling/disabling option can be achieved in the following way.
+    The administrator can dynamically change the StoragePolicySatisfier mode by using the reconfiguration option.
+    The dynamic mode change can be achieved in the following way.
     1. Edit/update this configuration property values in hdfs-site.xml
     2. Execute the reconfig command on hadoop command line prompt.
        For example:$hdfs -reconfig namenode nn_host:port start
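
Putting the two documented steps together, a hedged end-to-end example (nn_host:port is a placeholder for the active NameNode's RPC address):

    # 1. Update the property in hdfs-site.xml, e.g. to switch to the external service:
    #      <property>
    #        <name>dfs.storage.policy.satisfier.mode</name>
    #        <value>external</value>
    #      </property>
    # 2. Ask the running NameNode to pick up the change, no restart needed:
    $ hdfs dfsadmin -reconfig namenode nn_host:port start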

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
index eecb264..6b52c8f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
@@ -106,7 +106,7 @@ Following 2 options will allow users to move the blocks based on new policy set.
 When user changes the storage policy on a file/directory, user can call `HdfsAdmin` API `satisfyStoragePolicy()` to move the blocks as per the new policy set.
 The SPS daemon thread runs along with namenode and periodically scans for the storage mismatches between new policy set and the physical blocks placed. This will only track the files/directories for which user invoked satisfyStoragePolicy. If SPS identifies some blocks to be moved for a file, then it will schedule block movement tasks to datanodes. If there are any failures in movement, the SPS will re-attempt by sending new block movement tasks.
 
-SPS can be enabled and disabled dynamically without restarting the Namenode.
+SPS can be enabled as an internal service to the Namenode, enabled as an external service outside the Namenode, or disabled dynamically, all without restarting the Namenode.
 
 Detailed design documentation can be found at [Storage Policy Satisfier(SPS) (HDFS-10285)](https://issues.apache.org/jira/browse/HDFS-10285)
 
@@ -123,8 +123,9 @@ Detailed design documentation can be found at [Storage Policy Satisfier(SPS) (HD
 
 ####Configurations:
 
-*   **dfs.storage.policy.satisfier.enabled** - Used to enable or disable SPS. Configuring true represents SPS is
-   enabled and vice versa.
+*   **dfs.storage.policy.satisfier.mode** - Used to enable (as an internal service inside the NN or an external service outside the NN) or disable SPS.
+   The following string values are supported - `internal`, `external`, `none`. Configuring `internal` or `external` enables SPS, and `none` disables it.
+   The default value is `none`.
 
 *   **dfs.storage.policy.satisfier.recheck.timeout.millis** - A timeout to re-check the processed block storage movement
    command results from Datanodes.
@@ -151,7 +152,7 @@ Note that, when both -p and -f options are omitted, the default path is the root
 
 ####Administrator notes:
 
-`StoragePolicySatisfier` and `Mover tool` cannot run simultaneously. If a Mover instance is already triggered and running, SPS will be disabled while starting. In that case, administrator should make sure, Mover execution finished and then enable SPS again. Similarly when SPS enabled already, Mover cannot be run. If administrator is looking to run Mover tool explicitly, then he/she should make sure to disable SPS first and then run Mover. Please look at the commands section to know how to enable or disable SPS dynamically.
+`StoragePolicySatisfier` and `Mover tool` cannot run simultaneously. If a Mover instance is already triggered and running, SPS will be disabled while starting. In that case, the administrator should make sure the Mover execution has finished and then enable SPS again (as an internal service inside the NN or an external service outside the NN). Similarly, when SPS is already enabled, Mover cannot be run. If the administrator wants to run the Mover tool explicitly, then he/she should make sure to disable SPS first and then run Mover. Please look at the commands section to know how to enable (internal service inside the NN or external service outside the NN) or disable SPS dynamically.
 
 Storage Policy Commands
 -----------------------
@@ -226,14 +227,14 @@ Schedule blocks to move based on file's/directory's current storage policy.
 
 ### SPS Running Status
 
-Check the running status of Storage Policy Satisfier in namenode. If it is running, return 'yes'. Otherwise return 'no'.
+Check the running status of the Storage Policy Satisfier service in the namenode. If it is running, return 'yes'. Otherwise return 'no'.
 
 * Command:
 
-        hdfs storagepolicies -isSatisfierRunning
+        hdfs storagepolicies -isInternalSatisfierRunning
 
-### Enable or Disable SPS without restarting Namenode
-If administrator wants to enable or disable SPS feature while Namenode is running, first he/she needs to update the desired value(true or false) for the configuration item `dfs.storage.policy.satisfier.enabled` in configuration file (`hdfs-site.xml`) and then run the following Namenode reconfig command
+### Enable (internal service inside NN or external service outside NN) or disable SPS without restarting the Namenode
+If the administrator wants to switch the mode of the SPS feature while the Namenode is running, he/she first needs to update the desired value (internal, external, or none) for the configuration item `dfs.storage.policy.satisfier.mode` in the configuration file (`hdfs-site.xml`) and then run the following Namenode reconfig command
 
 +       hdfs dfsadmin -reconfig namenode <host:ipc_port> start
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index 3681cae..5f41a86 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
@@ -66,8 +67,8 @@ public class TestStoragePolicySatisfyWorker {
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
         1L);
     conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
-    conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        true);
+    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.INTERNAL.toString());
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index ca1b5eb..9f5e121 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
 import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
@@ -113,8 +114,8 @@ public class TestMover {
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
         1L);
     conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
-    conf.setBoolean(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, false);
+    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.NONE.toString());
   }
 
   static Mover newMover(Configuration conf) throws IOException {
@@ -135,8 +136,8 @@ public class TestMover {
   @Test
   public void testScheduleSameBlock() throws IOException {
     final Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, false);
+    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.NONE.toString());
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(4).build();
     try {
@@ -252,8 +253,8 @@ public class TestMover {
   @Test
   public void testMoverCli() throws Exception {
     final Configuration clusterConf = new HdfsConfiguration();
-    clusterConf.setBoolean(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, false);
+    clusterConf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.NONE.toString());
     final MiniDFSCluster cluster = new MiniDFSCluster
         .Builder(clusterConf).numDataNodes(0).build();
     try {
@@ -287,8 +288,8 @@ public class TestMover {
   @Test
   public void testMoverCliWithHAConf() throws Exception {
     final Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, false);
+    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.NONE.toString());
     final MiniDFSCluster cluster = new MiniDFSCluster
         .Builder(conf)
         .nnTopology(MiniDFSNNTopology.simpleHATopology())
@@ -312,15 +313,15 @@ public class TestMover {
   @Test
   public void testMoverCliWithFederation() throws Exception {
     final Configuration clusterConf = new HdfsConfiguration();
-    clusterConf.setBoolean(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, false);
+    clusterConf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.NONE.toString());
     final MiniDFSCluster cluster = new MiniDFSCluster
         .Builder(clusterConf)
         .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3))
         .numDataNodes(0).build();
     final Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, false);
+    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.NONE.toString());
     DFSTestUtil.setFederatedConfiguration(cluster, conf);
     try {
       Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
@@ -365,15 +366,15 @@ public class TestMover {
   @Test
   public void testMoverCliWithFederationHA() throws Exception {
     final Configuration clusterConf = new HdfsConfiguration();
-    clusterConf.setBoolean(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, false);
+    clusterConf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.NONE.toString());
     final MiniDFSCluster cluster = new MiniDFSCluster
         .Builder(clusterConf)
         .nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(3))
         .numDataNodes(0).build();
     final Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, false);
+    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.NONE.toString());
     DFSTestUtil.setFederatedHAConfiguration(cluster, conf);
     try {
       Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
@@ -437,8 +438,8 @@ public class TestMover {
   public void testMoveWhenStoragePolicyNotSatisfying() throws Exception {
     // HDFS-8147
     final Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, false);
+    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.NONE.toString());
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(3)
         .storageTypes(
@@ -467,8 +468,8 @@ public class TestMover {
   @Test(timeout = 300000)
   public void testMoveWhenStoragePolicySatisfierIsRunning() throws Exception {
     final Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, true);
+    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.INTERNAL.toString());
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(3)
         .storageTypes(
@@ -544,8 +545,8 @@ public class TestMover {
         1L);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
         false);
-    conf.setBoolean(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, false);
+    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.NONE.toString());
   }
 
   @Test(timeout = 300000)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
index f29aa09..0e9784b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
@@ -96,8 +96,8 @@ public class TestStorageMover {
     DEFAULT_CONF.setLong(
         DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 2L);
     DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L);
-    DEFAULT_CONF.setBoolean(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, false);
+    DEFAULT_CONF.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
 
     DEFAULT_POLICIES = BlockStoragePolicySuite.createDefaultSuite();
     HOT = DEFAULT_POLICIES.getPolicy(HdfsConstants.HOT_STORAGE_POLICY_NAME);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
index b4a461a..85a101f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -45,8 +46,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT;
 
@@ -241,22 +242,25 @@ public class TestNameNodeReconfigure {
     cluster.waitActive();
 
     final NameNode nameNode = cluster.getNameNode();
-    verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, false);
+    verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.NONE, false);
 
-    // enable SPS
-    nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        "true");
+    // try to enable internal SPS while DFS_STORAGE_POLICY_ENABLED_KEY is off
+    nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.INTERNAL.toString());
 
     // Since DFS_STORAGE_POLICY_ENABLED_KEY is disabled, SPS can't be enabled.
     assertEquals("SPS shouldn't start as "
         + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY + " is disabled", false,
             nameNode.getNamesystem().getBlockManager()
             .isStoragePolicySatisfierRunning());
+    verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.INTERNAL, false);
 
-    assertEquals(DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY + " has wrong value",
-        true, nameNode.getConf()
-            .getBoolean(DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-            DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT));
+    assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
+        StoragePolicySatisfierMode.INTERNAL.toString(), nameNode.getConf()
+            .get(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+            DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT));
   }
 
   /**
@@ -267,42 +271,42 @@ public class TestNameNodeReconfigure {
       throws ReconfigurationException {
     final NameNode nameNode = cluster.getNameNode();
 
-    verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        false);
+    verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.NONE, false);
     // try invalid values
     try {
-      nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+      nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
           "text");
       fail("ReconfigurationException expected");
     } catch (ReconfigurationException e) {
       GenericTestUtils.assertExceptionContains(
-          "For enabling or disabling storage policy satisfier, "
-              + "we must pass true/false only",
+          "For enabling or disabling storage policy satisfier, we must "
+              + "pass either none/internal/external string value only",
           e.getCause());
     }
 
-    // enable SPS
-    nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        "true");
-
-    verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        true);
+    // enable internal SPS
+    nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.INTERNAL.toString());
+    verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.INTERNAL, true);
 
     // disable SPS
-    nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        "false");
-    verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        false);
-
-    // enable SPS
-    nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        "true");
-    assertEquals(DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY + " has wrong value",
-        true, nameNode.getNamesystem().getBlockManager()
+    nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.NONE.toString());
+    verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.NONE, false);
+
+    // enable external SPS
+    nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.EXTERNAL.toString());
+    assertEquals("Internal SPS should not be running in EXTERNAL mode",
+        false, nameNode.getNamesystem().getBlockManager()
             .isStoragePolicySatisfierRunning());
-    assertEquals(DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY + " has wrong value",
-        true, nameNode.getConf()
-            .getBoolean(DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, false));
+    assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
+        StoragePolicySatisfierMode.EXTERNAL.toString(),
+        nameNode.getConf().get(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+            DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT));
   }
 
   /**
@@ -314,10 +318,10 @@ public class TestNameNodeReconfigure {
     final NameNode nameNode = cluster.getNameNode();
 
     // disable SPS
-    nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        "false");
-    verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        false);
+    nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.NONE.toString());
+    verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.NONE, false);
 
     Path filePath = new Path("/testSPS");
     DistributedFileSystem fileSystem = cluster.getFileSystem();
@@ -334,23 +338,26 @@ public class TestNameNodeReconfigure {
               + "or use Mover tool.", e);
     }
 
-    // revert to default
-    nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        "true");
-    assertEquals(DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY + " has wrong value",
+    // re-enable SPS in internal mode
+    nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.INTERNAL.toString());
+    assertEquals("SPS should be running after enabling INTERNAL mode",
         true, nameNode.getNamesystem().getBlockManager()
             .isStoragePolicySatisfierRunning());
-    assertEquals(DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY + " has wrong value",
-        true, nameNode.getConf()
-            .getBoolean(DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, false));
+    assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
+        StoragePolicySatisfierMode.INTERNAL.toString(),
+        nameNode.getConf().get(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+            DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT));
   }
 
   void verifySPSEnabled(final NameNode nameNode, String property,
-      boolean expected) {
-    assertEquals(property + " has wrong value", expected, nameNode
+      StoragePolicySatisfierMode expected, boolean isSatisfierRunning) {
+    assertEquals("SPS running state is wrong", isSatisfierRunning, nameNode
         .getNamesystem().getBlockManager().isStoragePolicySatisfierRunning());
-    assertEquals(property + " has wrong value", expected, nameNode.getConf()
-        .getBoolean(property, DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT));
+    String actual = nameNode.getConf().get(property,
+        DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
+    assertEquals(property + " has wrong value", expected,
+        StoragePolicySatisfierMode.fromString(actual));
   }
 
   @Test

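For readers skimming the reconfiguration test above, here is a minimal sketch of how the reworked key is driven at runtime. The key and enum names are taken from this commit; the class, the method name, and the assumption that 'nameNode' is an active NameNode obtained from a running MiniDFSCluster are illustrative only.

    import org.apache.hadoop.conf.ReconfigurationException;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
    import org.apache.hadoop.hdfs.server.namenode.NameNode;

    class SpsModeReconfigSketch {
      // Hypothetical helper; 'nameNode' is assumed to be active, as in the
      // test above.
      static void cycleModes(NameNode nameNode) throws ReconfigurationException {
        // INTERNAL starts the satisfier thread inside the NameNode.
        nameNode.reconfigureProperty(
            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
            StoragePolicySatisfierMode.INTERNAL.toString());
        // EXTERNAL stops the internal thread and defers block movement to an
        // external satisfier process, so isStoragePolicySatisfierRunning()
        // reports false on the NameNode.
        nameNode.reconfigureProperty(
            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
            StoragePolicySatisfierMode.EXTERNAL.toString());
        // NONE disables the feature entirely.
        nameNode.reconfigureProperty(
            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
            StoragePolicySatisfierMode.NONE.toString());
      }
    }
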
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
index c301b8a..b84214c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -102,8 +103,8 @@ public class TestPersistentStoragePolicySatisfier {
     conf.set(
         DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
         "3000");
-    conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        true);
+    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.INTERNAL.toString());
     final int dnNumber = storageTypes.length;
     final short replication = 3;
     MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf)
@@ -292,8 +293,8 @@ public class TestPersistentStoragePolicySatisfier {
     MiniDFSCluster haCluster = null;
     try {
       conf = new HdfsConfiguration();
-      conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-          true);
+      conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+          StoragePolicySatisfierMode.INTERNAL.toString());
       haCluster = new MiniDFSCluster
           .Builder(conf)
           .nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))

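For startup-time configuration the mode string replaces the old boolean, as the hunks above show. A compact sketch under the same key and enum names; the cluster sizing and class name are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;

    class SpsStartupSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        // The NameNode reads this at boot; INTERNAL brings the satisfier up
        // inside the NameNode process.
        conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
            StoragePolicySatisfierMode.INTERNAL.toString());
        MiniDFSCluster cluster =
            new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
        try {
          cluster.waitActive();
        } finally {
          cluster.shutdown();
        }
      }
    }
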
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java
index b0fd3af..e89cfa3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@@ -65,8 +66,8 @@ public class TestStoragePolicySatisfierWithHA {
 
   private void createCluster() throws IOException {
     config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
-    config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        true);
+    config.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.INTERNAL.toString());
     startCluster(config, allDiskTypes, numOfDatanodes, storagesPerDatanode,
         capacity);
     dfs = cluster.getFileSystem(nnIndex);
@@ -133,13 +134,14 @@ public class TestStoragePolicySatisfierWithHA {
 
       try {
         cluster.getNameNode(0).reconfigurePropertyImpl(
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, "false");
+            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+            StoragePolicySatisfierMode.EXTERNAL.toString());
         Assert.fail("It should not be allowed to enable or disable"
             + " StoragePolicySatisfier on Standby NameNode");
       } catch (ReconfigurationException e) {
         GenericTestUtils.assertExceptionContains("Could not change property "
-            + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY
-            + " from 'true' to 'false'", e);
+            + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY
+            + " from 'INTERNAL' to 'EXTERNAL'", e);
         GenericTestUtils.assertExceptionContains(
             "Enabling or disabling storage policy satisfier service on "
                 + "standby NameNode is not allowed", e.getCause());

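The standby rejection asserted above can also be reproduced through the public reconfiguration API; a sketch assuming 'standby' is a NameNode currently in standby state (only the key and enum names come from this commit, the rest is illustrative):

    import org.apache.hadoop.conf.ReconfigurationException;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
    import org.apache.hadoop.hdfs.server.namenode.NameNode;

    class StandbySpsSketch {
      static void expectRejection(NameNode standby) {
        try {
          standby.reconfigureProperty(
              DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
              StoragePolicySatisfierMode.EXTERNAL.toString());
        } catch (ReconfigurationException e) {
          // Expected: toggling SPS on a standby NameNode is not allowed, and
          // e.getCause() carries the message asserted in the test above.
        }
      }
    }
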
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7309b369/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index 8115661..935d4f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -122,6 +123,13 @@ public class TestStoragePolicySatisfier {
   }
 
   /**
+   * @return hdfs cluster.
+   */
+  public MiniDFSCluster getCluster() {
+    return hdfsCluster;
+  }
+
+  /**
    * Gets distributed file system.
    *
    * @throws IOException
@@ -139,8 +147,6 @@ public class TestStoragePolicySatisfier {
 
   public void createCluster() throws IOException {
     config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
-    config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        true);
     hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES,
         STORAGES_PER_DATANODE, CAPACITY);
     getFS();
@@ -150,6 +156,8 @@ public class TestStoragePolicySatisfier {
   @Before
   public void setUp() {
     config = new HdfsConfiguration();
+    config.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.INTERNAL.toString());
   }
 
   @Test(timeout = 300000)
@@ -404,8 +412,7 @@ public class TestStoragePolicySatisfier {
       final String nonExistingFile = "/noneExistingFile";
       hdfsCluster.getConfiguration(0).
           setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false);
-      hdfsCluster.restartNameNodes();
-      hdfsCluster.waitActive();
+      restartNamenode();
       HdfsAdmin hdfsAdmin =
           new HdfsAdmin(FileSystem.getDefaultUri(config), config);
 
@@ -423,8 +430,8 @@ public class TestStoragePolicySatisfier {
 
       hdfsCluster.getConfiguration(0).
           setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, true);
-      hdfsCluster.restartNameNodes();
-      hdfsCluster.waitActive();
+      restartNamenode();
+
       hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(config), config);
       try {
         hdfsAdmin.satisfyStoragePolicy(new Path(nonExistingFile));
@@ -552,7 +559,8 @@ public class TestStoragePolicySatisfier {
       createCluster();
       // Stop SPS
       hdfsCluster.getNameNode().reconfigureProperty(
-          DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, "false");
+          DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+          StoragePolicySatisfierMode.NONE.toString());
       running = hdfsCluster.getFileSystem()
           .getClient().isStoragePolicySatisfierRunning();
       Assert.assertFalse("SPS should stopped as configured.", running);
@@ -563,7 +571,8 @@ public class TestStoragePolicySatisfier {
 
       // Restart SPS
       hdfsCluster.getNameNode().reconfigureProperty(
-          DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, "true");
+          DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+          StoragePolicySatisfierMode.INTERNAL.toString());
 
       running = hdfsCluster.getFileSystem()
           .getClient().isStoragePolicySatisfierRunning();
@@ -578,7 +587,8 @@ public class TestStoragePolicySatisfier {
 
       // Restart SPS again
       hdfsCluster.getNameNode().reconfigureProperty(
-          DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, "true");
+          DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+          StoragePolicySatisfierMode.INTERNAL.toString());
       running = hdfsCluster.getFileSystem()
           .getClient().isStoragePolicySatisfierRunning();
       Assert.assertTrue("SPS should be running as "
@@ -588,7 +598,7 @@ public class TestStoragePolicySatisfier {
       doTestWhenStoragePolicySetToCOLD();
     } catch (ReconfigurationException e) {
       throw new IOException("Exception when reconfigure "
-          + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, e);
+          + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, e);
     } finally {
       if (out != null) {
         out.close();
@@ -610,7 +620,7 @@ public class TestStoragePolicySatisfier {
       // Simulate the case by creating MOVER_ID file
       DFSTestUtil.createFile(hdfsCluster.getFileSystem(),
           HdfsServerConstants.MOVER_ID_PATH, 0, (short) 1, 0);
-      hdfsCluster.restartNameNode(true);
+      restartNamenode();
       boolean running = hdfsCluster.getFileSystem()
           .getClient().isStoragePolicySatisfierRunning();
       Assert.assertTrue("SPS should be running as "
@@ -630,14 +640,7 @@ public class TestStoragePolicySatisfier {
   public void testMoveWithBlockPinning() throws Exception {
     try{
       config.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
-      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-          true);
-      hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(3)
-          .storageTypes(
-              new StorageType[][] {{StorageType.DISK, StorageType.DISK},
-                  {StorageType.DISK, StorageType.DISK},
-                  {StorageType.DISK, StorageType.DISK}})
-          .build();
+      hdfsCluster = startCluster(config, allDiskTypes, 3, 2, CAPACITY);
 
       hdfsCluster.waitActive();
       dfs = hdfsCluster.getFileSystem();
@@ -699,8 +702,6 @@ public class TestStoragePolicySatisfier {
     try {
       int numOfDns = 5;
       config.setLong("dfs.block.size", 1024);
-      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-          true);
       allDiskTypes =
           new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
               {StorageType.DISK, StorageType.DISK},
@@ -743,8 +744,6 @@ public class TestStoragePolicySatisfier {
             {StorageType.DISK, StorageType.SSD},
             {StorageType.DISK, StorageType.RAM_DISK}};
     config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
-    config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        true);
     try {
       hdfsCluster = startCluster(config, diskTypes, NUM_OF_DATANODES,
           STORAGES_PER_DATANODE, CAPACITY);
@@ -781,8 +780,6 @@ public class TestStoragePolicySatisfier {
             {StorageType.DISK, StorageType.DISK}};
 
     config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
-    config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        true);
     try {
       hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
           STORAGES_PER_DATANODE, CAPACITY);
@@ -816,8 +813,6 @@ public class TestStoragePolicySatisfier {
         {StorageType.DISK, StorageType.ARCHIVE}};
 
     try {
-      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-          true);
       hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
           STORAGES_PER_DATANODE, CAPACITY);
       dfs = hdfsCluster.getFileSystem();
@@ -861,8 +856,6 @@ public class TestStoragePolicySatisfier {
             {StorageType.DISK, StorageType.SSD},
             {StorageType.DISK, StorageType.DISK}};
     config.setLong("dfs.block.size", 2 * DEFAULT_BLOCK_SIZE);
-    config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        true);
     long dnCapacity = 1024 * DEFAULT_BLOCK_SIZE + (2 * DEFAULT_BLOCK_SIZE - 1);
     try {
       hdfsCluster = startCluster(config, diskTypes, NUM_OF_DATANODES,
@@ -949,8 +942,6 @@ public class TestStoragePolicySatisfier {
         1L);
     config.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
         false);
-    config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        true);
     try {
       hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
           STORAGES_PER_DATANODE, CAPACITY);
@@ -1003,29 +994,25 @@ public class TestStoragePolicySatisfier {
    */
   @Test(timeout = 300000)
   public void testSPSWhenFileLengthIsZero() throws Exception {
-    MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
-      conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-          true);
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
-      cluster.waitActive();
-      DistributedFileSystem fs = cluster.getFileSystem();
+      hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES,
+          STORAGES_PER_DATANODE, CAPACITY);
+      hdfsCluster.waitActive();
+      DistributedFileSystem fs = hdfsCluster.getFileSystem();
       Path filePath = new Path("/zeroSizeFile");
       DFSTestUtil.createFile(fs, filePath, 0, (short) 1, 0);
-      FSEditLog editlog = cluster.getNameNode().getNamesystem().getEditLog();
+      FSEditLog editlog = hdfsCluster.getNameNode().getNamesystem()
+          .getEditLog();
       long lastWrittenTxId = editlog.getLastWrittenTxId();
       fs.satisfyStoragePolicy(filePath);
       Assert.assertEquals("Xattr should not be added for the file",
           lastWrittenTxId, editlog.getLastWrittenTxId());
-      INode inode = cluster.getNameNode().getNamesystem().getFSDirectory()
+      INode inode = hdfsCluster.getNameNode().getNamesystem().getFSDirectory()
           .getINode(filePath.toString());
       Assert.assertTrue("XAttrFeature should be null for file",
           inode.getXAttrFeature() == null);
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      shutdownCluster();
     }
   }
 
@@ -1042,42 +1029,36 @@ public class TestStoragePolicySatisfier {
    */
   @Test(timeout = 300000)
   public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception {
-    MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
-      conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-          true);
-      conf.set(DFSConfigKeys
+      config.set(DFSConfigKeys
           .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
           "3000");
       StorageType[][] newtypes = new StorageType[][] {
           {StorageType.ARCHIVE, StorageType.DISK},
           {StorageType.ARCHIVE, StorageType.DISK},
           {StorageType.ARCHIVE, StorageType.DISK}};
-      cluster = startCluster(conf, newtypes, 3, 2, CAPACITY);
-      cluster.waitActive();
-      DistributedFileSystem fs = cluster.getFileSystem();
+      hdfsCluster = startCluster(config, newtypes, 3, 2, CAPACITY);
+      hdfsCluster.waitActive();
+      DistributedFileSystem fs = hdfsCluster.getFileSystem();
       Path filePath = new Path("/zeroSizeFile");
       DFSTestUtil.createFile(fs, filePath, 1024, (short) 3, 0);
       fs.setStoragePolicy(filePath, "COLD");
       List<DataNodeProperties> list = new ArrayList<>();
-      list.add(cluster.stopDataNode(0));
-      list.add(cluster.stopDataNode(0));
-      list.add(cluster.stopDataNode(0));
-      cluster.restartNameNodes();
-      cluster.restartDataNode(list.get(0), false);
-      cluster.restartDataNode(list.get(1), false);
-      cluster.waitActive();
+      list.add(hdfsCluster.stopDataNode(0));
+      list.add(hdfsCluster.stopDataNode(0));
+      list.add(hdfsCluster.stopDataNode(0));
+      restartNamenode();
+      hdfsCluster.restartDataNode(list.get(0), false);
+      hdfsCluster.restartDataNode(list.get(1), false);
+      hdfsCluster.waitActive();
       fs.satisfyStoragePolicy(filePath);
       DFSTestUtil.waitExpectedStorageType(filePath.toString(),
-          StorageType.ARCHIVE, 2, 30000, cluster.getFileSystem());
-      cluster.restartDataNode(list.get(2), false);
+          StorageType.ARCHIVE, 2, 30000, hdfsCluster.getFileSystem());
+      hdfsCluster.restartDataNode(list.get(2), false);
       DFSTestUtil.waitExpectedStorageType(filePath.toString(),
-          StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem());
+          StorageType.ARCHIVE, 3, 30000, hdfsCluster.getFileSystem());
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      shutdownCluster();
     }
   }
 
@@ -1091,12 +1072,8 @@ public class TestStoragePolicySatisfier {
    */
   @Test(timeout = 300000)
   public void testSPSWhenFileHasExcessRedundancyBlocks() throws Exception {
-    MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
-      conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-          true);
-      conf.set(DFSConfigKeys
+      config.set(DFSConfigKeys
           .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
           "3000");
       StorageType[][] newtypes = new StorageType[][] {
@@ -1105,10 +1082,9 @@ public class TestStoragePolicySatisfier {
           {StorageType.ARCHIVE, StorageType.DISK},
           {StorageType.ARCHIVE, StorageType.DISK},
           {StorageType.ARCHIVE, StorageType.DISK}};
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5)
-          .storageTypes(newtypes).build();
-      cluster.waitActive();
-      DistributedFileSystem fs = cluster.getFileSystem();
+      hdfsCluster = startCluster(config, newtypes, 5, 2, CAPACITY);
+      hdfsCluster.waitActive();
+      DistributedFileSystem fs = hdfsCluster.getFileSystem();
       Path filePath = new Path("/zeroSizeFile");
       DFSTestUtil.createFile(fs, filePath, 1024, (short) 5, 0);
       fs.setReplication(filePath, (short) 3);
@@ -1117,13 +1093,11 @@ public class TestStoragePolicySatisfier {
       fs.setStoragePolicy(filePath, "COLD");
       fs.satisfyStoragePolicy(filePath);
       DFSTestUtil.waitExpectedStorageType(filePath.toString(),
-          StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem());
+          StorageType.ARCHIVE, 3, 30000, hdfsCluster.getFileSystem());
       assertFalse("Log output does not contain expected log message: ",
           logs.getOutput().contains("some of the blocks are low redundant"));
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      shutdownCluster();
     }
   }
 
@@ -1133,24 +1107,19 @@ public class TestStoragePolicySatisfier {
   @Test(timeout = 300000)
   public void testSPSForEmptyDirectory() throws IOException, TimeoutException,
       InterruptedException {
-    MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
-      conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-          true);
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      cluster.waitActive();
-      DistributedFileSystem fs = cluster.getFileSystem();
+      hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES,
+          STORAGES_PER_DATANODE, CAPACITY);
+      hdfsCluster.waitActive();
+      DistributedFileSystem fs = hdfsCluster.getFileSystem();
       Path emptyDir = new Path("/emptyDir");
       fs.mkdirs(emptyDir);
       fs.satisfyStoragePolicy(emptyDir);
       // Make sure satisfy xattr has been removed.
       DFSTestUtil.waitForXattrRemoved("/emptyDir",
-          XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
+          XATTR_SATISFY_STORAGE_POLICY, hdfsCluster.getNamesystem(), 30000);
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      shutdownCluster();
     }
   }
 
@@ -1159,14 +1128,11 @@ public class TestStoragePolicySatisfier {
    */
   @Test(timeout = 300000)
   public void testSPSForNonExistDirectory() throws Exception {
-    MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
-      conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-          true);
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      cluster.waitActive();
-      DistributedFileSystem fs = cluster.getFileSystem();
+      hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES,
+          STORAGES_PER_DATANODE, CAPACITY);
+      hdfsCluster.waitActive();
+      DistributedFileSystem fs = hdfsCluster.getFileSystem();
       Path emptyDir = new Path("/emptyDir");
       try {
         fs.satisfyStoragePolicy(emptyDir);
@@ -1175,9 +1141,7 @@ public class TestStoragePolicySatisfier {
         // nothing to do
       }
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      shutdownCluster();
     }
   }
 
@@ -1186,13 +1150,10 @@ public class TestStoragePolicySatisfier {
    */
   @Test(timeout = 300000)
   public void testSPSWithDirectoryTreeWithoutFile() throws Exception {
-    MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
-      conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-          true);
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      cluster.waitActive();
+      hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES,
+          STORAGES_PER_DATANODE, CAPACITY);
+      hdfsCluster.waitActive();
       // Create directories
       /*
        *                   root
@@ -1203,7 +1164,7 @@ public class TestStoragePolicySatisfier {
        *                    |
        *                    O
        */
-      DistributedFileSystem fs = cluster.getFileSystem();
+      DistributedFileSystem fs = hdfsCluster.getFileSystem();
       fs.mkdirs(new Path("/root/C/H/O"));
       fs.mkdirs(new Path("/root/A"));
       fs.mkdirs(new Path("/root/D"));
@@ -1212,11 +1173,9 @@ public class TestStoragePolicySatisfier {
       fs.satisfyStoragePolicy(new Path("/root"));
       // Make sure satisfy xattr has been removed.
       DFSTestUtil.waitForXattrRemoved("/root",
-          XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
+          XATTR_SATISFY_STORAGE_POLICY, hdfsCluster.getNamesystem(), 30000);
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      shutdownCluster();
     }
   }
 
@@ -1232,8 +1191,6 @@ public class TestStoragePolicySatisfier {
           {StorageType.ARCHIVE, StorageType.SSD},
           {StorageType.DISK, StorageType.DISK}};
       config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
-      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-          true);
       hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
           STORAGES_PER_DATANODE, CAPACITY);
       dfs = hdfsCluster.getFileSystem();
@@ -1263,8 +1220,6 @@ public class TestStoragePolicySatisfier {
           {StorageType.ARCHIVE, StorageType.SSD},
           {StorageType.DISK, StorageType.DISK}};
       config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
-      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-          true);
       // Set queue max capacity
       config.setInt(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
           5);
@@ -1461,8 +1416,6 @@ public class TestStoragePolicySatisfier {
     try {
       config.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 3);
       config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
-      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-          true);
       config.set(DFSConfigKeys
           .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
           "3000");
@@ -1473,8 +1426,8 @@ public class TestStoragePolicySatisfier {
       StorageType[][] storagetypes = new StorageType[][] {
           {StorageType.ARCHIVE, StorageType.DISK},
           {StorageType.ARCHIVE, StorageType.DISK}};
-      hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2)
-          .storageTypes(storagetypes).build();
+
+      hdfsCluster = startCluster(config, storagetypes, 2, 2, CAPACITY);
       hdfsCluster.waitActive();
       dfs = hdfsCluster.getFileSystem();
 
@@ -1523,8 +1476,6 @@ public class TestStoragePolicySatisfier {
   @Test(timeout = 300000)
   public void testStoragePolicySatisfyPathStatus() throws Exception {
     try {
-      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-          true);
       config.set(DFSConfigKeys
           .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
           "3000");
@@ -1535,8 +1486,7 @@ public class TestStoragePolicySatisfier {
       StorageType[][] storagetypes = new StorageType[][] {
           {StorageType.ARCHIVE, StorageType.DISK},
           {StorageType.ARCHIVE, StorageType.DISK}};
-      hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2)
-          .storageTypes(storagetypes).build();
+      hdfsCluster = startCluster(config, storagetypes, 2, 2, CAPACITY);
       hdfsCluster.waitActive();
       // BlockStorageMovementNeeded.setStatusClearanceElapsedTimeMs(200000);
       dfs = hdfsCluster.getFileSystem();
@@ -1592,8 +1542,6 @@ public class TestStoragePolicySatisfier {
   @Test(timeout = 300000)
   public void testMaxRetryForFailedBlock() throws Exception {
     try {
-      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-          true);
       config.set(DFSConfigKeys
           .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
           "1000");
@@ -1603,8 +1551,7 @@ public class TestStoragePolicySatisfier {
       StorageType[][] storagetypes = new StorageType[][] {
           {StorageType.DISK, StorageType.DISK},
           {StorageType.DISK, StorageType.DISK}};
-      hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2)
-          .storageTypes(storagetypes).build();
+      hdfsCluster = startCluster(config, storagetypes, 2, 2, CAPACITY);
       hdfsCluster.waitActive();
       dfs = hdfsCluster.getFileSystem();
 
@@ -1835,4 +1782,9 @@ public class TestStoragePolicySatisfier {
     cluster.waitActive();
     return cluster;
   }
+
+  public void restartNamenode() throws IOException {
+    hdfsCluster.restartNameNodes();
+    hdfsCluster.waitActive();
+  }
 }

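Many of the rewritten tests above poll the satisfier state through the HDFS client after a mode change or restart. A minimal sketch of that probe; the helper class and method name are hypothetical, while the calls are the ones the tests use:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    class SpsProbeSketch {
      // Returns whether the NameNode-side satisfier reports itself running.
      static boolean spsRunning(MiniDFSCluster cluster) throws IOException {
        return cluster.getFileSystem().getClient()
            .isStoragePolicySatisfierRunning();
      }
    }
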
