Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/08/12 10:22:25 UTC

[42/50] [abbrv] hadoop git commit: HDFS-13076: [SPS]: Cleanup work for HDFS-10285 merge. Contributed by Rakesh R.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
index 18acb50..d9a93fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
@@ -32,34 +32,57 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.URI;
+import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.StripedFileTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
 import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
-import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.SecurityUtil;
@@ -67,29 +90,57 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Supplier;
 
 /**
  * Tests the external sps service plugins.
  */
-public class TestExternalStoragePolicySatisfier
-    extends TestStoragePolicySatisfier {
+public class TestExternalStoragePolicySatisfier {
+  private static final String ONE_SSD = "ONE_SSD";
+  private static final String COLD = "COLD";
   private StorageType[][] allDiskTypes =
       new StorageType[][]{{StorageType.DISK, StorageType.DISK},
           {StorageType.DISK, StorageType.DISK},
           {StorageType.DISK, StorageType.DISK}};
-  private NameNodeConnector nnc;
   private File keytabFile;
   private String principal;
   private MiniKdc kdc;
   private File baseDir;
+  private NameNodeConnector nnc;
   private StoragePolicySatisfier externalSps;
   private ExternalSPSContext externalCtxt;
+  private DistributedFileSystem dfs = null;
+  private MiniDFSCluster hdfsCluster = null;
+  private Configuration config = null;
+  private static final int NUM_OF_DATANODES = 3;
+  private static final int STORAGES_PER_DATANODE = 2;
+  private static final long CAPACITY = 2 * 256 * 1024 * 1024;
+  private static final String FILE = "/testMoveToSatisfyStoragePolicy";
+  private static final int DEFAULT_BLOCK_SIZE = 1024;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestExternalStoragePolicySatisfier.class);
+
+  @Before
+  public void setUp() {
+    config = new HdfsConfiguration();
+    config.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        StoragePolicySatisfierMode.EXTERNAL.toString());
+    // Most of the tests restart DNs and the NN, so reduce the refresh cycle to
+    // pick up the latest datanodes quickly.
+    config.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
+        3000);
+  }
 
   @After
   public void destroy() throws Exception {
@@ -97,26 +148,49 @@ public class TestExternalStoragePolicySatisfier
       kdc.stop();
       FileUtil.fullyDelete(baseDir);
     }
+    if (hdfsCluster != null) {
+      hdfsCluster.shutdown();
+    }
   }
 
-  @Override
-  public void shutdownCluster() {
-    if (externalSps != null) {
-      externalSps.stopGracefully();
-    }
-    super.shutdownCluster();
+  /**
+   * Sets hdfs cluster.
+   */
+  private void setCluster(MiniDFSCluster cluster) {
+    this.hdfsCluster = cluster;
   }
 
-  @Override
-  public void setUp() {
-    super.setUp();
+  /**
+   * @return conf.
+   */
+  private Configuration getConf() {
+    return this.config;
+  }
 
-    getConf().set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-        StoragePolicySatisfierMode.EXTERNAL.toString());
+  /**
+   * @return hdfs cluster.
+   */
+  private MiniDFSCluster getCluster() {
+    return hdfsCluster;
+  }
+
+  /**
+   * Gets distributed file system.
+   *
+   * @throws IOException
+   */
+  private DistributedFileSystem getFS() throws IOException {
+    this.dfs = hdfsCluster.getFileSystem();
+    return this.dfs;
+  }
+
+  private void shutdownCluster() {
+    if (externalSps != null) {
+      externalSps.stopGracefully();
+    }
   }
 
-  @Override
-  public void createCluster() throws IOException {
+  private void createCluster() throws IOException {
     getConf().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     setCluster(startCluster(getConf(), allDiskTypes, NUM_OF_DATANODES,
         STORAGES_PER_DATANODE, CAPACITY));
@@ -124,8 +198,7 @@ public class TestExternalStoragePolicySatisfier
     writeContent(FILE);
   }
 
-  @Override
-  public MiniDFSCluster startCluster(final Configuration conf,
+  private MiniDFSCluster startCluster(final Configuration conf,
       StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
       long nodeCapacity) throws IOException {
     long[][] capacities = new long[numberOfDatanodes][storagesPerDn];
@@ -139,7 +212,8 @@ public class TestExternalStoragePolicySatisfier
         .storageTypes(storageTypes).storageCapacities(capacities).build();
     cluster.waitActive();
 
-    nnc = getNameNodeConnector(getConf());
+    nnc = DFSTestUtil.getNameNodeConnector(getConf(),
+        HdfsServerConstants.MOVER_ID_PATH, 1, false);
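+    // The connector gives the external SPS RPC access to the namenode; the
+    // Mover ID path acts as a lock file so only one Mover/SPS instance runs.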
 
     externalSps = new StoragePolicySatisfier(getConf());
     externalCtxt = new ExternalSPSContext(externalSps, nnc);
@@ -149,7 +223,7 @@ public class TestExternalStoragePolicySatisfier
     return cluster;
   }
 
-  public void restartNamenode() throws IOException{
+  private void restartNamenode() throws IOException{
     if (externalSps != null) {
       externalSps.stopGracefully();
     }
@@ -163,60 +237,6 @@ public class TestExternalStoragePolicySatisfier
     externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL);
   }
 
-  private NameNodeConnector getNameNodeConnector(Configuration conf)
-      throws IOException {
-    final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
-    Assert.assertEquals(1, namenodes.size());
-    final Path externalSPSPathId = HdfsServerConstants.MOVER_ID_PATH;
-    NameNodeConnector.checkOtherInstanceRunning(false);
-    while (true) {
-      try {
-        final List<NameNodeConnector> nncs = NameNodeConnector
-            .newNameNodeConnectors(namenodes,
-                StoragePolicySatisfier.class.getSimpleName(),
-                externalSPSPathId, conf,
-                NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
-        return nncs.get(0);
-      } catch (IOException e) {
-        LOG.warn("Failed to connect with namenode", e);
-        // Ignore
-      }
-
-    }
-  }
-
-  public void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
-      int timeout) throws TimeoutException, InterruptedException {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
-            expectedBlkMovAttemptedCount,
-            ((BlockStorageMovementAttemptedItems) (externalSps
-                .getAttemptedItemsMonitor())).getAttemptedItemsCount());
-        return ((BlockStorageMovementAttemptedItems) (externalSps
-            .getAttemptedItemsMonitor()))
-                .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
-      }
-    }, 100, timeout);
-  }
-
-  public void waitForBlocksMovementAttemptReport(
-      long expectedMovementFinishedBlocksCount, int timeout)
-          throws TimeoutException, InterruptedException {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        int actualCount = externalSps.getAttemptedItemsMonitor()
-            .getAttemptedItemsCount();
-        LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
-            expectedMovementFinishedBlocksCount, actualCount);
-        return actualCount
-            >= expectedMovementFinishedBlocksCount;
-      }
-    }, 100, timeout);
-  }
-
   private void initSecureConf(Configuration conf) throws Exception {
     String username = "externalSPS";
     baseDir = GenericTestUtils
@@ -344,22 +364,6 @@ public class TestExternalStoragePolicySatisfier
   }
 
   /**
-   * Test verifies status check when Satisfier is not running inside namenode.
-   */
-  @Test(timeout = 90000)
-  public void testStoragePolicySatisfyPathStatus() throws Exception {
-    createCluster();
-    DistributedFileSystem fs = getFS();
-    try {
-      fs.getClient().checkStoragePolicySatisfyPathStatus(FILE);
-      Assert.fail("Should throw exception as SPS is not running inside NN!");
-    } catch (IOException e) {
-      GenericTestUtils.assertExceptionContains("Satisfier is not running"
-          + " inside namenode, so status can't be returned.", e);
-    }
-  }
-
-  /**
    * Tests to verify that SPS should be able to start when the Mover ID file
   * is not being held by a Mover. This can be the case when the Mover exits
    * ungracefully without deleting the ID file from HDFS.
@@ -399,17 +403,9 @@ public class TestExternalStoragePolicySatisfier
   }
 
   /**
-   * Status won't be supported for external SPS, now. So, ignoring it.
-   */
-  @Ignore("Status is not supported for external SPS. So, ignoring it.")
-  public void testMaxRetryForFailedBlock() throws Exception {
-  }
-
-  /**
    * This test is specific to internal SPS. So, ignoring it.
    */
   @Ignore("This test is specific to internal SPS. So, ignoring it.")
-  @Override
   public void testTraverseWhenParentDeleted() throws Exception {
   }
 
@@ -417,7 +413,1238 @@ public class TestExternalStoragePolicySatisfier
    * This test is specific to internal SPS. So, ignoring it.
    */
   @Ignore("This test is specific to internal SPS. So, ignoring it.")
-  @Override
   public void testTraverseWhenRootParentDeleted() throws Exception {
   }
+
+
+  @Test(timeout = 300000)
+  public void testWhenStoragePolicySetToCOLD()
+      throws Exception {
+
+    try {
+      createCluster();
+      doTestWhenStoragePolicySetToCOLD();
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  private void doTestWhenStoragePolicySetToCOLD() throws Exception {
+    // Change policy to COLD
+    dfs.setStoragePolicy(new Path(FILE), COLD);
+
+    StorageType[][] newtypes =
+        new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
+            {StorageType.ARCHIVE, StorageType.ARCHIVE},
+            {StorageType.ARCHIVE, StorageType.ARCHIVE}};
+    startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
+        STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+
+    hdfsCluster.triggerHeartbeats();
+    dfs.satisfyStoragePolicy(new Path(FILE));
+    // Wait till the namenode is notified of the moved block locations
+    DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 3, 35000,
+        dfs);
+  }
+
+  @Test(timeout = 300000)
+  public void testWhenStoragePolicySetToALLSSD()
+      throws Exception {
+    try {
+      createCluster();
+      // Change policy to ALL_SSD
+      dfs.setStoragePolicy(new Path(FILE), "ALL_SSD");
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.SSD, StorageType.DISK},
+              {StorageType.SSD, StorageType.DISK},
+              {StorageType.SSD, StorageType.DISK}};
+
+      // Make sure SSD-based storage is available by adding SSD-based
+      // datanodes to the cluster.
+      startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+      dfs.satisfyStoragePolicy(new Path(FILE));
+      hdfsCluster.triggerHeartbeats();
+      // Wait till the StoragePolicySatisfier identifies that the block should
+      // move to SSD storage.
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 3, 30000, dfs);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  @Test(timeout = 300000)
+  public void testWhenStoragePolicySetToONESSD()
+      throws Exception {
+    try {
+      createCluster();
+      // Change policy to ONE_SSD
+      dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
+
+      // Make sure SSD-based storage is available by adding SSD-based
+      // datanodes to the cluster.
+      startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+      dfs.satisfyStoragePolicy(new Path(FILE));
+      hdfsCluster.triggerHeartbeats();
+      // Wait till the StoragePolicySatisfier identifies that the block should
+      // move to SSD storage.
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
+          dfs);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Tests to verify that the block storage movement report will be propagated
+   * to Namenode via datanode heartbeat.
+   */
+  @Test(timeout = 300000)
+  public void testBlksStorageMovementAttemptFinishedReport() throws Exception {
+    try {
+      createCluster();
+      // Change policy to ONE_SSD
+      dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
+
+      // Make sure SSD-based storage is available by adding SSD-based
+      // datanodes to the cluster.
+      startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+      dfs.satisfyStoragePolicy(new Path(FILE));
+      hdfsCluster.triggerHeartbeats();
+
+      // Wait till the block is moved to SSD areas
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
+          dfs);
+
+      waitForBlocksMovementAttemptReport(1, 30000);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Tests to verify that multiple files given to satisfy the storage policy
+   * together are all handled correctly.
+   */
+  @Test(timeout = 300000)
+  public void testMultipleFilesForSatisfyStoragePolicy() throws Exception {
+    try {
+      createCluster();
+      List<String> files = new ArrayList<>();
+      files.add(FILE);
+
+      // Creates 4 more files. Send all of them for satisfying the storage
+      // policy together.
+      for (int i = 0; i < 4; i++) {
+        String file1 = "/testMoveWhenStoragePolicyNotSatisfying_" + i;
+        files.add(file1);
+        writeContent(file1);
+      }
+      // Change policy to ONE_SSD
+      for (String fileName : files) {
+        dfs.setStoragePolicy(new Path(fileName), ONE_SSD);
+        dfs.satisfyStoragePolicy(new Path(fileName));
+      }
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
+
+      // Make sure SSD-based storage is available by adding SSD-based
+      // datanodes to the cluster.
+      startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+      hdfsCluster.triggerHeartbeats();
+
+      for (String fileName : files) {
+        // Wait till the block is moved to SSD areas
+        DFSTestUtil.waitExpectedStorageType(
+            fileName, StorageType.SSD, 1, 30000, dfs);
+        DFSTestUtil.waitExpectedStorageType(
+            fileName, StorageType.DISK, 2, 30000, dfs);
+      }
+
+      waitForBlocksMovementAttemptReport(files.size(), 30000);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Tests to verify that hdfsAdmin.satisfyStoragePolicy works well for a file.
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testSatisfyFileWithHdfsAdmin() throws Exception {
+    try {
+      createCluster();
+      HdfsAdmin hdfsAdmin =
+          new HdfsAdmin(FileSystem.getDefaultUri(config), config);
+      // Change policy to COLD
+      dfs.setStoragePolicy(new Path(FILE), COLD);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
+              {StorageType.DISK, StorageType.ARCHIVE},
+              {StorageType.DISK, StorageType.ARCHIVE}};
+      startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+
+      hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
+
+      hdfsCluster.triggerHeartbeats();
+      // Wait till the namenode is notified of the moved block locations
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 3, 30000,
+          dfs);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Tests to verify that hdfsAdmin.satisfyStoragePolicy works well for a
+   * directory.
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testSatisfyDirWithHdfsAdmin() throws Exception {
+    try {
+      createCluster();
+      HdfsAdmin hdfsAdmin =
+          new HdfsAdmin(FileSystem.getDefaultUri(config), config);
+      final String subDir = "/subDir";
+      final String subFile1 = subDir + "/subFile1";
+      final String subDir2 = subDir + "/subDir2";
+      final String subFile2 = subDir2 + "/subFile2";
+      dfs.mkdirs(new Path(subDir));
+      writeContent(subFile1);
+      dfs.mkdirs(new Path(subDir2));
+      writeContent(subFile2);
+
+      // Change policy to ONE_SSD
+      dfs.setStoragePolicy(new Path(subDir), ONE_SSD);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
+      startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+
+      hdfsAdmin.satisfyStoragePolicy(new Path(subDir));
+
+      hdfsCluster.triggerHeartbeats();
+
+      // The policy should take effect for the file directly in the directory.
+      DFSTestUtil.waitExpectedStorageType(
+          subFile1, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          subFile1, StorageType.DISK, 2, 30000, dfs);
+
+      // The policy also takes effect for the file in the sub-directory.
+      DFSTestUtil.waitExpectedStorageType(
+          subFile2, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          subFile2, StorageType.DISK, 2, 30000, dfs);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Tests to verify hdfsAdmin.satisfyStoragePolicy exceptions.
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testSatisfyWithExceptions() throws Exception {
+    try {
+      createCluster();
+      final String nonExistingFile = "/noneExistingFile";
+      hdfsCluster.getConfiguration(0).
+          setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false);
+      restartNamenode();
+      HdfsAdmin hdfsAdmin =
+          new HdfsAdmin(FileSystem.getDefaultUri(config), config);
+
+      try {
+        hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
+        Assert.fail(String.format(
+            "Should failed to satisfy storage policy "
+                + "for %s since %s is set to false.",
+            FILE, DFS_STORAGE_POLICY_ENABLED_KEY));
+      } catch (IOException e) {
+        GenericTestUtils.assertExceptionContains(String.format(
+            "Failed to satisfy storage policy since %s is set to false.",
+            DFS_STORAGE_POLICY_ENABLED_KEY), e);
+      }
+
+      hdfsCluster.getConfiguration(0).
+          setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, true);
+      restartNamenode();
+
+      hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(config), config);
+      try {
+        hdfsAdmin.satisfyStoragePolicy(new Path(nonExistingFile));
+        Assert.fail("Should throw FileNotFoundException for " +
+            nonExistingFile);
+      } catch (FileNotFoundException e) {
+
+      }
+
+      try {
+        hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
+        hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
+      } catch (Exception e) {
+        Assert.fail(String.format("Allow to invoke mutlipe times "
+            + "#satisfyStoragePolicy() api for a path %s , internally just "
+            + "skipping addtion to satisfy movement queue.", FILE));
+      }
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Tests to verify that, for the given path, some of the blocks or block
+   * source locations (source nodes) under the path will be scheduled for
+   * block movement.
+   *
+   * For example, there are two blocks for a file:
+   *
+   * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)],
+   * blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD.
+   * Only one datanode is available with storage type ARCHIVE, say D.
+   *
+   * SPS will schedule block movement to the coordinator node with the details,
+   * blk_1[move A(DISK) -> D(ARCHIVE)], blk_2[move A(DISK) -> D(ARCHIVE)].
+   */
+  @Test(timeout = 300000)
+  public void testWhenOnlyFewTargetDatanodeAreAvailableToSatisfyStoragePolicy()
+      throws Exception {
+    try {
+      createCluster();
+      // Change policy to COLD
+      dfs.setStoragePolicy(new Path(FILE), COLD);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}};
+
+      // Adding ARCHIVE based datanodes.
+      startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+
+      dfs.satisfyStoragePolicy(new Path(FILE));
+      hdfsCluster.triggerHeartbeats();
+      // Wait till the StoragePolicySatisfier identifies that the block should
+      // move to the ARCHIVE area.
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 1, 30000,
+          dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
+          dfs);
+
+      waitForBlocksMovementAttemptReport(1, 30000);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Tests to verify that, for the given path, no blocks or block source
+   * locations (source nodes) under the path will be scheduled for block
+   * movement, as there is no available datanode with the required storage
+   * type.
+   *
+   * For example, there are two blocks for a file:
+   *
+   * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)],
+   * blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD.
+   * No datanode is available with storage type ARCHIVE.
+   *
+   * SPS won't schedule any block movement for this path.
+   */
+  @Test(timeout = 300000)
+  public void testWhenNoTargetDatanodeToSatisfyStoragePolicy()
+      throws Exception {
+    try {
+      createCluster();
+      // Change policy to COLD
+      dfs.setStoragePolicy(new Path(FILE), COLD);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.DISK, StorageType.DISK}};
+      // Adding DISK based datanodes
+      startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+
+      dfs.satisfyStoragePolicy(new Path(FILE));
+      hdfsCluster.triggerHeartbeats();
+
+      // No block movement will be scheduled as there is no target node
+      // available with the required storage type.
+      waitForAttemptedItems(1, 30000);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
+          dfs);
+      // Since there is no target node the item will get timed out and then
+      // re-attempted.
+      waitForAttemptedItems(1, 30000);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Test to verify that the satisfier worker can't move pinned blocks. If the
+   * given block is pinned, it shouldn't be considered for retries.
+   */
+  @Test(timeout = 120000)
+  public void testMoveWithBlockPinning() throws Exception {
+    try{
+      config.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
+      hdfsCluster = startCluster(config, allDiskTypes, 3, 2, CAPACITY);
+
+      hdfsCluster.waitActive();
+      dfs = hdfsCluster.getFileSystem();
+
+      // create a file with replication factor 3 and mark 2 pinned block
+      // locations.
+      final String file1 = createFileAndSimulateFavoredNodes(2);
+
+      // Change policy to COLD
+      dfs.setStoragePolicy(new Path(file1), COLD);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE}};
+      // Adding ARCHIVE based datanodes
+      startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+
+      dfs.satisfyStoragePolicy(new Path(file1));
+      hdfsCluster.triggerHeartbeats();
+
+      // Only the replica that is not pinned should be scheduled for movement;
+      // the two pinned replicas must stay on DISK.
+      waitForAttemptedItems(1, 30000);
+      waitForBlocksMovementAttemptReport(1, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file1, StorageType.ARCHIVE, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          file1, StorageType.DISK, 2, 30000, dfs);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Tests to verify that, for the given path, only a few of the blocks or
+   * block source locations (source nodes) under the path will be scheduled
+   * for block movement.
+   *
+   * For example, there are two blocks for a file:
+   *
+   * File1 => two blocks and default storage policy(HOT).
+   * blk_1[locations=A(DISK),B(DISK),C(DISK),D(DISK),E(DISK)],
+   * blk_2[locations=A(DISK),B(DISK),C(DISK),D(DISK),E(DISK)].
+   *
+   * Now, set storage policy to COLD.
+   * Only two Dns are available with expected storage type ARCHIVE, say A, E.
+   *
+   * SPS will schedule block movement to the coordinator node with the details,
+   * blk_1[move A(DISK) -> A(ARCHIVE), move E(DISK) -> E(ARCHIVE)],
+   * blk_2[move A(DISK) -> A(ARCHIVE), move E(DISK) -> E(ARCHIVE)].
+   */
+  @Test(timeout = 300000)
+  public void testWhenOnlyFewSourceNodesHaveMatchingTargetNodes()
+      throws Exception {
+    try {
+      int numOfDns = 5;
+      config.setLong("dfs.block.size", 1024);
+      allDiskTypes =
+          new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.ARCHIVE}};
+      hdfsCluster = startCluster(config, allDiskTypes, numOfDns,
+          STORAGES_PER_DATANODE, CAPACITY);
+      dfs = hdfsCluster.getFileSystem();
+      writeContent(FILE, (short) 5);
+
+      // Change policy to COLD
+      dfs.setStoragePolicy(new Path(FILE), COLD);
+
+      dfs.satisfyStoragePolicy(new Path(FILE));
+      hdfsCluster.triggerHeartbeats();
+      // Wait till the StoragePolicySatisfier identifies that the block should
+      // move to the ARCHIVE area.
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 2, 30000,
+          dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
+          dfs);
+
+      waitForBlocksMovementAttemptReport(1, 30000);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Tests moving block storage within the same datanode. Let's say we
+   * have DN1[DISK,ARCHIVE], DN2[DISK,SSD], DN3[DISK,RAM_DISK]; when the
+   * storage policy is set to ONE_SSD and satisfyStoragePolicy is requested,
+   * the block should move to DN2[SSD] successfully.
+   */
+  @Test(timeout = 300000)
+  public void testBlockMoveInSameDatanodeWithONESSD() throws Exception {
+    StorageType[][] diskTypes =
+        new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.SSD},
+            {StorageType.DISK, StorageType.RAM_DISK}};
+    config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    try {
+      hdfsCluster = startCluster(config, diskTypes, NUM_OF_DATANODES,
+          STORAGES_PER_DATANODE, CAPACITY);
+      dfs = hdfsCluster.getFileSystem();
+      writeContent(FILE);
+
+      // Change policy to ONE_SSD
+      dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
+
+      dfs.satisfyStoragePolicy(new Path(FILE));
+      hdfsCluster.triggerHeartbeats();
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
+          dfs);
+
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Tests moving block storage within the same datanode and to a remote node.
+   * Let's say we have DN1[DISK,ARCHIVE], DN2[ARCHIVE,SSD], DN3[DISK,DISK],
+   * DN4[DISK,DISK]; when the storage policy is set to WARM and
+   * satisfyStoragePolicy is requested, the block should move to DN1[ARCHIVE]
+   * and DN2[ARCHIVE] successfully.
+   */
+  @Test(timeout = 300000)
+  public void testBlockMoveInSameAndRemoteDatanodesWithWARM() throws Exception {
+    StorageType[][] diskTypes =
+        new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.ARCHIVE, StorageType.SSD},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK}};
+
+    config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    try {
+      hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+          STORAGES_PER_DATANODE, CAPACITY);
+      dfs = hdfsCluster.getFileSystem();
+      writeContent(FILE);
+
+      // Change policy to WARM
+      dfs.setStoragePolicy(new Path(FILE), "WARM");
+      dfs.satisfyStoragePolicy(new Path(FILE));
+      hdfsCluster.triggerHeartbeats();
+
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 1, 30000,
+          dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 2, 30000,
+          dfs);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * If a replica with the expected storage type already exists on the source
+   * DN, then that DN should be skipped.
+   */
+  @Test(timeout = 300000)
+  public void testSPSWhenReplicaWithExpectedStorageAlreadyAvailableInSource()
+      throws Exception {
+    StorageType[][] diskTypes = new StorageType[][] {
+        {StorageType.DISK, StorageType.ARCHIVE},
+        {StorageType.DISK, StorageType.ARCHIVE},
+        {StorageType.DISK, StorageType.ARCHIVE}};
+
+    try {
+      hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+          STORAGES_PER_DATANODE, CAPACITY);
+      dfs = hdfsCluster.getFileSystem();
+      // 1. Write two replicas on DISK.
+      DFSTestUtil.createFile(dfs, new Path(FILE), DEFAULT_BLOCK_SIZE,
+          (short) 2, 0);
+      // 2. Change policy to COLD, so third replica will be written to ARCHIVE.
+      dfs.setStoragePolicy(new Path(FILE), "COLD");
+
+      // 3. Change the replication factor to 3.
+      dfs.setReplication(new Path(FILE), (short) 3);
+
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
+          dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 1, 30000,
+          dfs);
+
+      // 4. Change policy to HOT, so we can move all the blocks to DISK.
+      dfs.setStoragePolicy(new Path(FILE), "HOT");
+
+      // 5. Satisfy the policy.
+      dfs.satisfyStoragePolicy(new Path(FILE));
+
+      // 6. The blocks should move successfully.
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
+          dfs);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Tests that movements should not be assigned when there is no space in the
+   * target DN.
+   */
+  @Test(timeout = 300000)
+  public void testChooseInSameDatanodeWithONESSDShouldNotChooseIfNoSpace()
+      throws Exception {
+    StorageType[][] diskTypes =
+        new StorageType[][]{{StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.SSD},
+            {StorageType.DISK, StorageType.DISK}};
+    config.setLong("dfs.block.size", 2 * DEFAULT_BLOCK_SIZE);
+    long dnCapacity = 1024 * DEFAULT_BLOCK_SIZE + (2 * DEFAULT_BLOCK_SIZE - 1);
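+    // Keep the DN capacity small so that the lone SSD storage can run out of
+    // space during the test.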
+    try {
+      hdfsCluster = startCluster(config, diskTypes, NUM_OF_DATANODES,
+          STORAGES_PER_DATANODE, dnCapacity);
+      dfs = hdfsCluster.getFileSystem();
+      writeContent(FILE);
+
+      // Change policy to ONE_SSD
+      dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
+      Path filePath = new Path("/testChooseInSameDatanode");
+      final FSDataOutputStream out =
+          dfs.create(filePath, false, 100, (short) 1, 2 * DEFAULT_BLOCK_SIZE);
+      try {
+        dfs.setStoragePolicy(filePath, ONE_SSD);
+        // Try to fill up SSD part by writing content
+        long remaining = dfs.getStatus().getRemaining() / (3 * 2);
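+        // Roughly one storage volume's share of the free space (3 DNs with
+        // 2 storages each); writing this much should exhaust the single SSD.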
+        for (int i = 0; i < remaining; i++) {
+          out.write(i);
+        }
+      } finally {
+        out.close();
+      }
+      hdfsCluster.triggerHeartbeats();
+      ArrayList<DataNode> dataNodes = hdfsCluster.getDataNodes();
+      // Temporarily disable heartbeats so that we can assert whether any items
+      // get scheduled for DNs even though the DNs do not have space to write.
+      // With heartbeats disabled, the scheduled items stay on the
+      // DatanodeDescriptor itself.
+      for (DataNode dataNode : dataNodes) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
+      }
+      dfs.satisfyStoragePolicy(new Path(FILE));
+
+      // Wait for items to be processed
+      waitForAttemptedItems(1, 30000);
+
+      // Re-enable heartbeats now
+      for (DataNode dataNode : dataNodes) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, false);
+      }
+      hdfsCluster.triggerHeartbeats();
+
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
+          dfs);
+      DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 0, 30000, dfs);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Tests that the xattr is cleaned up if satisfyStoragePolicy is called on an
+   * EC file with an unsuitable storage policy set.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testSPSShouldNotLeakXattrIfSatisfyStoragePolicyCallOnECFiles()
+      throws Exception {
+    StorageType[][] diskTypes =
+        new StorageType[][]{{StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.DISK, StorageType.SSD},
+            {StorageType.DISK, StorageType.SSD},
+            {StorageType.DISK, StorageType.SSD},
+            {StorageType.DISK, StorageType.SSD},
+            {StorageType.DISK, StorageType.SSD}};
+
+    int defaultStripedBlockSize =
+        StripedFileTestUtil.getDefaultECPolicy().getCellSize() * 4;
+    config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultStripedBlockSize);
+    config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    config.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
+        1L);
+    config.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
+        false);
+    try {
+      hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+          STORAGES_PER_DATANODE, CAPACITY);
+      dfs = hdfsCluster.getFileSystem();
+      dfs.enableErasureCodingPolicy(
+          StripedFileTestUtil.getDefaultECPolicy().getName());
+
+      // set "/foo" directory with ONE_SSD storage policy.
+      ClientProtocol client = NameNodeProxies.createProxy(config,
+          hdfsCluster.getFileSystem(0).getUri(), ClientProtocol.class)
+          .getProxy();
+      String fooDir = "/foo";
+      client.mkdirs(fooDir, new FsPermission((short) 777), true);
+      // set an EC policy on "/foo" directory
+      client.setErasureCodingPolicy(fooDir,
+          StripedFileTestUtil.getDefaultECPolicy().getName());
+
+      // write file to fooDir
+      final String testFile = "/foo/bar";
+      long fileLen = 20 * defaultStripedBlockSize;
+      DFSTestUtil.createFile(dfs, new Path(testFile), fileLen, (short) 3, 0);
+
+      // ONE_SSD is an unsuitable storage policy for EC files
+      client.setStoragePolicy(fooDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+      dfs.satisfyStoragePolicy(new Path(testFile));
+
+      // Thread.sleep(9000); // To make sure SPS triggered
+      // verify storage types and locations
+      LocatedBlocks locatedBlocks =
+          client.getBlockLocations(testFile, 0, fileLen);
+      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+        for (StorageType type : lb.getStorageTypes()) {
+          Assert.assertEquals(StorageType.DISK, type);
+        }
+      }
+
+      // Make sure satisfy xattr has been removed.
+      DFSTestUtil.waitForXattrRemoved(testFile, XATTR_SATISFY_STORAGE_POLICY,
+          hdfsCluster.getNamesystem(), 30000);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Test SPS with empty file.
+   * 1. Create one empty file.
+   * 2. Call satisfyStoragePolicy for empty file.
+   * 3. SPS should skip this file and xattr should not be added for empty file.
+   */
+  @Test(timeout = 300000)
+  public void testSPSWhenFileLengthIsZero() throws Exception {
+    try {
+      hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES,
+          STORAGES_PER_DATANODE, CAPACITY);
+      hdfsCluster.waitActive();
+      DistributedFileSystem fs = hdfsCluster.getFileSystem();
+      Path filePath = new Path("/zeroSizeFile");
+      DFSTestUtil.createFile(fs, filePath, 0, (short) 1, 0);
+      FSEditLog editlog = hdfsCluster.getNameNode().getNamesystem()
+          .getEditLog();
+      long lastWrittenTxId = editlog.getLastWrittenTxId();
+      fs.satisfyStoragePolicy(filePath);
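+      // If SPS skips the empty file, no xattr edit is logged, so the last
+      // written transaction id should stay the same.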
+      Assert.assertEquals("Xattr should not be added for the file",
+          lastWrittenTxId, editlog.getLastWrittenTxId());
+      INode inode = hdfsCluster.getNameNode().getNamesystem().getFSDirectory()
+          .getINode(filePath.toString());
+      Assert.assertTrue("XAttrFeature should be null for file",
+          inode.getXAttrFeature() == null);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Test SPS for a file with low-redundancy blocks.
+   * 1. Create a cluster with 3 datanodes.
+   * 2. Create one file with 3 replicas.
+   * 3. Set the policy and call satisfyStoragePolicy for the file.
+   * 4. Stop the NameNode and datanodes.
+   * 5. Start the NameNode with 2 datanodes and wait for block movement.
+   * 6. Start the third datanode.
+   * 7. The third datanode's replica should also be moved to the proper
+   * storage based on the policy.
+   */
+  @Test(timeout = 300000)
+  public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception {
+    try {
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+          "3000");
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+          "5000");
+      StorageType[][] newtypes = new StorageType[][] {
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK}};
+      hdfsCluster = startCluster(config, newtypes, 3, 2, CAPACITY);
+      hdfsCluster.waitActive();
+      DistributedFileSystem fs = hdfsCluster.getFileSystem();
+      Path filePath = new Path("/zeroSizeFile");
+      DFSTestUtil.createFile(fs, filePath, 1024, (short) 3, 0);
+      fs.setStoragePolicy(filePath, "COLD");
+      List<DataNodeProperties> list = new ArrayList<>();
+      list.add(hdfsCluster.stopDataNode(0));
+      list.add(hdfsCluster.stopDataNode(0));
+      list.add(hdfsCluster.stopDataNode(0));
+      restartNamenode();
+      hdfsCluster.restartDataNode(list.get(0), false);
+      hdfsCluster.restartDataNode(list.get(1), false);
+      hdfsCluster.waitActive();
+      fs.satisfyStoragePolicy(filePath);
+      DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+          StorageType.ARCHIVE, 2, 30000, hdfsCluster.getFileSystem());
+      hdfsCluster.restartDataNode(list.get(2), false);
+      DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+          StorageType.ARCHIVE, 3, 30000, hdfsCluster.getFileSystem());
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Test SPS for a file with excess-redundancy blocks.
+   * 1. Create a cluster with 5 datanodes.
+   * 2. Create one file with 5 replicas.
+   * 3. Set the file replication to 3.
+   * 4. Set the policy and call satisfyStoragePolicy for the file.
+   * 5. The blocks should be moved successfully.
+   */
+  @Test(timeout = 300000)
+  public void testSPSWhenFileHasExcessRedundancyBlocks() throws Exception {
+    try {
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+          "3000");
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+          "5000");
+      StorageType[][] newtypes = new StorageType[][] {
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK}};
+      hdfsCluster = startCluster(config, newtypes, 5, 2, CAPACITY);
+      hdfsCluster.waitActive();
+      DistributedFileSystem fs = hdfsCluster.getFileSystem();
+      Path filePath = new Path("/zeroSizeFile");
+      DFSTestUtil.createFile(fs, filePath, 1024, (short) 5, 0);
+      fs.setReplication(filePath, (short) 3);
+      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+          LogFactory.getLog(BlockStorageMovementAttemptedItems.class));
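+      // Capture the attempted-items monitor log so we can check that no
+      // low-redundancy warning is reported for this file.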
+      fs.setStoragePolicy(filePath, "COLD");
+      fs.satisfyStoragePolicy(filePath);
+      DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+          StorageType.ARCHIVE, 3, 60000, hdfsCluster.getFileSystem());
+      assertFalse("Log output does not contain expected log message: ",
+          logs.getOutput().contains("some of the blocks are low redundant"));
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Test SPS for empty directory, xAttr should be removed.
+   */
+  @Test(timeout = 300000)
+  public void testSPSForEmptyDirectory() throws IOException, TimeoutException,
+      InterruptedException {
+    try {
+      hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES,
+          STORAGES_PER_DATANODE, CAPACITY);
+      hdfsCluster.waitActive();
+      DistributedFileSystem fs = hdfsCluster.getFileSystem();
+      Path emptyDir = new Path("/emptyDir");
+      fs.mkdirs(emptyDir);
+      fs.satisfyStoragePolicy(emptyDir);
+      // Make sure satisfy xattr has been removed.
+      DFSTestUtil.waitForXattrRemoved("/emptyDir",
+          XATTR_SATISFY_STORAGE_POLICY, hdfsCluster.getNamesystem(), 30000);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Test SPS for a non-existent directory.
+   */
+  @Test(timeout = 300000)
+  public void testSPSForNonExistDirectory() throws Exception {
+    try {
+      hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES,
+          STORAGES_PER_DATANODE, CAPACITY);
+      hdfsCluster.waitActive();
+      DistributedFileSystem fs = hdfsCluster.getFileSystem();
+      Path emptyDir = new Path("/emptyDir");
+      try {
+        fs.satisfyStoragePolicy(emptyDir);
+        fail("FileNotFoundException should throw");
+      } catch (FileNotFoundException e) {
+        // nothing to do
+      }
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Test SPS for a directory tree which doesn't have any files.
+   */
+  @Test(timeout = 300000)
+  public void testSPSWithDirectoryTreeWithoutFile() throws Exception {
+    try {
+      hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES,
+          STORAGES_PER_DATANODE, CAPACITY);
+      hdfsCluster.waitActive();
+      // Create directories
+      /*
+       *                   root
+       *                    |
+       *           A--------C--------D
+       *                    |
+       *               G----H----I
+       *                    |
+       *                    O
+       */
+      DistributedFileSystem fs = hdfsCluster.getFileSystem();
+      fs.mkdirs(new Path("/root/C/H/O"));
+      fs.mkdirs(new Path("/root/A"));
+      fs.mkdirs(new Path("/root/D"));
+      fs.mkdirs(new Path("/root/C/G"));
+      fs.mkdirs(new Path("/root/C/I"));
+      fs.satisfyStoragePolicy(new Path("/root"));
+      // Make sure satisfy xattr has been removed.
+      DFSTestUtil.waitForXattrRemoved("/root",
+          XATTR_SATISFY_STORAGE_POLICY, hdfsCluster.getNamesystem(), 30000);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Test SPS for a directory which has multi-level sub-directories.
+   */
+  @Test(timeout = 300000)
+  public void testMultipleLevelDirectoryForSatisfyStoragePolicy()
+      throws Exception {
+    try {
+      StorageType[][] diskTypes = new StorageType[][] {
+          {StorageType.DISK, StorageType.ARCHIVE},
+          {StorageType.ARCHIVE, StorageType.SSD},
+          {StorageType.DISK, StorageType.DISK}};
+      config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+      hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+          STORAGES_PER_DATANODE, CAPACITY);
+      dfs = hdfsCluster.getFileSystem();
+      createDirectoryTree(dfs);
+
+      List<String> files = getDFSListOfTree();
+      dfs.setStoragePolicy(new Path("/root"), COLD);
+      dfs.satisfyStoragePolicy(new Path("/root"));
+      for (String fileName : files) {
+        // Wait till the block is moved to ARCHIVE
+        DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2,
+            30000, dfs);
+      }
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Test storage block movement while under-replication block tasks exist in
+   * the system, so both will share the max transfer streams.
+   *
+   * 1. Create a cluster with 2 datanodes.
+   * 2. Create 20 files with 2 replicas each.
+   * 3. Start 2 more DNs with DISK & SSD storage types.
+   * 4. Set the replication factor of the first 10 files to 4 to trigger
+   * replication tasks.
+   * 5. Set the ALL_SSD policy on files 11-20.
+   * 6. Call SPS for files 11-20 to trigger block move tasks to the new DNs.
+   * 7. Wait for both the under-replication and SPS tasks to complete.
+   */
+  @Test(timeout = 300000)
+  public void testMoveBlocksWithUnderReplicatedBlocks() throws Exception {
+    try {
+      config.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 3);
+      config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+          "3000");
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+          "5000");
+
+      StorageType[][] storagetypes = new StorageType[][] {
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK}};
+
+      hdfsCluster = startCluster(config, storagetypes, 2, 2, CAPACITY);
+      hdfsCluster.waitActive();
+      dfs = hdfsCluster.getFileSystem();
+
+      // Below files will be used for pending replication block tasks.
+      for (int i=1; i<=20; i++){
+        Path filePath = new Path("/file" + i);
+        DFSTestUtil.createFile(dfs, filePath, DEFAULT_BLOCK_SIZE * 5, (short) 2,
+            0);
+      }
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.DISK, StorageType.SSD},
+              {StorageType.DISK, StorageType.SSD}};
+      startAdditionalDNs(config, 2, NUM_OF_DATANODES, newtypes,
+          STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+
+      // increase replication factor to 4 for the first 10 files and thus
+      // initiate replica tasks
+      for (int i=1; i<=10; i++){
+        Path filePath = new Path("/file" + i);
+        dfs.setReplication(filePath, (short) 4);
+      }
+
+      // invoke SPS for 11-20 files
+      for (int i = 11; i <= 20; i++) {
+        Path filePath = new Path("/file" + i);
+        dfs.setStoragePolicy(filePath, "ALL_SSD");
+        dfs.satisfyStoragePolicy(filePath);
+      }
+
+      for (int i = 1; i <= 10; i++) {
+        Path filePath = new Path("/file" + i);
+        DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+            StorageType.DISK, 4, 60000, hdfsCluster.getFileSystem());
+      }
+      for (int i = 11; i <= 20; i++) {
+        Path filePath = new Path("/file" + i);
+        DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+            StorageType.SSD, 2, 30000, hdfsCluster.getFileSystem());
+      }
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  private static void createDirectoryTree(DistributedFileSystem dfs)
+      throws Exception {
+    // tree structure
+    /*
+     *                           root
+     *                             |
+     *           A--------B--------C--------D--------E
+     *                    |                 |
+     *          F----G----H----I       J----K----L----M
+     *               |                           |
+     *          N----O----P                 Q----R----S
+     *                    |                 |
+     *                    T                 U
+     */
+    // create root Node and child
+    dfs.mkdirs(new Path("/root"));
+    DFSTestUtil.createFile(dfs, new Path("/root/A"), 1024, (short) 3, 0);
+    dfs.mkdirs(new Path("/root/B"));
+    DFSTestUtil.createFile(dfs, new Path("/root/C"), 1024, (short) 3, 0);
+    dfs.mkdirs(new Path("/root/D"));
+    DFSTestUtil.createFile(dfs, new Path("/root/E"), 1024, (short) 3, 0);
+
+    // Create /root/B child
+    DFSTestUtil.createFile(dfs, new Path("/root/B/F"), 1024, (short) 3, 0);
+    dfs.mkdirs(new Path("/root/B/G"));
+    DFSTestUtil.createFile(dfs, new Path("/root/B/H"), 1024, (short) 3, 0);
+    DFSTestUtil.createFile(dfs, new Path("/root/B/I"), 1024, (short) 3, 0);
+
+    // Create /root/D child
+    DFSTestUtil.createFile(dfs, new Path("/root/D/J"), 1024, (short) 3, 0);
+    DFSTestUtil.createFile(dfs, new Path("/root/D/K"), 1024, (short) 3, 0);
+    dfs.mkdirs(new Path("/root/D/L"));
+    DFSTestUtil.createFile(dfs, new Path("/root/D/M"), 1024, (short) 3, 0);
+
+    // Create /root/B/G child
+    DFSTestUtil.createFile(dfs, new Path("/root/B/G/N"), 1024, (short) 3, 0);
+    DFSTestUtil.createFile(dfs, new Path("/root/B/G/O"), 1024, (short) 3, 0);
+    dfs.mkdirs(new Path("/root/B/G/P"));
+
+    // Create /root/D/L child
+    dfs.mkdirs(new Path("/root/D/L/Q"));
+    DFSTestUtil.createFile(dfs, new Path("/root/D/L/R"), 1024, (short) 3, 0);
+    DFSTestUtil.createFile(dfs, new Path("/root/D/L/S"), 1024, (short) 3, 0);
+
+    // Create /root/B/G/P child
+    DFSTestUtil.createFile(dfs, new Path("/root/B/G/P/T"), 1024, (short) 3, 0);
+
+    // Create /root/D/L/Q child
+    DFSTestUtil.createFile(dfs, new Path("/root/D/L/Q/U"), 1024, (short) 3, 0);
+  }
+
+  private List<String> getDFSListOfTree() {
+    List<String> dfsList = new ArrayList<>();
+    dfsList.add("/root/A");
+    dfsList.add("/root/B/F");
+    dfsList.add("/root/B/G/N");
+    dfsList.add("/root/B/G/O");
+    dfsList.add("/root/B/G/P/T");
+    dfsList.add("/root/B/H");
+    dfsList.add("/root/B/I");
+    dfsList.add("/root/C");
+    dfsList.add("/root/D/J");
+    dfsList.add("/root/D/K");
+    dfsList.add("/root/D/L/Q/U");
+    dfsList.add("/root/D/L/R");
+    dfsList.add("/root/D/L/S");
+    dfsList.add("/root/D/M");
+    dfsList.add("/root/E");
+    return dfsList;
+  }
+
+  private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
+      throws IOException {
+    ArrayList<DataNode> dns = hdfsCluster.getDataNodes();
+    final String file1 = "/testMoveWithBlockPinning";
+    // replication factor 3
+    InetSocketAddress[] favoredNodes = new InetSocketAddress[favoredNodesCount];
+    for (int i = 0; i < favoredNodesCount; i++) {
+      favoredNodes[i] = dns.get(i).getXferAddress();
+    }
+    DFSTestUtil.createFile(dfs, new Path(file1), false, 1024, 100,
+        DEFAULT_BLOCK_SIZE, (short) 3, 0, false, favoredNodes);
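+    // Writing with favored nodes places replicas on known DNs, which are then
+    // mocked below as having pinned the block.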
+
+    LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file1, 0);
+    Assert.assertEquals("Wrong block count", 1,
+        locatedBlocks.locatedBlockCount());
+
+    // verify storage type before movement
+    LocatedBlock lb = locatedBlocks.get(0);
+    StorageType[] storageTypes = lb.getStorageTypes();
+    for (StorageType storageType : storageTypes) {
+      Assert.assertTrue(StorageType.DISK == storageType);
+    }
+
+    // Mock FsDatasetSpi#getPinning to show that the block is pinned.
+    DatanodeInfo[] locations = lb.getLocations();
+    Assert.assertEquals(3, locations.length);
+    Assert.assertTrue(favoredNodesCount < locations.length);
+    for (DatanodeInfo dnInfo : locations) {
+      LOG.info("Simulate block pinning in datanode {}",
+          dnInfo);
+      DataNode dn = hdfsCluster.getDataNode(dnInfo.getIpcPort());
+      InternalDataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
+      favoredNodesCount--;
+      if (favoredNodesCount <= 0) {
+        break; // marked the requested number of pinned block locations
+      }
+    }
+    return file1;
+  }
+
+  public void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
+      int timeout) throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
+            expectedBlkMovAttemptedCount,
+            ((BlockStorageMovementAttemptedItems) (externalSps
+                .getAttemptedItemsMonitor())).getAttemptedItemsCount());
+        return ((BlockStorageMovementAttemptedItems) (externalSps
+            .getAttemptedItemsMonitor()))
+                .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
+      }
+    }, 100, timeout);
+  }
+
+  public void waitForBlocksMovementAttemptReport(
+      long expectedMovementFinishedBlocksCount, int timeout)
+          throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        int actualCount = externalSps.getAttemptedItemsMonitor()
+            .getAttemptedItemsCount();
+        LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
+            expectedMovementFinishedBlocksCount, actualCount);
+        return actualCount
+            >= expectedMovementFinishedBlocksCount;
+      }
+    }, 100, timeout);
+  }
+
+  public void writeContent(final String fileName) throws IOException {
+    writeContent(fileName, (short) 3);
+  }
+
+  private void writeContent(final String fileName, short replicationFactor)
+      throws IOException {
+    // write to DISK
+    final FSDataOutputStream out = dfs.create(new Path(fileName),
+        replicationFactor);
+    for (int i = 0; i < 1024; i++) {
+      out.write(i);
+    }
+    out.close();
+  }
+
+  private void startAdditionalDNs(final Configuration conf,
+      int newNodesRequired, int existingNodesNum, StorageType[][] newTypes,
+      int storagesPerDn, long nodeCapacity, final MiniDFSCluster cluster)
+          throws IOException {
+    long[][] capacities;
+    existingNodesNum += newNodesRequired;
+    capacities = new long[newNodesRequired][storagesPerDn];
+    for (int i = 0; i < newNodesRequired; i++) {
+      for (int j = 0; j < storagesPerDn; j++) {
+        capacities[i][j] = nodeCapacity;
+      }
+    }
+
+    cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null,
+        null, capacities, null, false, false, false, null);
+    cluster.triggerHeartbeats();
+  }
+
+  /**
+   * Listener callback implementation that collects all the SPS
+   * move-attempted blocks so that tests can assert on them.
+   */
+  public static final class ExternalBlockMovementListener
+      implements BlockMovementListener {
+
+    private List<Block> actualBlockMovements = new ArrayList<>();
+
+    @Override
+    public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
+      for (Block block : moveAttemptFinishedBlks) {
+        actualBlockMovements.add(block);
+      }
+      LOG.info("Movement attempted blocks:{}", actualBlockMovements);
+    }
+
+    public List<Block> getActualBlockMovements() {
+      return actualBlockMovements;
+    }
+
+    public void clear() {
+      actualBlockMovements.clear();
+    }
+  }
 }

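For readers skimming the diff: a minimal, self-contained sketch of how the
ExternalBlockMovementListener helper above behaves (the block IDs are made up
for illustration, and the wiring of the listener into the external SPS is
omitted here):

    ExternalBlockMovementListener listener = new ExternalBlockMovementListener();
    // The SPS calls this back once a batch of block moves has been attempted.
    listener.notifyMovementTriedBlocks(
        new Block[] {new Block(1000L), new Block(1001L)});
    // Tests can then assert on the collected list and reset it between cases.
    Assert.assertEquals(2, listener.getActualBlockMovements().size());
    listener.clear();
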
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestStoragePolicyCommands.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestStoragePolicyCommands.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestStoragePolicyCommands.java
index 28838a6..ad77684 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestStoragePolicyCommands.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestStoragePolicyCommands.java
@@ -50,7 +50,7 @@ public class TestStoragePolicyCommands {
   public void clusterSetUp() throws IOException, URISyntaxException {
     conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-        StoragePolicySatisfierMode.INTERNAL.toString());
+        StoragePolicySatisfierMode.EXTERNAL.toString());
     StorageType[][] newtypes = new StorageType[][] {
         {StorageType.ARCHIVE, StorageType.DISK}};
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL)

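The one-line INTERNAL -> EXTERNAL change above is the pattern repeated across
these test updates: the mini cluster no longer runs the satisfier inside the
NameNode. A minimal sketch of the setup this implies, using only names that
appear in this diff (the single-datanode layout is illustrative; the test
itself sizes the cluster with its own REPL constant):

    Configuration conf = new HdfsConfiguration();
    // Run the storage policy satisfier as an external service.
    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
        StoragePolicySatisfierMode.EXTERNAL.toString());
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1)
        .storageTypes(new StorageType[][] {
            {StorageType.ARCHIVE, StorageType.DISK}})
        .build();
    cluster.waitActive();
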
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestStoragePolicySatisfyAdminCommands.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestStoragePolicySatisfyAdminCommands.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestStoragePolicySatisfyAdminCommands.java
index 8a62e0e..1ab7788 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestStoragePolicySatisfyAdminCommands.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestStoragePolicySatisfyAdminCommands.java
@@ -29,6 +29,11 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.sps.Context;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -43,12 +48,13 @@ public class TestStoragePolicySatisfyAdminCommands {
   private Configuration conf = null;
   private MiniDFSCluster cluster = null;
   private DistributedFileSystem dfs = null;
+  private StoragePolicySatisfier externalSps = null;
 
   @Before
   public void clusterSetUp() throws IOException, URISyntaxException {
     conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-        StoragePolicySatisfierMode.INTERNAL.toString());
+        StoragePolicySatisfierMode.EXTERNAL.toString());
     // Reduced refresh cycle to update latest datanodes.
     conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
         1000);
@@ -58,6 +64,14 @@ public class TestStoragePolicySatisfyAdminCommands {
         .storageTypes(newtypes).build();
     cluster.waitActive();
     dfs = cluster.getFileSystem();
+    NameNodeConnector nnc = DFSTestUtil.getNameNodeConnector(conf,
+        HdfsServerConstants.MOVER_ID_PATH, 1, false);
+
+    externalSps = new StoragePolicySatisfier(conf);
+    Context externalCtxt = new ExternalSPSContext(externalSps, nnc);
+
+    externalSps.init(externalCtxt);
+    externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL);
   }
 
   @After
@@ -70,6 +84,9 @@ public class TestStoragePolicySatisfyAdminCommands {
       cluster.shutdown();
       cluster = null;
     }
+    if (externalSps != null) {
+      externalSps.stopGracefully();
+    }
   }
 
   @Test(timeout = 30000)
@@ -92,41 +109,4 @@ public class TestStoragePolicySatisfyAdminCommands {
     DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000,
         dfs);
   }
-
-  @Test(timeout = 30000)
-  public void testIsSatisfierRunningCommand() throws Exception {
-    final String file = "/testIsSatisfierRunningCommand";
-    DFSTestUtil.createFile(dfs, new Path(file), SIZE, REPL, 0);
-    final StoragePolicyAdmin admin = new StoragePolicyAdmin(conf);
-    DFSTestUtil.toolRun(admin, "-isInternalSatisfierRunning", 0, "yes");
-
-    cluster.getNameNode().reconfigureProperty(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-        StoragePolicySatisfierMode.NONE.toString());
-    cluster.waitActive();
-
-    DFSTestUtil.toolRun(admin, "-isInternalSatisfierRunning", 0, "no");
-
-    // Test with unnecessary args
-    DFSTestUtil.toolRun(admin, "-isInternalSatisfierRunning status", 1,
-        "Can't understand arguments: ");
-  }
-
-  @Test(timeout = 90000)
-  public void testSatisfyStoragePolicyCommandWithWaitOption()
-      throws Exception {
-    final String file = "/testSatisfyStoragePolicyCommandWithWaitOption";
-    DFSTestUtil.createFile(dfs, new Path(file), SIZE, REPL, 0);
-
-    final StoragePolicyAdmin admin = new StoragePolicyAdmin(conf);
-
-    DFSTestUtil.toolRun(admin, "-setStoragePolicy -path " + file
-        + " -policy COLD", 0, "Set storage policy COLD on " + file.toString());
-
-    DFSTestUtil.toolRun(admin, "-satisfyStoragePolicy -w -path " + file, 0,
-        "Waiting for satisfy the policy");
-
-    DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000,
-        dfs);
-  }
 }

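Taken together, the external-SPS bootstrap this patch adds to clusterSetUp()
boils down to the sequence below (identifiers copied from the hunk above; the
satisfier is assigned to the externalSps field so that the @After teardown can
invoke stopGracefully() on it):

    NameNodeConnector nnc = DFSTestUtil.getNameNodeConnector(conf,
        HdfsServerConstants.MOVER_ID_PATH, 1, false);
    externalSps = new StoragePolicySatisfier(conf);
    Context externalCtxt = new ExternalSPSContext(externalSps, nnc);
    externalSps.init(externalCtxt);
    externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL);
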
