Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/08/12 10:22:27 UTC

[44/50] [abbrv] hadoop git commit: HDFS-13076: [SPS]: Cleanup work for HDFS-10285 merge. Contributed by Rakesh R.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
deleted file mode 100644
index d6e92d2..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.namenode.sps;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-
-/**
- * This class handles the internal SPS block movements. This will assign block
- * movement tasks to target datanode descriptors.
- */
-@InterfaceAudience.Private
-public class IntraSPSNameNodeBlockMoveTaskHandler
-    implements BlockMoveTaskHandler {
-
-  private BlockManager blockManager;
-  private Namesystem namesystem;
-
-  public IntraSPSNameNodeBlockMoveTaskHandler(BlockManager blockManager,
-      Namesystem namesytem) {
-    this.blockManager = blockManager;
-    this.namesystem = namesytem;
-  }
-
-  @Override
-  public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
-    namesystem.readLock();
-    try {
-      DatanodeDescriptor dn = blockManager.getDatanodeManager()
-          .getDatanode(blkMovingInfo.getTarget().getDatanodeUuid());
-      if (dn == null) {
-        throw new IOException("Failed to schedule block movement task:"
-            + blkMovingInfo + " as target datanode: "
-            + blkMovingInfo.getTarget() + " doesn't exists");
-      }
-      dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
-      dn.addBlocksToMoveStorage(blkMovingInfo);
-    } finally {
-      namesystem.readUnlock();
-    }
-  }
-}
\ No newline at end of file

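The class deleted above was the namenode-internal implementation of the BlockMoveTaskHandler interface; the interface itself, with the single submitMoveTask(BlockMovingInfo) method visible in the removed code, stays in place for the external satisfier. As a hedged illustration only (the class name and logging are hypothetical, not part of this patch, and this assumes submitMoveTask is the interface's only method, as the deleted implementation suggests), a minimal stand-in implementation for tests could look like:

    package org.apache.hadoop.hdfs.server.namenode.sps;

    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /** Hypothetical stand-in handler that only records submitted moves. */
    public class LoggingBlockMoveTaskHandler implements BlockMoveTaskHandler {
      private static final Logger LOG =
          LoggerFactory.getLogger(LoggingBlockMoveTaskHandler.class);

      @Override
      public void submitMoveTask(BlockMovingInfo blkMovingInfo)
          throws IOException {
        // Log the request instead of queueing it on a datanode descriptor.
        LOG.info("Would move block: {}", blkMovingInfo);
      }
    }
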
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
deleted file mode 100644
index 2bf4810..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode.sps;
-
-import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.DFSUtilClient;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.namenode.INode;
-import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
-import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.security.AccessControlException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class is the Namenode implementation for analyzing the file blocks which
- * are expecting to change its storages and assigning the block storage
- * movements to satisfy the storage policy.
- */
-@InterfaceAudience.Private
-public class IntraSPSNameNodeContext implements Context {
-  private static final Logger LOG = LoggerFactory
-      .getLogger(IntraSPSNameNodeContext.class);
-
-  private final Namesystem namesystem;
-  private final BlockManager blockManager;
-
-  private SPSService service;
-  private final FileCollector fileCollector;
-  private final BlockMoveTaskHandler blockMoveTaskHandler;
-
-  public IntraSPSNameNodeContext(Namesystem namesystem,
-      BlockManager blockManager, SPSService service) {
-    this.namesystem = namesystem;
-    this.blockManager = blockManager;
-    this.service = service;
-    fileCollector = new IntraSPSNameNodeFileIdCollector(
-        namesystem.getFSDirectory(), service);
-    blockMoveTaskHandler = new IntraSPSNameNodeBlockMoveTaskHandler(
-        blockManager, namesystem);
-  }
-
-  @Override
-  public int getNumLiveDataNodes() {
-    return blockManager.getDatanodeManager().getNumLiveDataNodes();
-  }
-
-  /**
-   * @return object containing information regarding the file.
-   */
-  @Override
-  public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
-    Path filePath = DFSUtilClient.makePathFromFileId(inodeID);
-    return namesystem.getFileInfo(filePath.toString(), true, true);
-  }
-
-  @Override
-  public DatanodeStorageReport[] getLiveDatanodeStorageReport()
-      throws IOException {
-    namesystem.readLock();
-    try {
-      return blockManager.getDatanodeManager()
-          .getDatanodeStorageReport(DatanodeReportType.LIVE);
-    } finally {
-      namesystem.readUnlock();
-    }
-  }
-
-  @Override
-  public boolean isFileExist(long inodeId) {
-    return namesystem.getFSDirectory().getInode(inodeId) != null;
-  }
-
-  @Override
-  public void removeSPSHint(long inodeId) throws IOException {
-    this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY);
-  }
-
-  @Override
-  public boolean isRunning() {
-    return namesystem.isRunning() && service.isRunning();
-  }
-
-  @Override
-  public boolean isInSafeMode() {
-    return namesystem.isInSafeMode();
-  }
-
-  @Override
-  public boolean isMoverRunning() {
-    String moverId = HdfsServerConstants.MOVER_ID_PATH.toString();
-    return namesystem.isFileOpenedForWrite(moverId);
-  }
-
-  @Override
-  public void addDropPreviousSPSWorkAtDNs() {
-    namesystem.readLock();
-    try {
-      blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
-    } finally {
-      namesystem.readUnlock();
-    }
-  }
-
-  @Override
-  public BlockStoragePolicy getStoragePolicy(byte policyID) {
-    return blockManager.getStoragePolicy(policyID);
-  }
-
-  @Override
-  public NetworkTopology getNetworkTopology(DatanodeMap datanodeMap) {
-    return blockManager.getDatanodeManager().getNetworkTopology();
-  }
-
-  @Override
-  public long getFileID(String path) throws UnresolvedLinkException,
-      AccessControlException, ParentNotDirectoryException {
-    namesystem.readLock();
-    try {
-      INode inode = namesystem.getFSDirectory().getINode(path);
-      return inode == null ? -1 : inode.getId();
-    } finally {
-      namesystem.readUnlock();
-    }
-  }
-
-  @Override
-  public Long getNextSPSPath() {
-    return blockManager.getSPSManager().getNextPathId();
-  }
-
-  @Override
-  public void removeSPSPathId(long trackId) {
-    blockManager.getSPSManager().removePathId(trackId);
-  }
-
-  @Override
-  public void removeAllSPSPathIds() {
-    blockManager.getSPSManager().removeAllPathIds();
-  }
-
-  @Override
-  public void scanAndCollectFiles(long filePath)
-      throws IOException, InterruptedException {
-    fileCollector.scanAndCollectFiles(filePath);
-  }
-
-  @Override
-  public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
-    blockMoveTaskHandler.submitMoveTask(blkMovingInfo);
-  }
-
-  @Override
-  public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
-    LOG.info("Movement attempted blocks: {}",
-        Arrays.asList(moveAttemptFinishedBlks));
-  }
-}

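The removed IntraSPSNameNodeContext resolved files directly through the namesystem and FSDirectory; its external counterpart instead turns an inode id into a reserved path that the namenode resolves back to the file, as the ExternalSPSContext hunk later in this patch shows for removeSPSHint. A small hedged sketch of that pattern over a DistributedFileSystem (class and method names here are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSUtilClient;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;

    /** Sketch: acting on a file by inode id from outside the namenode. */
    public class ReservedInodePathSketch {
      public static void removeSpsHint(DistributedFileSystem dfs, long inodeId)
          throws IOException {
        // makePathFromFileId builds a reserved inode path the NN can resolve.
        Path filePath = DFSUtilClient.makePathFromFileId(inodeId);
        dfs.removeXAttr(filePath,
            HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
      }
    }
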
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
deleted file mode 100644
index 0473b9d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.namenode.sps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser;
-import org.apache.hadoop.hdfs.server.namenode.INode;
-
-/**
- * A specific implementation for scanning the directory with Namenode internal
- * Inode structure and collects the file ids under the given directory ID.
- */
-@InterfaceAudience.Private
-public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
-    implements FileCollector {
-  private int maxQueueLimitToScan;
-  private final SPSService service;
-
-  private int remainingCapacity = 0;
-
-  private List<ItemInfo> currentBatch;
-
-  public IntraSPSNameNodeFileIdCollector(FSDirectory dir,
-      SPSService service) {
-    super(dir, service.getConf());
-    this.service = service;
-    this.maxQueueLimitToScan = service.getConf().getInt(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT);
-    currentBatch = new ArrayList<>(maxQueueLimitToScan);
-  }
-
-  @Override
-  protected boolean processFileInode(INode inode, TraverseInfo traverseInfo)
-      throws IOException, InterruptedException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Processing {} for statisy the policy",
-          inode.getFullPathName());
-    }
-    if (!inode.isFile()) {
-      return false;
-    }
-    if (inode.isFile() && inode.asFile().numBlocks() != 0) {
-      currentBatch.add(new ItemInfo(
-          ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
-      remainingCapacity--;
-    }
-    return true;
-  }
-
-  @Override
-  protected boolean shouldSubmitCurrentBatch() {
-    return remainingCapacity <= 0;
-  }
-
-  @Override
-  protected void checkINodeReady(long startId) throws IOException {
-    // SPS work won't be scheduled if NN is in standby. So, skipping NN
-    // standby check.
-    return;
-  }
-
-  @Override
-  protected void submitCurrentBatch(Long startId)
-      throws IOException, InterruptedException {
-    // Add current child's to queue
-    service.addAllFilesToProcess(startId,
-        currentBatch, false);
-    currentBatch.clear();
-  }
-
-  @Override
-  protected void throttle() throws InterruptedException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("StorageMovementNeeded queue remaining capacity is zero,"
-          + " waiting for some free slots.");
-    }
-    remainingCapacity = remainingCapacity();
-    // wait for queue to be free
-    while (remainingCapacity <= 0) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Waiting for storageMovementNeeded queue to be free!");
-      }
-      Thread.sleep(5000);
-      remainingCapacity = remainingCapacity();
-    }
-  }
-
-  @Override
-  protected boolean canTraverseDir(INode inode) throws IOException {
-    return true;
-  }
-
-  @Override
-  protected void checkPauseForTesting() throws InterruptedException {
-    // Nothing to do
-  }
-
-  @Override
-  public void scanAndCollectFiles(final long startINodeId)
-      throws IOException, InterruptedException {
-    FSDirectory fsd = getFSDirectory();
-    INode startInode = fsd.getInode(startINodeId);
-    if (startInode != null) {
-      remainingCapacity = remainingCapacity();
-      if (remainingCapacity == 0) {
-        throttle();
-      }
-      if (startInode.isFile()) {
-        currentBatch
-            .add(new ItemInfo(startInode.getId(), startInode.getId()));
-      } else {
-        readLock();
-        // NOTE: this lock will not be held for full directory scanning. It is
-        // basically a sliced locking. Once it collects a batch size( at max the
-        // size of maxQueueLimitToScan (default 1000)) file ids, then it will
-        // unlock and submits the current batch to SPSService. Once
-        // service.processingQueueSize() shows empty slots, then lock will be
-        // re-acquired and scan will be resumed. This logic was re-used from
-        // EDEK feature.
-        try {
-          traverseDir(startInode.asDirectory(), startINodeId,
-              HdfsFileStatus.EMPTY_NAME, new SPSTraverseInfo(startINodeId));
-        } finally {
-          readUnlock();
-        }
-      }
-      // Mark startInode traverse is done, this is last-batch
-      service.addAllFilesToProcess(startInode.getId(), currentBatch, true);
-      currentBatch.clear();
-    }
-  }
-
-  /**
-   * Returns queue remaining capacity.
-   */
-  public synchronized int remainingCapacity() {
-    int size = service.processingQueueSize();
-    int remainingSize = 0;
-    if (size < maxQueueLimitToScan) {
-      remainingSize = maxQueueLimitToScan - size;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("SPS processing Q -> maximum capacity:{}, current size:{},"
-          + " remaining size:{}", maxQueueLimitToScan, size, remainingSize);
-    }
-    return remainingSize;
-  }
-
-  class SPSTraverseInfo extends TraverseInfo {
-    private long startId;
-
-    SPSTraverseInfo(long startId) {
-      this.startId = startId;
-    }
-
-    public long getStartId() {
-      return startId;
-    }
-  }
-
-}
\ No newline at end of file

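The collector deleted above throttles directory scanning by comparing the SPS processing-queue size against the configured queue limit and sleeping until slots free up. A self-contained hedged sketch of that capacity-based throttle follows (the supplier and limit are illustrative stand-ins for SPSService.processingQueueSize() and dfs.storage.policy.satisfier.queue.limit):

    import java.util.function.IntSupplier;

    /** Minimal sketch of the capacity-based throttle used by the collector. */
    class QueueThrottle {
      private final IntSupplier queueSize;
      private final int maxQueueLimitToScan;

      QueueThrottle(IntSupplier queueSize, int maxQueueLimitToScan) {
        this.queueSize = queueSize;
        this.maxQueueLimitToScan = maxQueueLimitToScan;
      }

      /** @return free slots in the downstream queue, never negative. */
      int remainingCapacity() {
        return Math.max(0, maxQueueLimitToScan - queueSize.getAsInt());
      }

      /** Block until at least one slot is free, polling every 5 seconds. */
      void throttle() throws InterruptedException {
        while (remainingCapacity() <= 0) {
          Thread.sleep(5000);
        }
      }
    }
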
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
index 86634d8..a62dd93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
@@ -102,11 +102,6 @@ public interface SPSService {
   int processingQueueSize();
 
   /**
-   * Clear inodeId present in the processing queue.
-   */
-  void clearQueue(long spsPath);
-
-  /**
    * @return the configuration.
    */
   Configuration getConf();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 4af6c8f..7ebd23d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -43,14 +43,12 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Matcher;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
@@ -159,15 +157,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
           serviceMode);
       return;
     }
-    if (serviceMode == StoragePolicySatisfierMode.INTERNAL
-        && ctxt.isMoverRunning()) {
-      isRunning = false;
-      LOG.error(
-          "Stopping StoragePolicySatisfier thread " + "as Mover ID file "
-              + HdfsServerConstants.MOVER_ID_PATH.toString()
-              + " been opened. Maybe a Mover instance is running!");
-      return;
-    }
     if (reconfigStart) {
       LOG.info("Starting {} StoragePolicySatisfier, as admin requested to "
           + "start it.", StringUtils.toLowerCase(serviceMode.toString()));
@@ -177,9 +166,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
     }
 
     isRunning = true;
-    // Ensure that all the previously submitted block movements(if any) have to
-    // be stopped in all datanodes.
-    addDropSPSWorkCommandsToAllDNs();
     storagePolicySatisfierThread = new Daemon(this);
     storagePolicySatisfierThread.setName("StoragePolicySatisfier");
     storagePolicySatisfierThread.start();
@@ -201,7 +187,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
     this.storageMovementsMonitor.stop();
     if (forceStop) {
       storageMovementNeeded.clearQueuesWithNotification();
-      addDropSPSWorkCommandsToAllDNs();
     } else {
       LOG.info("Stopping StoragePolicySatisfier.");
     }
@@ -234,14 +219,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
     return isRunning;
   }
 
-  /**
-   * Adding drop commands to all datanodes to stop performing the satisfier
-   * block movements, if any.
-   */
-  private void addDropSPSWorkCommandsToAllDNs() {
-    ctxt.addDropPreviousSPSWorkAtDNs();
-  }
-
   @Override
   public void run() {
     while (isRunning) {
@@ -1101,13 +1078,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
   }
 
   /**
-   * Clear queues for given track id.
-   */
-  public void clearQueue(long trackId) {
-    storageMovementNeeded.clearQueue(trackId);
-  }
-
-  /**
    * This class contains information of an attempted blocks and its last
    * attempted or reported time stamp. This is used by
    * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
@@ -1158,20 +1128,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
     }
   }
 
-  /**
-   * Returns sps invoked path status. This method is used by internal satisfy
-   * storage policy service.
-   *
-   * @param path
-   *          sps path
-   * @return storage policy satisfy path status
-   * @throws IOException
-   */
-  public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
-      String path) throws IOException {
-    return storageMovementNeeded.getStatus(ctxt.getFileID(path));
-  }
-
   @Override
   public void addFileToProcess(ItemInfo trackInfo, boolean scanCompleted) {
     storageMovementNeeded.add(trackInfo, scanCompleted);

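With the internal-mode checks gone, start() in the hunk above reduces to flipping isRunning and launching the satisfier loop on a Hadoop Daemon thread. A hedged sketch of that start/stop pattern (the loop body is a placeholder, not the real scheduling logic):

    import org.apache.hadoop.util.Daemon;

    /** Sketch of the Daemon-thread lifecycle used by StoragePolicySatisfier. */
    class SatisfierLikeService implements Runnable {
      private volatile boolean isRunning;
      private Daemon worker;

      void start() {
        isRunning = true;
        worker = new Daemon(this);
        worker.setName("StoragePolicySatisfier");
        worker.start();
      }

      void stop() throws InterruptedException {
        isRunning = false;
        if (worker != null) {
          worker.interrupt();
          worker.join(3000);
        }
      }

      @Override
      public void run() {
        while (isRunning) {
          try {
            // Placeholder for one scheduling round over the pending items.
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    }
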
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
index 0507d6b..074eab6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
@@ -18,30 +18,27 @@
 package org.apache.hadoop.hdfs.server.namenode.sps;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.sps.ExternalStoragePolicySatisfier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This manages satisfy storage policy invoked path ids and expose methods to
- * process these path ids. It maintains sps mode(INTERNAL/EXTERNAL/NONE)
+ * process these path ids. It maintains sps mode(EXTERNAL/NONE)
  * configured by the administrator.
  *
  * <p>
- * If the configured mode is {@link StoragePolicySatisfierMode.INTERNAL}, then
- * it will start internal sps daemon service inside namenode and process sps
- * invoked path ids to satisfy the storage policy.
- *
- * <p>
  * If the configured mode is {@link StoragePolicySatisfierMode.EXTERNAL}, then
  * it won't do anything, just maintains the sps invoked path ids. Administrator
  * requires to start external sps service explicitly, to fetch the sps invoked
@@ -66,10 +63,9 @@ public class StoragePolicySatisfyManager {
   private final Queue<Long> pathsToBeTraveresed;
   private final int outstandingPathsLimit;
   private final Namesystem namesystem;
-  private final BlockManager blkMgr;
 
-  public StoragePolicySatisfyManager(Configuration conf, Namesystem namesystem,
-      BlockManager blkMgr) {
+  public StoragePolicySatisfyManager(Configuration conf,
+      Namesystem namesystem) {
     // StoragePolicySatisfier(SPS) configs
     storagePolicyEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
@@ -82,21 +78,16 @@ public class StoragePolicySatisfyManager {
         DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT);
     mode = StoragePolicySatisfierMode.fromString(modeVal);
     pathsToBeTraveresed = new LinkedList<Long>();
+    this.namesystem = namesystem;
     // instantiate SPS service by just keeps config reference and not starting
     // any supporting threads.
     spsService = new StoragePolicySatisfier(conf);
-    this.namesystem = namesystem;
-    this.blkMgr = blkMgr;
   }
 
   /**
    * This function will do following logic based on the configured sps mode:
    *
    * <p>
-   * If the configured mode is {@link StoragePolicySatisfierMode.INTERNAL}, then
-   * starts internal daemon service inside namenode.
-   *
-   * <p>
    * If the configured mode is {@link StoragePolicySatisfierMode.EXTERNAL}, then
    * it won't do anything. Administrator requires to start external sps service
    * explicitly.
@@ -113,17 +104,6 @@ public class StoragePolicySatisfyManager {
     }
 
     switch (mode) {
-    case INTERNAL:
-      if (spsService.isRunning()) {
-        LOG.info("Storage policy satisfier is already running"
-            + " as internal daemon service inside namenode.");
-        return;
-      }
-      // starts internal daemon service inside namenode
-      spsService.init(
-          new IntraSPSNameNodeContext(namesystem, blkMgr, spsService));
-      spsService.start(false, mode);
-      break;
     case EXTERNAL:
       LOG.info("Storage policy satisfier is configured as external, "
           + "please start external sps service explicitly to satisfy policy");
@@ -141,10 +121,6 @@ public class StoragePolicySatisfyManager {
    * This function will do following logic based on the configured sps mode:
    *
    * <p>
-   * If the configured mode is {@link StoragePolicySatisfierMode.INTERNAL}, then
-   * stops internal daemon service inside namenode.
-   *
-   * <p>
    * If the configured mode is {@link StoragePolicySatisfierMode.EXTERNAL}, then
    * it won't do anything. Administrator requires to stop external sps service
    * explicitly, if needed.
@@ -162,16 +138,6 @@ public class StoragePolicySatisfyManager {
     }
 
     switch (mode) {
-    case INTERNAL:
-      removeAllPathIds();
-      if (!spsService.isRunning()) {
-        LOG.info("Internal storage policy satisfier daemon service"
-            + " is not running");
-        return;
-      }
-      // stops internal daemon service running inside namenode
-      spsService.stop(false);
-      break;
     case EXTERNAL:
       removeAllPathIds();
       if (LOG.isDebugEnabled()) {
@@ -194,11 +160,8 @@ public class StoragePolicySatisfyManager {
   }
 
   /**
-   * Sets new sps mode. If the new mode is internal, then it will start internal
-   * sps service inside namenode. If the new mode is external, then stops
-   * internal sps service running(if any) inside namenode. If the new mode is
-   * none, then it will disable the sps feature completely by clearing all
-   * queued up sps path's hint.
+   * Sets new sps mode. If the new mode is none, then it will disable the sps
+   * feature completely by clearing all queued up sps path's hint.
    */
   public void changeModeEvent(StoragePolicySatisfierMode newMode) {
     if (!storagePolicyEnabled) {
@@ -212,16 +175,6 @@ public class StoragePolicySatisfyManager {
     }
 
     switch (newMode) {
-    case INTERNAL:
-      if (spsService.isRunning()) {
-        LOG.info("Storage policy satisfier is already running as {} mode.",
-            mode);
-        return;
-      }
-      spsService.init(new IntraSPSNameNodeContext(this.namesystem, this.blkMgr,
-          spsService));
-      spsService.start(true, newMode);
-      break;
     case EXTERNAL:
       if (mode == newMode) {
         LOG.info("Storage policy satisfier is already in mode:{},"
@@ -238,7 +191,7 @@ public class StoragePolicySatisfyManager {
       }
       LOG.info("Disabling StoragePolicySatisfier, mode:{}", newMode);
       spsService.stop(true);
-      removeAllPathIds();
+      clearPathIds();
       break;
     default:
       if (LOG.isDebugEnabled()) {
@@ -252,77 +205,15 @@ public class StoragePolicySatisfyManager {
   }
 
   /**
-   * This function will do following logic based on the configured sps mode:
-   *
-   * <p>
-   * If the configured mode is {@link StoragePolicySatisfierMode.INTERNAL}, then
-   * timed wait to stop internal storage policy satisfier daemon threads.
-   *
-   * <p>
-   * If the configured mode is {@link StoragePolicySatisfierMode.EXTERNAL}, then
-   * it won't do anything, just ignore it.
-   *
-   * <p>
-   * If the configured mode is {@link StoragePolicySatisfierMode.NONE}, then the
-   * service is disabled. It won't do any action, just ignore it.
-   */
-  public void stopGracefully() {
-    switch (mode) {
-    case INTERNAL:
-      spsService.stopGracefully();
-      break;
-    case EXTERNAL:
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Ignoring, StoragePolicySatisfier feature is running"
-            + " outside namenode");
-      }
-      break;
-    case NONE:
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Ignoring, StoragePolicySatisfier feature is disabled");
-      }
-      break;
-    default:
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Invalid mode:{}", mode);
-      }
-      break;
-    }
-  }
-
-  /**
    * @return true if the internal storage policy satisfier daemon is running,
    *         false otherwise.
    */
-  public boolean isInternalSatisfierRunning() {
+  @VisibleForTesting
+  public boolean isSatisfierRunning() {
     return spsService.isRunning();
   }
 
   /**
-   * @return internal SPS service instance.
-   */
-  public SPSService getInternalSPSService() {
-    return this.spsService;
-  }
-
-  /**
-   * @return status Storage policy satisfy status of the path. It is supported
-   *         only for the internal sps daemon service.
-   * @throws IOException
-   *           if the Satisfier is not running inside namenode.
-   */
-  public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
-      String path) throws IOException {
-    if (mode != StoragePolicySatisfierMode.INTERNAL) {
-      LOG.debug("Satisfier is not running inside namenode, so status "
-          + "can't be returned.");
-      throw new IOException("Satisfier is not running inside namenode, "
-          + "so status can't be returned.");
-    }
-    return spsService.checkStoragePolicySatisfyPathStatus(path);
-  }
-
-  /**
    * @return the next SPS path id, on which path users has invoked to satisfy
    *         storages.
    */
@@ -348,10 +239,22 @@ public class StoragePolicySatisfyManager {
 
   /**
    * Removes the SPS path id from the list of sps paths.
+   *
+   * @throws IOException
    */
-  public void removePathId(long trackId) {
+  private void clearPathIds(){
     synchronized (pathsToBeTraveresed) {
-      pathsToBeTraveresed.remove(trackId);
+      Iterator<Long> iterator = pathsToBeTraveresed.iterator();
+      while (iterator.hasNext()) {
+        Long trackId = iterator.next();
+        try {
+          namesystem.removeXattr(trackId,
+              HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
+        } catch (IOException e) {
+          LOG.debug("Failed to remove sps xatttr!", e);
+        }
+        iterator.remove();
+      }
     }
   }
 
@@ -374,12 +277,11 @@ public class StoragePolicySatisfyManager {
   }
 
   /**
-   * @return true if sps is configured as an internal service or external
+   * @return true if sps is configured as an external
    *         service, false otherwise.
    */
   public boolean isEnabled() {
-    return mode == StoragePolicySatisfierMode.INTERNAL
-        || mode == StoragePolicySatisfierMode.EXTERNAL;
+    return mode == StoragePolicySatisfierMode.EXTERNAL;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
index 189bc2b..3293035 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
@@ -150,11 +150,6 @@ public class ExternalSPSContext implements Context {
   }
 
   @Override
-  public void addDropPreviousSPSWorkAtDNs() {
-    // Nothing todo
-  }
-
-  @Override
   public void removeSPSHint(long inodeId) throws IOException {
     Path filePath = DFSUtilClient.makePathFromFileId(inodeId);
     nnc.getDistributedFileSystem().removeXAttr(filePath,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
index af90f0d..8e19a7c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
@@ -68,15 +68,6 @@ public final class ExternalStoragePolicySatisfier {
       StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
       nnc = getNameNodeConnector(spsConf);
 
-      boolean spsRunning;
-      spsRunning = nnc.getDistributedFileSystem().getClient()
-          .isInternalSatisfierRunning();
-      if (spsRunning) {
-        throw new RuntimeException(
-            "Startup failed due to StoragePolicySatisfier"
-                + " running inside Namenode.");
-      }
-
       ExternalSPSContext context = new ExternalSPSContext(sps, nnc);
       sps.init(context);
       sps.start(true, StoragePolicySatisfierMode.EXTERNAL);

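After this change the external satisfier no longer probes the namenode for an internal instance; startup is just connector plus context wiring, as the remaining lines of the hunk show. A condensed hedged sketch of that startup path (the wrapper class, the use of MOVER_ID_PATH as the connector id path, and the single-namenode assumption are illustrative simplifications, not the tool's exact code):

    import java.net.URI;
    import java.util.Collection;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSUtil;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
    import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
    import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
    import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext;

    public final class ExternalSpsStartupSketch {
      public static void main(String[] args) throws Exception {
        Configuration spsConf = new HdfsConfiguration();
        StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);

        // Simplified connector setup; the real tool retries on failures.
        Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(spsConf);
        List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
            namenodes, StoragePolicySatisfier.class.getSimpleName(),
            HdfsServerConstants.MOVER_ID_PATH, spsConf,
            NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
        NameNodeConnector nnc = nncs.get(0);

        // Wire the external context and start the satisfier in EXTERNAL mode.
        ExternalSPSContext context = new ExternalSPSContext(sps, nnc);
        sps.init(context);
        sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
      }
    }
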
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java
index d8392fa..e02208c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.tools.TableListing;
 import org.apache.hadoop.util.StringUtils;
@@ -34,7 +33,6 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 import java.io.FileNotFoundException;
-import com.google.common.base.Joiner;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -297,88 +295,6 @@ public class StoragePolicyAdmin extends Configured implements Tool {
         dfs.satisfyStoragePolicy(new Path(path));
         System.out.println("Scheduled blocks to move based on the current"
             + " storage policy on " + path);
-        boolean waitOpt = StringUtils.popOption("-w", args);
-        if (waitOpt) {
-          waitForSatisfyPolicy(dfs, path);
-        }
-      } catch (Exception e) {
-        System.err.println(AdminHelper.prettifyException(e));
-        return 2;
-      }
-      return 0;
-    }
-
-    private void waitForSatisfyPolicy(DistributedFileSystem dfs, String path)
-        throws IOException {
-      System.out.println("Waiting for satisfy the policy ...");
-      boolean running = true;
-      while (running) {
-        StoragePolicySatisfyPathStatus status = dfs.getClient()
-            .checkStoragePolicySatisfyPathStatus(path);
-        switch (status) {
-        case SUCCESS:
-        case FAILURE:
-        case NOT_AVAILABLE:
-          System.out.println(status);
-          running = false;
-          break;
-        case PENDING:
-        case IN_PROGRESS:
-          System.out.println(status);
-        default:
-          System.err.println("Unexpected storage policy satisfyer status,"
-              + " Exiting");
-          running = false;
-          break;
-        }
-
-        try {
-          Thread.sleep(10000);
-        } catch (InterruptedException e) {
-        }
-      }
-      System.out.println(" done");
-    }
-  }
-
-  /**
-   * Command to check storage policy satisfier status running internal(inside)
-   * Namenode.
-   */
-  private static class IsInternalSatisfierRunningCommand
-      implements AdminHelper.Command {
-    @Override
-    public String getName() {
-      return "-isInternalSatisfierRunning";
-    }
-
-    @Override
-    public String getShortUsage() {
-      return "[" + getName() + "]\n";
-    }
-
-    @Override
-    public String getLongUsage() {
-      return getShortUsage() + "\n"
-          + "Check the status of Storage Policy Statisfier"
-          + " running inside Namenode.\n\n";
-    }
-
-    @Override
-    public int run(Configuration conf, List<String> args) throws IOException {
-      if (!args.isEmpty()) {
-        System.err.print("Can't understand arguments: "
-            + Joiner.on(" ").join(args) + "\n");
-        System.err.println("Usage is " + getLongUsage());
-        return 1;
-      }
-      final DistributedFileSystem dfs = AdminHelper.getDFS(conf);
-      try {
-        if(dfs.getClient().isInternalSatisfierRunning()){
-          System.out.println("yes");
-        }else{
-          System.out.println("no");
-        }
       } catch (Exception e) {
         System.err.println(AdminHelper.prettifyException(e));
         return 2;
@@ -438,7 +354,6 @@ public class StoragePolicyAdmin extends Configured implements Tool {
       new SetStoragePolicyCommand(),
       new GetStoragePolicyCommand(),
       new UnsetStoragePolicyCommand(),
-      new SatisfyStoragePolicyCommand(),
-      new IsInternalSatisfierRunningCommand()
+      new SatisfyStoragePolicyCommand()
   };
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index baf7ec7..4a8f9f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -60,8 +60,6 @@ message DatanodeCommandProto {
     NullDatanodeCommand = 7;
     BlockIdCommand = 8;
     BlockECReconstructionCommand = 9;
-    BlockStorageMovementCommand = 10;
-    DropSPSWorkCommand = 11;
   }
 
   required Type cmdType = 1;    // Type of the command
@@ -76,8 +74,6 @@ message DatanodeCommandProto {
   optional RegisterCommandProto registerCmd = 7;
   optional BlockIdCommandProto blkIdCmd = 8;
   optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9;
-  optional BlockStorageMovementCommandProto blkStorageMovementCmd = 10;
-  optional DropSPSWorkCommandProto dropSPSWorkCmd = 11;
 }
 
 /**
@@ -158,32 +154,6 @@ message BlockECReconstructionCommandProto {
   repeated BlockECReconstructionInfoProto blockECReconstructioninfo = 1;
 }
 
- /**
- * Block storage movement command
- */
-message BlockStorageMovementCommandProto {
-  required string blockPoolId = 1;
-  repeated BlockMovingInfoProto blockMovingInfo = 2;
-}
-
-/**
- * Instruct datanode to drop SPS work queues
- */
-message DropSPSWorkCommandProto {
-  // void
-}
-
-/**
- * Block storage movement information
- */
-message BlockMovingInfoProto {
-  required BlockProto block = 1;
-  required DatanodeInfoProto sourceDnInfo = 2;
-  required DatanodeInfoProto targetDnInfo = 3;
-  required StorageTypeProto sourceStorageType = 4;
-  required StorageTypeProto targetStorageType = 5;
-}
-
 /**
  * registration - Information of the datanode registering with the namenode
  */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 0b533c2..f720d0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4501,8 +4501,7 @@
   <name>dfs.storage.policy.satisfier.mode</name>
   <value>none</value>
   <description>
-    Following values are supported - internal, external, none.
-    If internal, StoragePolicySatisfier will be enabled and started along with active namenode.
+    Following values are supported - external, none.
     If external, StoragePolicySatisfier will be enabled and started as an independent service outside namenode.
     If none, StoragePolicySatisfier is disabled.
     By default, StoragePolicySatisfier is disabled.
@@ -4561,17 +4560,6 @@
 </property>
 
 <property>
-  <name>dfs.storage.policy.satisfier.low.max-streams.preference</name>
-  <value>true</value>
-  <description>
-    If false, blocks to move tasks will share equal ratio of number of highest-priority
-    replication streams (dfs.namenode.replication.max-streams) with pending replica and
-    erasure-coded reconstruction tasks. If true, blocks to move tasks will only use
-    the delta number of replication streams. The default value is true.
-  </description>
-</property>
-
-<property>
   <name>dfs.storage.policy.satisfier.retry.max.attempts</name>
   <value>3</value>
   <description>

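The description above lists the two remaining values for dfs.storage.policy.satisfier.mode. For tests or embedding code, the same setting can be made programmatically with the config key and enum used elsewhere in this patch; a hedged snippet (the wrapper class is illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;

    public class SpsModeConfigExample {
      public static Configuration externalSpsConf() {
        Configuration conf = new HdfsConfiguration();
        // "external" runs SPS as a service outside the namenode;
        // "none" (the default) disables the feature.
        conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
            StoragePolicySatisfierMode.EXTERNAL.toString());
        return conf;
      }
    }
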
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
index 5872ef8..3789779 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
@@ -106,9 +106,9 @@ Following 2 options will allow users to move the blocks based on new policy set.
 ### <u>S</u>torage <u>P</u>olicy <u>S</u>atisfier (SPS)
 
 When user changes the storage policy on a file/directory, user can call `HdfsAdmin` API `satisfyStoragePolicy()` to move the blocks as per the new policy set.
-The SPS daemon thread runs along with namenode and periodically scans for the storage mismatches between new policy set and the physical blocks placed. This will only track the files/directories for which user invoked satisfyStoragePolicy. If SPS identifies some blocks to be moved for a file, then it will schedule block movement tasks to datanodes. If there are any failures in movement, the SPS will re-attempt by sending new block movement tasks.
+The SPS tool running external to namenode periodically scans for the storage mismatches between new policy set and the physical blocks placed. This will only track the files/directories for which user invoked satisfyStoragePolicy. If SPS identifies some blocks to be moved for a file, then it will schedule block movement tasks to datanodes. If there are any failures in movement, the SPS will re-attempt by sending new block movement tasks.
 
-SPS can be enabled as internal service to Namenode or as an external service outside Namenode or disabled dynamically without restarting the Namenode.
+SPS can be enabled as an external service outside Namenode or disabled dynamically without restarting the Namenode.
 
 Detailed design documentation can be found at [Storage Policy Satisfier(SPS) (HDFS-10285)](https://issues.apache.org/jira/browse/HDFS-10285)
 
@@ -125,8 +125,8 @@ Detailed design documentation can be found at [Storage Policy Satisfier(SPS) (HD
 
 ####Configurations:
 
-*   **dfs.storage.policy.satisfier.mode** - Used to enable(internal service inside NN or external service outside NN) or disable SPS.
-   Following string values are supported - `internal`, `external`, `none`. Configuring `internal` or `external` value represents SPS is enable and `none` to disable.
+*   **dfs.storage.policy.satisfier.mode** - Used to enable external service outside NN or disable SPS.
+   Following string values are supported - `external`, `none`. Configuring `external` value represents SPS is enable and `none` to disable.
    The default value is `none`.
 
 *   **dfs.storage.policy.satisfier.recheck.timeout.millis** - A timeout to re-check the processed block storage movement
@@ -218,25 +218,17 @@ Schedule blocks to move based on file's/directory's current storage policy.
 
 * Command:
 
-        hdfs storagepolicies -satisfyStoragePolicy [-w] -path <path>
+        hdfs storagepolicies -satisfyStoragePolicy -path <path>
 
 * Arguments:
 
 | | |
 |:---- |:---- |
 | `-path <path>` | The path referring to either a directory or a file. |
-| `-w` | It requests that the command wait till all the files satisfy the policy in given path. This will print the current status of the path in each 10 sec and status are:<br/>PENDING - Path is in queue and not processed for satisfying the policy.<br/>IN_PROGRESS - Satisfying the storage policy for path.<br/>SUCCESS - Storage policy satisfied for the path.<br/>FAILURE : Few blocks failed to move.<br/>NOT_AVAILABLE - Status not available. |
 
-### SPS Running Status
 
-Check the running status of Storage Policy Satisfier service in namenode. If it is running, return 'yes'. Otherwise return 'no'.
-
-* Command:
-
-        hdfs storagepolicies -isInternalSatisfierRunning
-
-### Enable(internal service inside NN or external service outside NN) or Disable SPS without restarting Namenode
-If administrator wants to switch modes of SPS feature while Namenode is running, first he/she needs to update the desired value(internal or external or none) for the configuration item `dfs.storage.policy.satisfier.mode` in configuration file (`hdfs-site.xml`) and then run the following Namenode reconfig command
+### Enable external service outside NN or Disable SPS without restarting Namenode
+If administrator wants to switch modes of SPS feature while Namenode is running, first he/she needs to update the desired value(external or none) for the configuration item `dfs.storage.policy.satisfier.mode` in configuration file (`hdfs-site.xml`) and then run the following Namenode reconfig command
 
 * Command:
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index bab37e4..fb4616a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -59,6 +59,7 @@ import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -139,6 +140,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@@ -165,6 +167,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.namenode.XAttrStorage;
 import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -193,6 +196,7 @@ import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.log4j.Level;
+import org.junit.Assert;
 import org.junit.Assume;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -2491,4 +2495,40 @@ public class DFSTestUtil {
       }
     }, 100, timeout);
   }
+
+  /**
+   * Get namenode connector using the given configuration and file path.
+   *
+   * @param conf
+   *          hdfs configuration
+   * @param filePath
+   *          file path
+   * @param namenodeCount
+   *          number of namenodes
+   * @param createMoverPath
+   *          create move path flag to skip the path creation
+   * @return Namenode connector.
+   * @throws IOException
+   */
+  public static NameNodeConnector getNameNodeConnector(Configuration conf,
+      Path filePath, int namenodeCount, boolean createMoverPath)
+          throws IOException {
+    final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+    Assert.assertEquals(namenodeCount, namenodes.size());
+    NameNodeConnector.checkOtherInstanceRunning(createMoverPath);
+    while (true) {
+      try {
+        final List<NameNodeConnector> nncs = NameNodeConnector
+            .newNameNodeConnectors(namenodes,
+                StoragePolicySatisfier.class.getSimpleName(),
+                filePath, conf,
+                NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
+        return nncs.get(0);
+      } catch (IOException e) {
+        LOG.warn("Failed to connect with namenode", e);
+        // Ignore
+      }
+    }
+  }
+
 }

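A hedged usage example for the getNameNodeConnector helper added above, as a test with a single namenode might call it (the id path and the createMoverPath flag value are illustrative choices, not mandated by the helper):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;

    public class NameNodeConnectorUsageSketch {
      public static NameNodeConnector connect(Configuration conf)
          throws IOException {
        // Expect exactly one namenode; do not pre-create the id path.
        return DFSTestUtil.getNameNodeConnector(conf,
            HdfsServerConstants.MOVER_ID_PATH, 1, false);
      }
    }
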
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index d0c3a83..4863ca1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -123,8 +123,6 @@ public class TestBPOfferService {
     Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
         .when(mockDn).getMetrics();
-    Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn, null))
-        .when(mockDn).getStoragePolicySatisfyWorker();
 
     // Set up a simulated dataset with our fake BP
     mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
@@ -378,8 +376,6 @@ public class TestBPOfferService {
     Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
       when(mockDn).getMetrics();
-    Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn, null))
-        .when(mockDn).getStoragePolicySatisfyWorker();
     final AtomicInteger count = new AtomicInteger();
     Mockito.doAnswer(new Answer<Void>() {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
deleted file mode 100644
index 51d3254..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Supplier;
-
-/**
- * This class tests the behavior of moving block replica to the given storage
- * type to fulfill the storage policy requirement.
- */
-public class TestStoragePolicySatisfyWorker {
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(TestStoragePolicySatisfyWorker.class);
-  private static final int DEFAULT_BLOCK_SIZE = 100;
-  private MiniDFSCluster cluster = null;
-  private final Configuration conf = new HdfsConfiguration();
-
-  private static void initConf(Configuration conf) {
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
-    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
-    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
-        1L);
-    conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
-    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-        StoragePolicySatisfierMode.INTERNAL.toString());
-    // Reduced refresh cycle to update latest datanodes.
-    conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
-        1000);
-  }
-
-  @Before
-  public void setUp() throws IOException {
-    initConf(conf);
-  }
-
-  @After
-  public void teardown() throws IOException {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  /**
-   * Tests to verify that the block replica is moving to ARCHIVE storage type to
-   * fulfill the storage policy requirement.
-   */
-  @Test(timeout = 120000)
-  public void testMoveSingleBlockToAnotherDatanode() throws Exception {
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4)
-        .storageTypes(
-            new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
-                {StorageType.DISK, StorageType.ARCHIVE},
-                {StorageType.ARCHIVE, StorageType.ARCHIVE},
-                {StorageType.ARCHIVE, StorageType.ARCHIVE}})
-        .build();
-    cluster.waitActive();
-    final DistributedFileSystem dfs = cluster.getFileSystem();
-    final String file = "/testMoveSingleBlockToAnotherDatanode";
-    // write to DISK
-    final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
-    out.writeChars("testMoveSingleBlockToAnotherDatanode");
-    out.close();
-
-    // verify before movement
-    LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
-    StorageType[] storageTypes = lb.getStorageTypes();
-    for (StorageType storageType : storageTypes) {
-      Assert.assertTrue(StorageType.DISK == storageType);
-    }
-    // move to ARCHIVE
-    dfs.setStoragePolicy(new Path(file), "COLD");
-
-    dfs.satisfyStoragePolicy(new Path(file));
-
-    cluster.triggerHeartbeats();
-
-    // Wait till NameNode notified about the block location details
-    waitForLocatedBlockWithArchiveStorageType(dfs, file, 2, 30000);
-  }
-
-  /**
-   * Test to verify that satisfy worker can't move blocks. If specified target
-   * datanode doesn't have enough space to accommodate the moving block.
-   */
-  @Test(timeout = 120000)
-  public void testMoveWithNoSpaceAvailable() throws Exception {
-    final long capacity = 150;
-    final String rack0 = "/rack0";
-    final String rack1 = "/rack1";
-    long[] capacities = new long[] {capacity, capacity, capacity / 2};
-    String[] hosts = {"host0", "host1", "host2"};
-    String[] racks = {rack0, rack1, rack0};
-    int numOfDatanodes = capacities.length;
-
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numOfDatanodes)
-        .hosts(hosts).racks(racks).simulatedCapacities(capacities)
-        .storageTypes(
-            new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
-                {StorageType.DISK, StorageType.ARCHIVE},
-                {StorageType.ARCHIVE, StorageType.ARCHIVE}})
-        .build();
-
-    cluster.waitActive();
-    InetSocketAddress[] favoredNodes = new InetSocketAddress[3];
-    for (int i = 0; i < favoredNodes.length; i++) {
-      // DFSClient will attempt reverse lookup. In case it resolves
-      // "127.0.0.1" to "localhost", we manually specify the hostname.
-      favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress();
-    }
-    final DistributedFileSystem dfs = cluster.getFileSystem();
-    final String file = "/testMoveWithNoSpaceAvailable";
-    DFSTestUtil.createFile(dfs, new Path(file), false, 1024, 100,
-        DEFAULT_BLOCK_SIZE, (short) 2, 0, false, favoredNodes);
-
-    // verify before movement
-    LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
-    StorageType[] storageTypes = lb.getStorageTypes();
-    for (StorageType storageType : storageTypes) {
-      Assert.assertTrue(StorageType.DISK == storageType);
-    }
-
-    // move to ARCHIVE
-    dfs.setStoragePolicy(new Path(file), "COLD");
-
-    lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
-    DataNode src = cluster.getDataNodes().get(2);
-    DatanodeInfo targetDnInfo = DFSTestUtil
-        .getLocalDatanodeInfo(src.getXferPort());
-
-    SimpleBlocksMovementsStatusHandler handler =
-        new SimpleBlocksMovementsStatusHandler();
-    StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf,
-        src, handler);
-    try {
-      worker.start();
-      List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
-      BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
-          lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
-          lb.getStorageTypes()[0], StorageType.ARCHIVE);
-      blockMovingInfos.add(blockMovingInfo);
-      worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(),
-          blockMovingInfos);
-      waitForBlockMovementCompletion(handler, 1, 30000);
-    } finally {
-      worker.stop();
-    }
-  }
-
-  private void waitForBlockMovementCompletion(
-      final SimpleBlocksMovementsStatusHandler handler,
-      int expectedFinishedItemsCount, int timeout) throws Exception {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        List<Block> completedBlocks = handler.getMoveAttemptFinishedBlocks();
-        int finishedCount = completedBlocks.size();
-        LOG.info("Block movement completed count={}, expected={} and actual={}",
-            completedBlocks.size(), expectedFinishedItemsCount, finishedCount);
-        return expectedFinishedItemsCount == finishedCount;
-      }
-    }, 100, timeout);
-  }
-
-  private void waitForLocatedBlockWithArchiveStorageType(
-      final DistributedFileSystem dfs, final String file,
-      int expectedArchiveCount, int timeout) throws Exception {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        LocatedBlock lb = null;
-        try {
-          lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
-        } catch (IOException e) {
-          LOG.error("Exception while getting located blocks", e);
-          return false;
-        }
-
-        int archiveCount = 0;
-        for (StorageType storageType : lb.getStorageTypes()) {
-          if (StorageType.ARCHIVE == storageType) {
-            archiveCount++;
-          }
-        }
-        LOG.info("Archive replica count, expected={} and actual={}",
-            expectedArchiveCount, archiveCount);
-        return expectedArchiveCount == archiveCount;
-      }
-    }, 100, timeout);
-  }
-
-  private BlockMovingInfo prepareBlockMovingInfo(Block block,
-      DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
-      StorageType targetStorageType) {
-    return new BlockMovingInfo(block, src, destin, storageType,
-        targetStorageType);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 900dcdb..200178d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -678,7 +678,7 @@ public class TestMover {
   public void testMoveWhenStoragePolicySatisfierIsRunning() throws Exception {
     final Configuration conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-        StoragePolicySatisfierMode.INTERNAL.toString());
+        StoragePolicySatisfierMode.EXTERNAL.toString());
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(3)
         .storageTypes(
@@ -686,6 +686,9 @@ public class TestMover {
                 {StorageType.DISK}}).build();
     try {
       cluster.waitActive();
+      // Simulate an external SPS by creating a NameNodeConnector instance.
+      DFSTestUtil.getNameNodeConnector(conf, HdfsServerConstants.MOVER_ID_PATH,
+          1, true);
       final DistributedFileSystem dfs = cluster.getFileSystem();
       final String file = "/testMoveWhenStoragePolicySatisfierIsRunning";
       // write to DISK
@@ -697,7 +700,7 @@ public class TestMover {
       dfs.setStoragePolicy(new Path(file), "COLD");
       int rc = ToolRunner.run(conf, new Mover.Cli(),
           new String[] {"-p", file.toString()});
-      int exitcode = ExitStatus.SKIPPED_DUE_TO_SPS.getExitCode();
+      int exitcode = ExitStatus.IO_EXCEPTION.getExitCode();
       Assert.assertEquals("Exit code should be " + exitcode, exitcode, rc);
     } finally {
       cluster.shutdown();

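Editor's note on the hunk above: the reworked test now plays the external-SPS role itself. It appears to grab a NameNodeConnector on the shared mover ID path before launching the Mover, so the Mover is now expected to exit with IO_EXCEPTION rather than SKIPPED_DUE_TO_SPS. A minimal sketch of that flow, assembled only from calls visible in the hunk (the boolean flag is copied verbatim from the patch; its exact semantics are an assumption here):

    // Sketch only; not a drop-in test body.
    DFSTestUtil.getNameNodeConnector(conf, HdfsServerConstants.MOVER_ID_PATH,
        1, true);                               // "external SPS" side holds the mover ID
    int rc = ToolRunner.run(conf, new Mover.Cli(),
        new String[] {"-p", file});             // Mover cannot take the same lock
    Assert.assertEquals(ExitStatus.IO_EXCEPTION.getExitCode(), rc);
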
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
index ee0b2e6..0a1b129 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
@@ -248,17 +248,17 @@ public class TestNameNodeReconfigure {
 
     // enable SPS internally by keeping DFS_STORAGE_POLICY_ENABLED_KEY
     nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-        StoragePolicySatisfierMode.INTERNAL.toString());
+        StoragePolicySatisfierMode.EXTERNAL.toString());
 
     // Since DFS_STORAGE_POLICY_ENABLED_KEY is disabled, SPS can't be enabled.
     assertNull("SPS shouldn't start as "
         + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY + " is disabled",
             nameNode.getNamesystem().getBlockManager().getSPSManager());
     verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-        StoragePolicySatisfierMode.INTERNAL, false);
+        StoragePolicySatisfierMode.EXTERNAL, false);
 
     assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
-        StoragePolicySatisfierMode.INTERNAL.toString(), nameNode.getConf()
+        StoragePolicySatisfierMode.EXTERNAL.toString(), nameNode.getConf()
             .get(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
             DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT));
   }
@@ -285,12 +285,6 @@ public class TestNameNodeReconfigure {
           e.getCause());
     }
 
-    // enable internal SPS
-    nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-        StoragePolicySatisfierMode.INTERNAL.toString());
-    verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-        StoragePolicySatisfierMode.INTERNAL, true);
-
     // disable SPS
     nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
         StoragePolicySatisfierMode.NONE.toString());
@@ -302,7 +296,7 @@ public class TestNameNodeReconfigure {
         StoragePolicySatisfierMode.EXTERNAL.toString());
     assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
         false, nameNode.getNamesystem().getBlockManager().getSPSManager()
-            .isInternalSatisfierRunning());
+            .isSatisfierRunning());
     assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
         StoragePolicySatisfierMode.EXTERNAL.toString(),
         nameNode.getConf().get(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
@@ -337,27 +331,15 @@ public class TestNameNodeReconfigure {
               + " by admin. Seek for an admin help to enable it "
               + "or use Mover tool.", e);
     }
-
-    // start internal
-    nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-        StoragePolicySatisfierMode.INTERNAL.toString());
-    assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
-        true, nameNode.getNamesystem().getBlockManager().getSPSManager()
-            .isInternalSatisfierRunning());
-    assertEquals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY + " has wrong value",
-        StoragePolicySatisfierMode.INTERNAL.toString(),
-        nameNode.getConf().get(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-            DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT));
   }
 
   void verifySPSEnabled(final NameNode nameNode, String property,
       StoragePolicySatisfierMode expected, boolean isSatisfierRunning) {
     StoragePolicySatisfyManager spsMgr = nameNode
             .getNamesystem().getBlockManager().getSPSManager();
-    boolean isInternalSatisfierRunning = spsMgr != null
-        ? spsMgr.isInternalSatisfierRunning() : false;
-    assertEquals(property + " has wrong value", isSatisfierRunning,
-        isInternalSatisfierRunning);
+    boolean isSPSRunning = spsMgr != null ? spsMgr.isSatisfierRunning()
+        : false;
+    assertEquals(property + " has wrong value", isSatisfierRunning, isSPSRunning);
     String actual = nameNode.getConf().get(property,
         DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
     assertEquals(property + " has wrong value", expected,

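Editor's note on the hunks above: with the internal satisfier removed, the reconfiguration surface this test exercises reduces to toggling between EXTERNAL and NONE and checking isSatisfierRunning(). A rough, illustrative sketch limited to calls that already appear in the diff:

    // Rough sketch of the post-cleanup reconfiguration flow (illustrative only).
    nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
        StoragePolicySatisfierMode.EXTERNAL.toString());   // allow an external SPS
    boolean running = nameNode.getNamesystem().getBlockManager()
        .getSPSManager().isSatisfierRunning();             // false here, since no
                                                           // external service is running
    nameNode.reconfigureProperty(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
        StoragePolicySatisfierMode.NONE.toString());       // disable SPS entirely
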
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
index e079471..2ad8640 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
@@ -29,7 +29,11 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
 
@@ -45,11 +49,13 @@ import static org.junit.Assert.*;
  * Test persistence of satisfying files/directories.
  */
 public class TestPersistentStoragePolicySatisfier {
-
   private static Configuration conf;
 
   private static MiniDFSCluster cluster;
   private static DistributedFileSystem fs;
+  private NameNodeConnector nnc;
+  private StoragePolicySatisfier sps;
+  private ExternalSPSContext ctxt;
 
   private static Path testFile =
       new Path("/testFile");
@@ -65,7 +71,6 @@ public class TestPersistentStoragePolicySatisfier {
   private static final String COLD = "COLD";
   private static final String WARM = "WARM";
   private static final String ONE_SSD = "ONE_SSD";
-  private static final String ALL_SSD = "ALL_SSD";
 
   private static StorageType[][] storageTypes = new StorageType[][] {
       {StorageType.DISK, StorageType.ARCHIVE, StorageType.SSD},
@@ -104,7 +109,7 @@ public class TestPersistentStoragePolicySatisfier {
         DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
         "3000");
     conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-        StoragePolicySatisfierMode.INTERNAL.toString());
+        StoragePolicySatisfierMode.EXTERNAL.toString());
     // Reduced refresh cycle to update latest datanodes.
     conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
         1000);
@@ -124,6 +129,14 @@ public class TestPersistentStoragePolicySatisfier {
     } else {
       fs = cluster.getFileSystem();
     }
+    nnc = DFSTestUtil.getNameNodeConnector(conf,
+        HdfsServerConstants.MOVER_ID_PATH, 1, false);
+
+    sps = new StoragePolicySatisfier(conf);
+    ctxt = new ExternalSPSContext(sps, nnc);
+
+    sps.init(ctxt);
+    sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
 
     createTestFiles(fs, replication);
   }
@@ -158,6 +171,9 @@ public class TestPersistentStoragePolicySatisfier {
       cluster.shutdown(true);
       cluster = null;
     }
+    if (sps != null) {
+      sps.stopGracefully();
+    }
   }
 
   /**
@@ -203,49 +219,6 @@ public class TestPersistentStoragePolicySatisfier {
   }
 
   /**
-   * Tests to verify satisfier persistence working as expected
-   * in HA env. This test case runs as below:
-   * 1. setup HA cluster env with simple HA topology.
-   * 2. switch the active NameNode from nn0/nn1 to nn1/nn0.
-   * 3. make sure all the storage policies are satisfied.
-   * @throws Exception
-   */
-  @Test(timeout = 300000)
-  public void testWithHA() throws Exception {
-    try {
-      // Enable HA env for testing.
-      clusterSetUp(true, new HdfsConfiguration());
-
-      fs.setStoragePolicy(testFile, ALL_SSD);
-      fs.satisfyStoragePolicy(testFile);
-
-      cluster.transitionToStandby(0);
-      cluster.transitionToActive(1);
-
-      DFSTestUtil.waitExpectedStorageType(
-          testFileName, StorageType.SSD, 3, timeout, fs);
-
-      // test directory
-      fs.setStoragePolicy(parentDir, WARM);
-      fs.satisfyStoragePolicy(parentDir);
-      cluster.transitionToStandby(1);
-      cluster.transitionToActive(0);
-
-      DFSTestUtil.waitExpectedStorageType(
-          parentFileName, StorageType.DISK, 1, timeout, fs);
-      DFSTestUtil.waitExpectedStorageType(
-          parentFileName, StorageType.ARCHIVE, 2, timeout, fs);
-      DFSTestUtil.waitExpectedStorageType(
-          childFileName, StorageType.DISK, 1, timeout, fs);
-      DFSTestUtil.waitExpectedStorageType(
-          childFileName, StorageType.ARCHIVE, 2, timeout, fs);
-    } finally {
-      clusterShutdown();
-    }
-  }
-
-
-  /**
    * Tests to verify satisfier persistence working well with multiple
    * restarts operations. This test case runs as below:
    * 1. satisfy the storage policy of file1.
@@ -282,63 +255,6 @@ public class TestPersistentStoragePolicySatisfier {
   }
 
   /**
-   * Tests to verify satisfier persistence working well with
-   * federal HA env. This test case runs as below:
-   * 1. setup HA test environment with federal topology.
-   * 2. satisfy storage policy of file1.
-   * 3. switch active NameNode from nn0 to nn1.
-   * 4. switch active NameNode from nn2 to nn3.
-   * 5. check whether the storage policy of file1 is satisfied.
-   * @throws Exception
-   */
-  @Test(timeout = 300000)
-  public void testWithFederationHA() throws Exception {
-    MiniDFSCluster haCluster = null;
-    try {
-      conf = new HdfsConfiguration();
-      conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-          StoragePolicySatisfierMode.INTERNAL.toString());
-      // Reduced refresh cycle to update latest datanodes.
-      conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
-          1000);
-      haCluster = new MiniDFSCluster
-          .Builder(conf)
-          .nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
-          .storageTypes(storageTypes)
-          .numDataNodes(storageTypes.length).build();
-      haCluster.waitActive();
-      haCluster.transitionToActive(1);
-      haCluster.transitionToActive(3);
-
-      fs = HATestUtil.configureFailoverFs(haCluster, conf);
-      createTestFiles(fs, (short) 3);
-
-      fs.setStoragePolicy(testFile, WARM);
-      fs.satisfyStoragePolicy(testFile);
-
-      haCluster.transitionToStandby(1);
-      haCluster.transitionToActive(0);
-      haCluster.transitionToStandby(3);
-      haCluster.transitionToActive(2);
-
-      DFSTestUtil.waitExpectedStorageType(
-          testFileName, StorageType.DISK, 1, timeout, fs);
-      DFSTestUtil.waitExpectedStorageType(
-          testFileName, StorageType.ARCHIVE, 2, timeout, fs);
-
-    } finally {
-      if(fs != null) {
-        fs.close();
-        fs = null;
-      }
-      if(haCluster != null) {
-        haCluster.shutdown(true);
-        haCluster = null;
-      }
-    }
-  }
-
-  /**
    * Tests to verify SPS xattr will be removed if the satisfy work has
    * been finished, expect that the method satisfyStoragePolicy can be
    * invoked on the same file again after the block movement has been
@@ -388,7 +304,7 @@ public class TestPersistentStoragePolicySatisfier {
    * 3. make sure sps xattr is removed.
    * @throws Exception
    */
-  @Test(timeout = 300000)
+  @Test(timeout = 300000)
   public void testDropSPS() throws Exception {
     try {
       clusterSetUp();

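Editor's note on the setup change above: this is the external-SPS bootstrap the patch now uses throughout the persistence tests. Pulled together in one place, it looks roughly like the following (names taken directly from the hunks; surrounding test scaffolding trimmed):

    // External SPS wiring as used in clusterSetUp()/clusterShutdown() above (sketch).
    NameNodeConnector nnc = DFSTestUtil.getNameNodeConnector(conf,
        HdfsServerConstants.MOVER_ID_PATH, 1, false);
    StoragePolicySatisfier sps = new StoragePolicySatisfier(conf);
    ExternalSPSContext ctxt = new ExternalSPSContext(sps, nnc);
    sps.init(ctxt);
    sps.start(true, StoragePolicySatisfierMode.EXTERNAL);  // runs outside the NameNode
    // ... exercise satisfyStoragePolicy() as before ...
    sps.stopGracefully();                                   // in teardown
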
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ed3a66/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java
index 0cadc83..cf04db0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java
@@ -17,11 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.util.Time.monotonicNow;
-
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationException;
@@ -32,24 +28,15 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Tests that StoragePolicySatisfier is able to work with HA enabled.
  */
 public class TestStoragePolicySatisfierWithHA {
   private MiniDFSCluster cluster = null;
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestStoragePolicySatisfierWithHA.class);
 
   private final Configuration config = new HdfsConfiguration();
   private static final int DEFAULT_BLOCK_SIZE = 1024;
@@ -67,7 +54,7 @@ public class TestStoragePolicySatisfierWithHA {
   private void createCluster() throws IOException {
     config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     config.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-        StoragePolicySatisfierMode.INTERNAL.toString());
+        StoragePolicySatisfierMode.EXTERNAL.toString());
     // Reduced refresh cycle to update latest datanodes.
     config.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
         1000);
@@ -101,50 +88,19 @@ public class TestStoragePolicySatisfierWithHA {
   public void testWhenNNHAStateChanges() throws IOException {
     try {
       createCluster();
-      boolean running;
-
-      dfs = cluster.getFileSystem(1);
-
-      try {
-        dfs.getClient().isInternalSatisfierRunning();
-        Assert.fail("Call this function to Standby NN should "
-            + "raise an exception.");
-      } catch (RemoteException e) {
-        IOException cause = e.unwrapRemoteException();
-        if (!(cause instanceof StandbyException)) {
-          Assert.fail("Unexpected exception happened " + e);
-        }
-      }
-
-      cluster.transitionToActive(0);
-      dfs = cluster.getFileSystem(0);
-      running = dfs.getClient().isInternalSatisfierRunning();
-      Assert.assertTrue("StoragePolicySatisfier should be active "
-          + "when NN transits from Standby to Active mode.", running);
-
       // NN transits from Active to Standby
       cluster.transitionToStandby(0);
-      try {
-        dfs.getClient().isInternalSatisfierRunning();
-        Assert.fail("NN in Standby again, call this function should "
-            + "raise an exception.");
-      } catch (RemoteException e) {
-        IOException cause = e.unwrapRemoteException();
-        if (!(cause instanceof StandbyException)) {
-          Assert.fail("Unexpected exception happened " + e);
-        }
-      }
-
+      cluster.waitActive();
       try {
         cluster.getNameNode(0).reconfigurePropertyImpl(
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
-            StoragePolicySatisfierMode.EXTERNAL.toString());
+            StoragePolicySatisfierMode.NONE.toString());
         Assert.fail("It's not allowed to enable or disable"
             + " StoragePolicySatisfier on Standby NameNode");
       } catch (ReconfigurationException e) {
         GenericTestUtils.assertExceptionContains("Could not change property "
             + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY
-            + " from 'INTERNAL' to 'EXTERNAL'", e);
+            + " from 'EXTERNAL' to 'NONE'", e);
         GenericTestUtils.assertExceptionContains(
             "Enabling or disabling storage policy satisfier service on "
                 + "standby NameNode is not allowed", e.getCause());
@@ -153,104 +109,4 @@ public class TestStoragePolicySatisfierWithHA {
       cluster.shutdown();
     }
   }
-
-  /**
-   * Test to verify that during namenode switch over will add
-   * DNA_DROP_SPS_WORK_COMMAND to all the datanodes. Later, this will ensure to
-   * drop all the SPS queues at datanode.
-   */
-  @Test(timeout = 90000)
-  public void testNamenodeSwitchoverShouldDropSPSWork() throws Exception {
-    try {
-      createCluster();
-
-      FSNamesystem fsn = cluster.getNamesystem(0);
-      ArrayList<DataNode> dataNodes = cluster.getDataNodes();
-      List<DatanodeDescriptor> listOfDns = new ArrayList<>();
-      for (DataNode dn : dataNodes) {
-        DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn,
-            dn.getDatanodeId());
-        listOfDns.add(dnd);
-      }
-      cluster.shutdownDataNodes();
-
-      cluster.transitionToStandby(0);
-      LOG.info("**Transition to Active**");
-      cluster.transitionToActive(1);
-
-      // Verify that Standby-to-Active transition should set drop SPS flag to
-      // true. This will ensure that DNA_DROP_SPS_WORK_COMMAND will be
-      // propagated to datanode during heartbeat response.
-      int retries = 20;
-      boolean dropSPSWork = false;
-      while (retries > 0) {
-        for (DatanodeDescriptor dnd : listOfDns) {
-          dropSPSWork = dnd.shouldDropSPSWork();
-          if (!dropSPSWork) {
-            retries--;
-            Thread.sleep(250);
-            break;
-          }
-        }
-        if (dropSPSWork) {
-          break;
-        }
-      }
-      Assert.assertTrue("Didn't drop SPS work", dropSPSWork);
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  /**
-   * Test to verify that SPS work will be dropped once the datanode is marked as
-   * expired. Internally 'dropSPSWork' flag is set as true while expiration and
-   * at the time of reconnection, will send DNA_DROP_SPS_WORK_COMMAND to that
-   * datanode.
-   */
-  @Test(timeout = 90000)
-  public void testDeadDatanode() throws Exception {
-    int heartbeatExpireInterval = 2 * 2000;
-    config.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
-        3000);
-    config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000L);
-    createCluster();
-
-    DataNode dn = cluster.getDataNodes().get(0);
-    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
-
-    FSNamesystem fsn = cluster.getNamesystem(0);
-    DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn,
-        dn.getDatanodeId());
-    boolean isDead = false;
-    int retries = 20;
-    while (retries > 0) {
-      isDead = dnd.getLastUpdateMonotonic() < (monotonicNow()
-          - heartbeatExpireInterval);
-      if (isDead) {
-        break;
-      }
-      retries--;
-      Thread.sleep(250);
-    }
-    Assert.assertTrue("Datanode is alive", isDead);
-    // Disable datanode heartbeat, so that the datanode will get expired after
-    // the recheck interval and become dead.
-    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
-
-    // Verify that datanode expiration will set drop SPS flag to
-    // true. This will ensure that DNA_DROP_SPS_WORK_COMMAND will be
-    // propagated to datanode during reconnection.
-    boolean dropSPSWork = false;
-    retries = 50;
-    while (retries > 0) {
-      dropSPSWork = dnd.shouldDropSPSWork();
-      if (dropSPSWork) {
-        break;
-      }
-      retries--;
-      Thread.sleep(100);
-    }
-    Assert.assertTrue("Didn't drop SPS work", dropSPSWork);
-  }
 }

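Editor's note on the hunks above: after this cleanup the HA test only verifies the standby guard, i.e. that reconfiguring the satisfier mode on a standby NameNode is rejected. A sketch of that check, using only the calls kept in the diff:

    // Standby guard (sketch): reconfiguring SPS mode on a standby NN must fail.
    cluster.transitionToStandby(0);
    cluster.waitActive();
    try {
      cluster.getNameNode(0).reconfigurePropertyImpl(
          DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
          StoragePolicySatisfierMode.NONE.toString());
      Assert.fail("Expected ReconfigurationException on a standby NameNode");
    } catch (ReconfigurationException e) {
      // expected: enabling or disabling SPS on a standby NameNode is not allowed
    }
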

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org