Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/08/10 03:49:21 UTC

[28/50] [abbrv] hadoop git commit: HDFS-12955: [SPS]: Move SPS classes to a separate package. Contributed by Rakesh R.

HDFS-12955: [SPS]: Move SPS classes to a separate package. Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0b71ea21
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0b71ea21
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0b71ea21

Branch: refs/heads/HDFS-10285
Commit: 0b71ea214ffc18c1339c1540ff0867d1e5ae08a9
Parents: 2c15ac0
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Fri Dec 22 09:10:12 2017 -0800
Committer: Uma Maheswara Rao Gangumalla <um...@apache.org>
Committed: Thu Aug 9 20:47:22 2018 -0700

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    |    6 +-
 .../BlockStorageMovementAttemptedItems.java     |  241 ---
 .../namenode/BlockStorageMovementNeeded.java    |  574 ------
 .../hdfs/server/namenode/FSNamesystem.java      |    1 +
 .../hdfs/server/namenode/IntraNNSPSContext.java |   41 +
 .../server/namenode/StoragePolicySatisfier.java |  973 ----------
 .../sps/BlockStorageMovementAttemptedItems.java |  241 +++
 .../sps/BlockStorageMovementNeeded.java         |  572 ++++++
 .../namenode/sps/StoragePolicySatisfier.java    |  988 ++++++++++
 .../hdfs/server/namenode/sps/package-info.java  |   28 +
 .../TestBlockStorageMovementAttemptedItems.java |  196 --
 .../namenode/TestStoragePolicySatisfier.java    | 1775 -----------------
 ...stStoragePolicySatisfierWithStripedFile.java |  580 ------
 .../TestBlockStorageMovementAttemptedItems.java |  196 ++
 .../sps/TestStoragePolicySatisfier.java         | 1779 ++++++++++++++++++
 ...stStoragePolicySatisfierWithStripedFile.java |  580 ++++++
 16 files changed, 4430 insertions(+), 4341 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b71ea21/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 0957fe2..ec99a9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -89,11 +89,12 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
+import org.apache.hadoop.hdfs.server.namenode.IntraNNSPSContext;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -478,7 +479,8 @@ public class BlockManager implements BlockStatsMXBean {
         conf.getBoolean(
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT);
-    sps = new StoragePolicySatisfier(namesystem, this, conf);
+    StoragePolicySatisfier.Context spsctxt = new IntraNNSPSContext(namesystem);
+    sps = new StoragePolicySatisfier(namesystem, this, conf, spsctxt);
     blockTokenSecretManager = createBlockTokenSecretManager(conf);
 
     providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
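
For readers following the refactor, the hunk above is the main functional
change in this commit: BlockManager now hands the satisfier a
StoragePolicySatisfier.Context (implemented by the new IntraNNSPSContext)
rather than letting the satisfier reach into NameNode internals on its own.
A minimal, self-contained Java sketch of that dependency-inversion shape is
below; every name in it (ContextWiringSketch, Context, Satisfier) is invented
for illustration and is not a real Hadoop type:

// Simplified model of the Context indirection introduced by this commit.
public class ContextWiringSketch {

  /** The narrow view of the cluster that the satisfier is allowed to see. */
  interface Context {
    int getNumLiveDataNodes();
  }

  /** A satisfier that throttles itself based on cluster size via Context. */
  static final class Satisfier {
    private final Context ctx;

    Satisfier(Context ctx) {
      this.ctx = ctx;
    }

    boolean shouldThrottle(long scheduledBlocks, int workMultiplier) {
      // Mirrors the real SPS heuristic further down in this patch: back off
      // once scheduled blocks exceed liveDataNodes * multiplier.
      return scheduledBlocks > (long) ctx.getNumLiveDataNodes() * workMultiplier;
    }
  }

  public static void main(String[] args) {
    Context intraNNContext = () -> 42;  // stand-in for IntraNNSPSContext
    Satisfier sps = new Satisfier(intraNNContext);
    System.out.println(sps.shouldThrottle(1000, 4));  // prints: true
  }
}

Because the satisfier only sees the Context, a test (or, eventually, an
external SPS process) can supply a different implementation without touching
BlockManager.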

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b71ea21/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
deleted file mode 100644
index 643255f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import static org.apache.hadoop.util.Time.monotonicNow;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.util.Daemon;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * A monitor class that checks whether block storage movement attempts have
- * completed. When it receives a block storage movement attempt status
- * (either success or failure) from a DN, it simply removes the corresponding
- * entries from tracking. If no DN reports that a movement attempt has
- * finished within a longer time period, such items are retried automatically
- * after a timeout. The default timeout is 5 minutes.
- */
-public class BlockStorageMovementAttemptedItems {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
-
-  /**
-   * Holds the items that have already been picked up for block movement
-   * processing and sent to DNs.
-   */
-  private final List<AttemptedItemInfo> storageMovementAttemptedItems;
-  private final List<Block> movementFinishedBlocks;
-  private volatile boolean monitorRunning = true;
-  private Daemon timerThread = null;
-  //
-  // It might take anywhere between 5 and 10 minutes before
-  // a request is timed out.
-  //
-  private long selfRetryTimeout = 5 * 60 * 1000;
-
-  //
-  // It might take anywhere between 1 and 2 minutes before
-  // a request is timed out.
-  //
-  private long minCheckTimeout = 1 * 60 * 1000; // minimum value
-  private BlockStorageMovementNeeded blockStorageMovementNeeded;
-
-  public BlockStorageMovementAttemptedItems(long recheckTimeout,
-      long selfRetryTimeout,
-      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
-    if (recheckTimeout > 0) {
-      this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
-    }
-
-    this.selfRetryTimeout = selfRetryTimeout;
-    this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
-    storageMovementAttemptedItems = new ArrayList<>();
-    movementFinishedBlocks = new ArrayList<>();
-  }
-
-  /**
-   * Add an item to the block storage movement attempted items list, which
-   * holds the tracking/blockCollection ID against its attempt timestamp.
-   *
-   * @param itemInfo
-   *          - tracking info
-   */
-  public void add(AttemptedItemInfo itemInfo) {
-    synchronized (storageMovementAttemptedItems) {
-      storageMovementAttemptedItems.add(itemInfo);
-    }
-  }
-
-  /**
-   * Add the blocks whose storage movement attempt finished to
-   * movementFinishedBlocks.
-   *
-   * @param moveAttemptFinishedBlks
-   *          storage movement attempt finished blocks
-   */
-  public void addReportedMovedBlocks(Block[] moveAttemptFinishedBlks) {
-    if (moveAttemptFinishedBlks.length == 0) {
-      return;
-    }
-    synchronized (movementFinishedBlocks) {
-      movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks));
-    }
-  }
-
-  /**
-   * Starts the monitor thread.
-   */
-  public synchronized void start() {
-    monitorRunning = true;
-    timerThread = new Daemon(new BlocksStorageMovementAttemptMonitor());
-    timerThread.setName("BlocksStorageMovementAttemptMonitor");
-    timerThread.start();
-  }
-
-  /**
-   * Sets the running flag to false. Also interrupts the monitor thread and
-   * clears all the queued-up tasks.
-   */
-  public synchronized void stop() {
-    monitorRunning = false;
-    if (timerThread != null) {
-      timerThread.interrupt();
-    }
-    this.clearQueues();
-  }
-
-  /**
-   * Timed wait to stop monitor thread.
-   */
-  synchronized void stopGracefully() {
-    if (timerThread == null) {
-      return;
-    }
-    if (monitorRunning) {
-      stop();
-    }
-    try {
-      timerThread.join(3000);
-    } catch (InterruptedException ie) {
-    }
-  }
-
-  /**
-   * A monitor class that periodically checks block storage movement attempt
-   * status and long-waiting items.
-   */
-  private class BlocksStorageMovementAttemptMonitor implements Runnable {
-    @Override
-    public void run() {
-      while (monitorRunning) {
-        try {
-          blockStorageMovementReportedItemsCheck();
-          blocksStorageMovementUnReportedItemsCheck();
-          Thread.sleep(minCheckTimeout);
-        } catch (InterruptedException ie) {
-          LOG.info("BlocksStorageMovementAttemptMonitor thread "
-              + "is interrupted.", ie);
-        } catch (IOException ie) {
-          LOG.warn("BlocksStorageMovementAttemptMonitor thread "
-              + "received exception and exiting.", ie);
-        }
-      }
-    }
-  }
-
-  @VisibleForTesting
-  void blocksStorageMovementUnReportedItemsCheck() {
-    synchronized (storageMovementAttemptedItems) {
-      Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems
-          .iterator();
-      long now = monotonicNow();
-      while (iter.hasNext()) {
-        AttemptedItemInfo itemInfo = iter.next();
-        if (now > itemInfo.getLastAttemptedOrReportedTime()
-            + selfRetryTimeout) {
-          Long blockCollectionID = itemInfo.getTrackId();
-          synchronized (movementFinishedBlocks) {
-            ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
-                blockCollectionID, itemInfo.getRetryCount() + 1);
-            blockStorageMovementNeeded.add(candidate);
-            iter.remove();
-            LOG.info("TrackID: {} timed out and was moved to the retry "
-                + "queue for the next iteration.", blockCollectionID);
-          }
-        }
-      }
-
-    }
-  }
-
-  @VisibleForTesting
-  void blockStorageMovementReportedItemsCheck() throws IOException {
-    synchronized (movementFinishedBlocks) {
-      Iterator<Block> finishedBlksIter = movementFinishedBlocks.iterator();
-      while (finishedBlksIter.hasNext()) {
-        Block blk = finishedBlksIter.next();
-        synchronized (storageMovementAttemptedItems) {
-          Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems
-              .iterator();
-          while (iterator.hasNext()) {
-            AttemptedItemInfo attemptedItemInfo = iterator.next();
-            attemptedItemInfo.getBlocks().remove(blk);
-            if (attemptedItemInfo.getBlocks().isEmpty()) {
-              // TODO: try adding this at the front of the queue, so that this
-              // element gets processed first and can be cleaned from the queue
-              // quickly, as all its movements are already done.
-              blockStorageMovementNeeded.add(new ItemInfo(attemptedItemInfo
-                  .getStartId(), attemptedItemInfo.getTrackId(),
-                  attemptedItemInfo.getRetryCount() + 1));
-              iterator.remove();
-            }
-          }
-        }
-        // Remove attempted blocks from movementFinishedBlocks list.
-        finishedBlksIter.remove();
-      }
-    }
-  }
-
-  @VisibleForTesting
-  public int getMovementFinishedBlocksCount() {
-    return movementFinishedBlocks.size();
-  }
-
-  @VisibleForTesting
-  public int getAttemptedItemsCount() {
-    return storageMovementAttemptedItems.size();
-  }
-
-  public void clearQueues() {
-    movementFinishedBlocks.clear();
-    storageMovementAttemptedItems.clear();
-  }
-}
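
Per the diffstat, this deleted monitor reappears under
org.apache.hadoop.hdfs.server.namenode.sps with the same 241 lines, so only
its home changes. Its core pattern is worth spelling out: attempted items are
timestamped when handed to DNs, completion reports remove them from tracking,
and anything still unreported after selfRetryTimeout is re-queued for another
attempt. A compact, hedged model of that pattern follows; every name in it is
invented for illustration:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model of the attempted-items monitor: entries time out and are
// re-queued unless a completion report removes them first.
class RetryMonitorSketch {
  static final class Attempt {
    final long id;
    final long sentAtMs;

    Attempt(long id, long sentAtMs) {
      this.id = id;
      this.sentAtMs = sentAtMs;
    }
  }

  private final List<Attempt> inFlight = new ArrayList<>();
  private final List<Long> retryQueue = new ArrayList<>();
  private final long selfRetryTimeoutMs;

  RetryMonitorSketch(long selfRetryTimeoutMs) {
    this.selfRetryTimeoutMs = selfRetryTimeoutMs;
  }

  /** Track an attempt that was just sent to a DN. */
  synchronized void add(long id, long nowMs) {
    inFlight.add(new Attempt(id, nowMs));
  }

  /** A DN reported completion: drop the entry so it is no longer tracked. */
  synchronized void reportFinished(long id) {
    inFlight.removeIf(a -> a.id == id);
  }

  /** Periodic sweep, as the monitor daemon does: time out stale attempts. */
  synchronized void sweep(long nowMs) {
    Iterator<Attempt> it = inFlight.iterator();
    while (it.hasNext()) {
      Attempt a = it.next();
      if (nowMs - a.sentAtMs > selfRetryTimeoutMs) {
        retryQueue.add(a.id);  // hand back for another movement attempt
        it.remove();
      }
    }
  }

  /** Drain IDs needing retry, as the real monitor re-adds them to the queue. */
  synchronized List<Long> drainRetries() {
    List<Long> out = new ArrayList<>(retryQueue);
    retryQueue.clear();
    return out;
  }
}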

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b71ea21/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
deleted file mode 100644
index 89bcbff..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
+++ /dev/null
@@ -1,574 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * A class to track the block collection IDs (inode IDs) for which physical
- * storage movement is needed, as per the namespace and storage reports from
- * DNs. It scans the pending directories for which storage movement is
- * required, schedules the block collection IDs for movement, tracks the info
- * of scheduled items, and removes the SPS xAttr from the file/directory once
- * the movement succeeds.
- */
-@InterfaceAudience.Private
-public class BlockStorageMovementNeeded {
-
-  public static final Logger LOG =
-      LoggerFactory.getLogger(BlockStorageMovementNeeded.class);
-
-  private final Queue<ItemInfo> storageMovementNeeded =
-      new LinkedList<ItemInfo>();
-
-  /**
-   * Map of startId to the number of children. The child count indicates the
-   * number of files still pending to satisfy the policy.
-   */
-  private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
-      new HashMap<Long, DirPendingWorkInfo>();
-
-  private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus =
-      new ConcurrentHashMap<>();
-
-  private final Namesystem namesystem;
-
-  // Queue of pending directories for which the policy must be satisfied.
-  private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
-
-  private final StoragePolicySatisfier sps;
-
-  private Daemon inodeIdCollector;
-
-  private final int maxQueuedItem;
-
-  // Amount of time to cache the SUCCESS status of path before turning it to
-  // NOT_AVAILABLE.
-  private static long statusClearanceElapsedTimeMs = 300000;
-
-  public BlockStorageMovementNeeded(Namesystem namesystem,
-      StoragePolicySatisfier sps, int queueLimit) {
-    this.namesystem = namesystem;
-    this.sps = sps;
-    this.maxQueuedItem = queueLimit;
-  }
-
-  /**
-   * Add the candidate to the tracking list of items for which storage
-   * movement is expected, if necessary.
-   *
-   * @param trackInfo
-   *          - track info for satisfying the policy
-   */
-  public synchronized void add(ItemInfo trackInfo) {
-    spsStatus.put(trackInfo.getStartId(),
-        new StoragePolicySatisfyPathStatusInfo(
-            StoragePolicySatisfyPathStatus.IN_PROGRESS));
-    storageMovementNeeded.add(trackInfo);
-  }
-
-  /**
-   * Add the itemInfo list to the tracking list of items for which storage
-   * movement is expected, if necessary.
-   * @param startId
-   *            - start id
-   * @param itemInfoList
-   *            - list of children in the directory
-   */
-  @VisibleForTesting
-  public synchronized void addAll(long startId,
-      List<ItemInfo> itemInfoList, boolean scanCompleted) {
-    storageMovementNeeded.addAll(itemInfoList);
-    DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
-    if (pendingWork == null) {
-      pendingWork = new DirPendingWorkInfo();
-      pendingWorkForDirectory.put(startId, pendingWork);
-    }
-    pendingWork.addPendingWorkCount(itemInfoList.size());
-    if (scanCompleted) {
-      pendingWork.markScanCompleted();
-    }
-  }
-
-  /**
-   * Gets the next item for which a storage movement check is necessary so
-   * that the movement can be made if required.
-   *
-   * @return the next ItemInfo, or null if the queue is empty
-   */
-  public synchronized ItemInfo get() {
-    return storageMovementNeeded.poll();
-  }
-
-  public synchronized void addToPendingDirQueue(long id) {
-    spsStatus.put(id, new StoragePolicySatisfyPathStatusInfo(
-        StoragePolicySatisfyPathStatus.PENDING));
-    spsDirsToBeTraveresed.add(id);
-    // Notify waiting FileInodeIdCollector thread about the newly
-    // added SPS path.
-    synchronized (spsDirsToBeTraveresed) {
-      spsDirsToBeTraveresed.notify();
-    }
-  }
-
-  /**
-   * Returns queue remaining capacity.
-   */
-  public synchronized int remainingCapacity() {
-    int size = storageMovementNeeded.size();
-    if (size >= maxQueuedItem) {
-      return 0;
-    } else {
-      return (maxQueuedItem - size);
-    }
-  }
-
-  /**
-   * Returns queue size.
-   */
-  public synchronized int size() {
-    return storageMovementNeeded.size();
-  }
-
-  public synchronized void clearAll() {
-    spsDirsToBeTraveresed.clear();
-    storageMovementNeeded.clear();
-    pendingWorkForDirectory.clear();
-  }
-
-  /**
-   * Decrease the pending child count for a directory once one file's blocks
-   * have moved successfully. Remove the SPS xAttr if the count reaches zero.
-   */
-  public synchronized void removeItemTrackInfo(ItemInfo trackInfo,
-      boolean isSuccess) throws IOException {
-    if (trackInfo.isDir()) {
-      // If track is part of some start inode then reduce the pending
-      // directory work count.
-      long startId = trackInfo.getStartId();
-      INode inode = namesystem.getFSDirectory().getInode(startId);
-      if (inode == null) {
-        // Directory was deleted; just remove it.
-        this.pendingWorkForDirectory.remove(startId);
-        updateStatus(startId, isSuccess);
-      } else {
-        DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
-        if (pendingWork != null) {
-          pendingWork.decrementPendingWorkCount();
-          if (pendingWork.isDirWorkDone()) {
-            namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY);
-            pendingWorkForDirectory.remove(startId);
-            pendingWork.setFailure(!isSuccess);
-            updateStatus(startId, pendingWork.isPolicySatisfied());
-          }
-          pendingWork.setFailure(!isSuccess);
-        }
-      }
-    } else {
-      // Remove the xAttr if the trackID doesn't exist in
-      // storageMovementAttemptedItems or the file's policy is satisfied.
-      namesystem.removeXattr(trackInfo.getTrackId(),
-          XATTR_SATISFY_STORAGE_POLICY);
-      updateStatus(trackInfo.getStartId(), isSuccess);
-    }
-  }
-
-  public synchronized void clearQueue(long trackId) {
-    spsDirsToBeTraveresed.remove(trackId);
-    Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
-    while (iterator.hasNext()) {
-      ItemInfo next = iterator.next();
-      if (next.getStartId() == trackId) {
-        iterator.remove();
-      }
-    }
-    pendingWorkForDirectory.remove(trackId);
-  }
-
-  /**
-   * Mark the inode status as SUCCESS or FAILURE in the status map.
-   */
-  private void updateStatus(long startId, boolean isSuccess){
-    StoragePolicySatisfyPathStatusInfo spsStatusInfo =
-        spsStatus.get(startId);
-    if (spsStatusInfo == null) {
-      spsStatusInfo = new StoragePolicySatisfyPathStatusInfo();
-      spsStatus.put(startId, spsStatusInfo);
-    }
-
-    if (isSuccess) {
-      spsStatusInfo.setSuccess();
-    } else {
-      spsStatusInfo.setFailure();
-    }
-  }
-
-  /**
-   * Clear all the movements queued in spsDirsToBeTraveresed and
-   * storageMovementNeeded, removing the SPS xAttr from each tracked
-   * file/directory so the associated resources can be cleaned up.
-   */
-  public synchronized void clearQueuesWithNotification() {
-    // Remove xAttr from directories
-    Long trackId;
-    while ((trackId = spsDirsToBeTraveresed.poll()) != null) {
-      try {
-        // Remove xAttr for file
-        namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY);
-      } catch (IOException ie) {
-        LOG.warn("Failed to remove SPS xattr for track id " + trackId, ie);
-      }
-    }
-
-    // Files are added directly to storageMovementNeeded, so try to remove
-    // the xAttr for each file.
-    ItemInfo itemInfo;
-    while ((itemInfo = storageMovementNeeded.poll()) != null) {
-      try {
-        // Remove xAttr for file
-        if (!itemInfo.isDir()) {
-          namesystem.removeXattr(itemInfo.getTrackId(),
-              XATTR_SATISFY_STORAGE_POLICY);
-        }
-      } catch (IOException ie) {
-        LOG.warn(
-            "Failed to remove SPS xattr for track id "
-                + itemInfo.getTrackId(), ie);
-      }
-    }
-    this.clearAll();
-  }
-
-  /**
-   * Takes a directory track ID from the spsDirsToBeTraveresed queue and
-   * collects the child IDs to process for satisfying the policy.
-   */
-  private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser
-      implements Runnable {
-
-    private int remainingCapacity = 0;
-
-    private List<ItemInfo> currentBatch = new ArrayList<>(maxQueuedItem);
-
-    StorageMovementPendingInodeIdCollector(FSDirectory dir) {
-      super(dir);
-    }
-
-    @Override
-    public void run() {
-      LOG.info("Starting FileInodeIdCollector.");
-      long lastStatusCleanTime = 0;
-      while (namesystem.isRunning() && sps.isRunning()) {
-        try {
-          if (!namesystem.isInSafeMode()) {
-            FSDirectory fsd = namesystem.getFSDirectory();
-            Long startINodeId = spsDirsToBeTraveresed.poll();
-            if (startINodeId == null) {
-              // Waiting for SPS path
-              synchronized (spsDirsToBeTraveresed) {
-                spsDirsToBeTraveresed.wait(5000);
-              }
-            } else {
-              INode startInode = fsd.getInode(startINodeId);
-              if (startInode != null) {
-                try {
-                  remainingCapacity = remainingCapacity();
-                  spsStatus.put(startINodeId,
-                      new StoragePolicySatisfyPathStatusInfo(
-                          StoragePolicySatisfyPathStatus.IN_PROGRESS));
-                  readLock();
-                  traverseDir(startInode.asDirectory(), startINodeId,
-                      HdfsFileStatus.EMPTY_NAME,
-                      new SPSTraverseInfo(startINodeId));
-                } finally {
-                  readUnlock();
-                }
-                // Mark startInode traverse is done
-                addAll(startInode.getId(), currentBatch, true);
-                currentBatch.clear();
-
-                // Check whether the directory was empty and no child was queued.
-                DirPendingWorkInfo dirPendingWorkInfo =
-                    pendingWorkForDirectory.get(startInode.getId());
-                if (dirPendingWorkInfo.isDirWorkDone()) {
-                  namesystem.removeXattr(startInode.getId(),
-                      XATTR_SATISFY_STORAGE_POLICY);
-                  pendingWorkForDirectory.remove(startInode.getId());
-                  updateStatus(startInode.getId(), true);
-                }
-              }
-            }
-            // Clear the SPS status if it has been SUCCESS for more than 5 min.
-            if (Time.monotonicNow()
-                - lastStatusCleanTime > statusClearanceElapsedTimeMs) {
-              lastStatusCleanTime = Time.monotonicNow();
-              cleanSpsStatus();
-            }
-          }
-        } catch (Throwable t) {
-          LOG.warn("Exception while loading inodes to satisfy the policy", t);
-        }
-      }
-    }
-
-    private synchronized void cleanSpsStatus() {
-      for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it =
-          spsStatus.entrySet().iterator(); it.hasNext();) {
-        Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next();
-        if (entry.getValue().canRemove()) {
-          it.remove();
-        }
-      }
-    }
-
-    @Override
-    protected void checkPauseForTesting() throws InterruptedException {
-      // TODO implement if needed
-    }
-
-    @Override
-    protected boolean processFileInode(INode inode, TraverseInfo traverseInfo)
-        throws IOException, InterruptedException {
-      assert getFSDirectory().hasReadLock();
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Processing {} to satisfy the policy",
-            inode.getFullPathName());
-      }
-      if (!inode.isFile()) {
-        return false;
-      }
-      if (inode.isFile() && inode.asFile().numBlocks() != 0) {
-        currentBatch.add(new ItemInfo(
-            ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
-        remainingCapacity--;
-      }
-      return true;
-    }
-
-    @Override
-    protected boolean canSubmitCurrentBatch() {
-      return remainingCapacity <= 0;
-    }
-
-    @Override
-    protected void checkINodeReady(long startId) throws IOException {
-      FSNamesystem fsn = ((FSNamesystem) namesystem);
-      fsn.checkNameNodeSafeMode("NN is in safe mode, "
-          + "cannot satisfy the policy.");
-      // SPS work should be cancelled when NN goes to standby. Just
-      // double checking for sanity.
-      fsn.checkOperation(NameNode.OperationCategory.WRITE);
-    }
-
-    @Override
-    protected void submitCurrentBatch(long startId)
-        throws IOException, InterruptedException {
-      // Add the current children to the queue.
-      addAll(startId, currentBatch, false);
-      currentBatch.clear();
-    }
-
-    @Override
-    protected void throttle() throws InterruptedException {
-      assert !getFSDirectory().hasReadLock();
-      assert !namesystem.hasReadLock();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("StorageMovementNeeded queue remaining capacity is zero,"
-            + " waiting for some free slots.");
-      }
-      remainingCapacity = remainingCapacity();
-      // wait for queue to be free
-      while (remainingCapacity <= 0) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Waiting for storageMovementNeeded queue to be free!");
-        }
-        Thread.sleep(5000);
-        remainingCapacity = remainingCapacity();
-      }
-    }
-
-    @Override
-    protected boolean canTraverseDir(INode inode) throws IOException {
-      return true;
-    }
-  }
-
-  /**
-   * Info for directory recursive scan.
-   */
-  public static class DirPendingWorkInfo {
-
-    private int pendingWorkCount = 0;
-    private boolean fullyScanned = false;
-    private boolean success = true;
-
-    /**
-     * Increment the pending work count for directory.
-     */
-    public synchronized void addPendingWorkCount(int count) {
-      this.pendingWorkCount = this.pendingWorkCount + count;
-    }
-
-    /**
-     * Decrement the pending work count for the directory once one track
-     * info is completed.
-     */
-    public synchronized void decrementPendingWorkCount() {
-      this.pendingWorkCount--;
-    }
-
-    /**
-     * Return true if all the pending work is done and the directory is
-     * fully scanned, otherwise false.
-     */
-    public synchronized boolean isDirWorkDone() {
-      return (pendingWorkCount <= 0 && fullyScanned);
-    }
-
-    /**
-     * Mark the directory scan as completed.
-     */
-    public synchronized void markScanCompleted() {
-      this.fullyScanned = true;
-    }
-
-    /**
-     * Return true if block movement succeeded for all files, otherwise false.
-     */
-    public boolean isPolicySatisfied() {
-      return success;
-    }
-
-    /**
-     * Set directory SPS status failed.
-     */
-    public void setFailure(boolean failure) {
-      this.success = this.success && !failure;
-    }
-  }
-
-  public void init() {
-    inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector(
-        namesystem.getFSDirectory()));
-    inodeIdCollector.setName("FileInodeIdCollector");
-    inodeIdCollector.start();
-  }
-
-  public void close() {
-    if (inodeIdCollector != null) {
-      inodeIdCollector.interrupt();
-    }
-  }
-
-  class SPSTraverseInfo extends TraverseInfo {
-    private long startId;
-
-    SPSTraverseInfo(long startId) {
-      this.startId = startId;
-    }
-
-    public long getStartId() {
-      return startId;
-    }
-  }
-
-  /**
-   * Represent the file/directory block movement status.
-   */
-  static class StoragePolicySatisfyPathStatusInfo {
-    private StoragePolicySatisfyPathStatus status =
-        StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
-    private long lastStatusUpdateTime;
-
-    StoragePolicySatisfyPathStatusInfo() {
-      this.lastStatusUpdateTime = 0;
-    }
-
-    StoragePolicySatisfyPathStatusInfo(StoragePolicySatisfyPathStatus status) {
-      this.status = status;
-      this.lastStatusUpdateTime = 0;
-    }
-
-    private void setSuccess() {
-      this.status = StoragePolicySatisfyPathStatus.SUCCESS;
-      this.lastStatusUpdateTime = Time.monotonicNow();
-    }
-
-    private void setFailure() {
-      this.status = StoragePolicySatisfyPathStatus.FAILURE;
-      this.lastStatusUpdateTime = Time.monotonicNow();
-    }
-
-    private StoragePolicySatisfyPathStatus getStatus() {
-      return status;
-    }
-
-    /**
-     * Return true if the SUCCESS/FAILURE status was cached more than 5 min.
-     */
-    private boolean canRemove() {
-      return (StoragePolicySatisfyPathStatus.SUCCESS == status
-          || StoragePolicySatisfyPathStatus.FAILURE == status)
-          && (Time.monotonicNow()
-              - lastStatusUpdateTime) > statusClearanceElapsedTimeMs;
-    }
-  }
-
-  public StoragePolicySatisfyPathStatus getStatus(long id) {
-    StoragePolicySatisfyPathStatusInfo spsStatusInfo = spsStatus.get(id);
-    if(spsStatusInfo == null){
-      return StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
-    }
-    return spsStatusInfo.getStatus();
-  }
-
-  @VisibleForTesting
-  public static void setStatusClearanceElapsedTimeMs(
-      long statusClearanceElapsedTimeMs) {
-    BlockStorageMovementNeeded.statusClearanceElapsedTimeMs =
-        statusClearanceElapsedTimeMs;
-  }
-
-  @VisibleForTesting
-  public static long getStatusClearanceElapsedTimeMs() {
-    return statusClearanceElapsedTimeMs;
-  }
-}
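
The diffstat shows this file shrinking slightly in its move to the sps
package (574 to 572 lines); the logic above otherwise carries over. Its
central bookkeeping is the per-directory pending-work count: the traverser
adds children in batches via addAll, each finished child decrements the count
through removeItemTrackInfo, and the SPS xAttr comes off only once the count
reaches zero after a completed scan. A toy Java model of that accounting
follows, with every name invented for illustration:

import java.util.HashMap;
import java.util.Map;

// Toy model of DirPendingWorkInfo-style accounting: clear the directory's
// SPS marker only after the scan finished and every child completed.
class DirWorkAccountingSketch {
  private final Map<Long, int[]> pendingCount = new HashMap<>();
  private final Map<Long, Boolean> scanDone = new HashMap<>();

  /** The traverser queued another batch of children for this directory. */
  void addChildren(long dirId, int count, boolean scanCompleted) {
    pendingCount.computeIfAbsent(dirId, k -> new int[1])[0] += count;
    if (scanCompleted) {
      scanDone.put(dirId, true);
    }
  }

  /** Returns true when the directory's SPS xAttr may now be removed. */
  boolean childCompleted(long dirId) {
    int[] c = pendingCount.get(dirId);
    if (c == null) {
      return false;
    }
    c[0]--;
    boolean done = c[0] <= 0 && scanDone.getOrDefault(dirId, false);
    if (done) {
      pendingCount.remove(dirId);
      scanDone.remove(dirId);
    }
    return done;
  }
}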

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b71ea21/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 5990a5f..2f625ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -258,6 +258,7 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b71ea21/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java
new file mode 100644
index 0000000..111cabb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+
+/**
+ * This class is the Namenode-side implementation for analyzing the file
+ * blocks whose storages are expected to change, and for assigning the block
+ * storage movements needed to satisfy the storage policy.
+ */
+// TODO: For now, only the one API required by the sps package is added.
+// This interface will be refined via HDFS-12911.
+public class IntraNNSPSContext implements StoragePolicySatisfier.Context {
+  private final Namesystem namesystem;
+
+  public IntraNNSPSContext(Namesystem namesystem) {
+    this.namesystem = namesystem;
+  }
+
+  @Override
+  public int getNumLiveDataNodes() {
+    return namesystem.getFSDirectory().getBlockManager().getDatanodeManager()
+        .getNumLiveDataNodes();
+  }
+}
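
The point of this seam is that the satisfier no longer needs a Namesystem in
hand; anything that can answer the same questions will do. As a hedged
illustration of where the TODO's HDFS-12911 refinement could lead (the class
below is hypothetical and not part of this commit), a context living outside
the NameNode could back the same call with a periodically refreshed snapshot
rather than a live Namesystem reference:

import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;

// Hypothetical alternative Context implementation, purely illustrative: it
// serves the live-datanode count from a cached snapshot instead of reaching
// into the Namesystem.
public class SnapshotSPSContext implements StoragePolicySatisfier.Context {
  private volatile int lastKnownLiveDataNodes;

  /** Called by some external refresher, e.g. after a datanode report. */
  public void updateLiveDataNodes(int count) {
    this.lastKnownLiveDataNodes = count;
  }

  @Override
  public int getNumLiveDataNodes() {
    return lastKnownLiveDataNodes;
  }
}

Swapping such an implementation in would require no change to the satisfier
itself, which is exactly what decoupling via StoragePolicySatisfier.Context
buys.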

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b71ea21/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
deleted file mode 100644
index 972e744..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ /dev/null
@@ -1,973 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import static org.apache.hadoop.util.Time.monotonicNow;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
-import org.apache.hadoop.hdfs.server.balancer.Matcher;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped.StorageAndBlockIndex;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
-import org.apache.hadoop.hdfs.server.protocol.StorageReport;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil;
-import org.apache.hadoop.util.Daemon;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Setting a storage policy on a file after it has been written only updates
- * the new storage policy type in the namespace; physical block storage
- * movement will not happen until the user explicitly runs the "Mover Tool"
- * for such files. The StoragePolicySatisfier daemon thread addresses the
- * case where users want HDFS itself to physically move the blocks instead
- * of running the mover tool explicitly. Simply calling the client API
- * satisfyStoragePolicy on a file/dir automatically triggers the move of its
- * physical storage locations, asynchronously, as expected. The Namenode
- * picks the file blocks whose storages are expected to change, then builds
- * the mapping of source block locations to the expected storage types and
- * target locations. This class also prepares the commands to send to the
- * Datanodes for processing the physical block movements.
- */
-@InterfaceAudience.Private
-public class StoragePolicySatisfier implements Runnable {
-  public static final Logger LOG =
-      LoggerFactory.getLogger(StoragePolicySatisfier.class);
-  private Daemon storagePolicySatisfierThread;
-  private final Namesystem namesystem;
-  private final BlockManager blockManager;
-  private final BlockStorageMovementNeeded storageMovementNeeded;
-  private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
-  private volatile boolean isRunning = false;
-  private int spsWorkMultiplier;
-  private long blockCount = 0L;
-  private int blockMovementMaxRetry;
-  /**
-   * Represents the collective analysis status for all blocks.
-   */
-  private static class BlocksMovingAnalysis {
-
-    enum Status {
-      // Represents that the analysis was skipped due to some condition; one
-      // such condition is the block collection being in an incomplete state.
-      ANALYSIS_SKIPPED_FOR_RETRY,
-      // Represents that some or all blocks found a respective target to do
-      // the storage movement.
-      BLOCKS_TARGETS_PAIRED,
-      // Represents that none of the blocks found a respective target to do
-      // the storage movement.
-      NO_BLOCKS_TARGETS_PAIRED,
-      // Represents that no blocks were found that need block storage
-      // movements.
-      BLOCKS_ALREADY_SATISFIED,
-      // Represents that the analysis was skipped due to some condition.
-      // Example conditions are that no blocks really exist in the block
-      // collection, or that analysis is not required for EC files with
-      // unsuitable storage policies.
-      BLOCKS_TARGET_PAIRING_SKIPPED,
-      // Represents that all the reported blocks satisfy the policy, but
-      // some of the blocks have low redundancy.
-      FEW_LOW_REDUNDANCY_BLOCKS
-    }
-
-    private Status status = null;
-    private List<Block> assignedBlocks = null;
-
-    BlocksMovingAnalysis(Status status, List<Block> blockMovingInfo) {
-      this.status = status;
-      this.assignedBlocks = blockMovingInfo;
-    }
-  }
-
-  public StoragePolicySatisfier(final Namesystem namesystem,
-      final BlockManager blkManager, Configuration conf) {
-    this.namesystem = namesystem;
-    this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem,
-        this, conf.getInt(
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT));
-    this.blockManager = blkManager;
-    this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
-        conf.getLong(
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT),
-        conf.getLong(
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
-            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
-        storageMovementNeeded);
-    this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf);
-    this.blockMovementMaxRetry = conf.getInt(
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
-        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
-  }
-
-  /**
-   * Start the storage policy satisfier daemon thread. Also start the block
-   * storage movements monitor to retry the attempts if needed.
-   */
-  public synchronized void start(boolean reconfigStart) {
-    isRunning = true;
-    if (checkIfMoverRunning()) {
-      isRunning = false;
-      LOG.error(
-          "Stopping StoragePolicySatisfier thread " + "as Mover ID file "
-              + HdfsServerConstants.MOVER_ID_PATH.toString()
-              + " has been opened. Maybe a Mover instance is running!");
-      return;
-    }
-    if (reconfigStart) {
-      LOG.info("Starting StoragePolicySatisfier, as admin requested to "
-          + "start it.");
-    } else {
-      LOG.info("Starting StoragePolicySatisfier.");
-    }
-
-    // Ensure that all the previously submitted block movements (if any)
-    // are stopped on all datanodes.
-    addDropSPSWorkCommandsToAllDNs();
-    storageMovementNeeded.init();
-    storagePolicySatisfierThread = new Daemon(this);
-    storagePolicySatisfierThread.setName("StoragePolicySatisfier");
-    storagePolicySatisfierThread.start();
-    this.storageMovementsMonitor.start();
-  }
-
-  /**
-   * Disables storage policy satisfier by stopping its services.
-   *
-   * @param forceStop
-   *          true represents that it should stop SPS service by clearing all
-   *          pending SPS work
-   */
-  public synchronized void disable(boolean forceStop) {
-    isRunning = false;
-
-    if (storagePolicySatisfierThread == null) {
-      return;
-    }
-
-    storageMovementNeeded.close();
-
-    storagePolicySatisfierThread.interrupt();
-    this.storageMovementsMonitor.stop();
-    if (forceStop) {
-      storageMovementNeeded.clearQueuesWithNotification();
-      addDropSPSWorkCommandsToAllDNs();
-    } else {
-      LOG.info("Stopping StoragePolicySatisfier.");
-    }
-  }
-
-  /**
-   * Timed wait to stop storage policy satisfier daemon threads.
-   */
-  public synchronized void stopGracefully() {
-    if (isRunning) {
-      disable(true);
-    }
-    this.storageMovementsMonitor.stopGracefully();
-
-    if (storagePolicySatisfierThread == null) {
-      return;
-    }
-    try {
-      storagePolicySatisfierThread.join(3000);
-    } catch (InterruptedException ie) {
-    }
-  }
-
-  /**
-   * Check whether StoragePolicySatisfier is running.
-   * @return true if running
-   */
-  public boolean isRunning() {
-    return isRunning;
-  }
-
-  // Return true if a Mover instance is running
-  private boolean checkIfMoverRunning() {
-    String moverId = HdfsServerConstants.MOVER_ID_PATH.toString();
-    return namesystem.isFileOpenedForWrite(moverId);
-  }
-
-  /**
-   * Adds drop commands to all datanodes to stop performing the satisfier
-   * block movements, if any.
-   */
-  private void addDropSPSWorkCommandsToAllDNs() {
-    this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
-  }
-
-  @Override
-  public void run() {
-    while (namesystem.isRunning() && isRunning) {
-      try {
-        if (!namesystem.isInSafeMode()) {
-          ItemInfo itemInfo = storageMovementNeeded.get();
-          if (itemInfo != null) {
-            if(itemInfo.getRetryCount() >= blockMovementMaxRetry){
-              LOG.info("Failed to satisfy the policy after "
-                  + blockMovementMaxRetry + " retries. Removing inode "
-                  + itemInfo.getTrackId() + " from the queue");
-              storageMovementNeeded.removeItemTrackInfo(itemInfo, false);
-              continue;
-            }
-            long trackId = itemInfo.getTrackId();
-            BlockCollection blockCollection;
-            BlocksMovingAnalysis status = null;
-            try {
-              namesystem.readLock();
-              blockCollection = namesystem.getBlockCollection(trackId);
-              // Check blockCollectionId existence.
-              if (blockCollection == null) {
-                // File doesn't exist (maybe it was deleted); remove trackId
-                // from the queue.
-                storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
-              } else {
-                status =
-                    analyseBlocksStorageMovementsAndAssignToDN(
-                        blockCollection);
-              }
-            } finally {
-              namesystem.readUnlock();
-            }
-            if (blockCollection != null) {
-              switch (status.status) {
-              // Just add to monitor, so it will be retried after timeout
-              case ANALYSIS_SKIPPED_FOR_RETRY:
-                // Just add to monitor, so it will be tracked for report and
-                // be removed on storage movement attempt finished report.
-              case BLOCKS_TARGETS_PAIRED:
-                this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo
-                    .getStartId(), itemInfo.getTrackId(), monotonicNow(),
-                    status.assignedBlocks, itemInfo.getRetryCount()));
-                break;
-              case NO_BLOCKS_TARGETS_PAIRED:
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Adding trackID " + trackId
-                      + " back to retry queue as none of the blocks"
-                      + " found its eligible targets.");
-                }
-                itemInfo.retryCount++;
-                this.storageMovementNeeded.add(itemInfo);
-                break;
-              case FEW_LOW_REDUNDANCY_BLOCKS:
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Adding trackID " + trackId
-                      + " back to retry queue as some of the blocks"
-                      + " are low redundant.");
-                }
-                this.storageMovementNeeded.add(itemInfo);
-                break;
-              // Just clean Xattrs
-              case BLOCKS_TARGET_PAIRING_SKIPPED:
-              case BLOCKS_ALREADY_SATISFIED:
-              default:
-                LOG.info("Block analysis skipped or blocks already satisfied"
-                    + " with storages. So, cleaning up the Xattrs.");
-                storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
-                break;
-              }
-            }
-          }
-        }
-        int numLiveDn = namesystem.getFSDirectory().getBlockManager()
-            .getDatanodeManager().getNumLiveDataNodes();
-        if (storageMovementNeeded.size() == 0
-            || blockCount > (numLiveDn * spsWorkMultiplier)) {
-          Thread.sleep(3000);
-          blockCount = 0L;
-        }
-      } catch (Throwable t) {
-        handleException(t);
-      }
-    }
-  }
-
-  private void handleException(Throwable t) {
-    // double check to avoid entering into synchronized block.
-    if (isRunning) {
-      synchronized (this) {
-        if (isRunning) {
-          isRunning = false;
-          // Stopping monitor thread and clearing queues as well
-          this.clearQueues();
-          this.storageMovementsMonitor.stopGracefully();
-          if (!namesystem.isRunning()) {
-            LOG.info("Stopping StoragePolicySatisfier.");
-            if (!(t instanceof InterruptedException)) {
-              LOG.info("StoragePolicySatisfier received an exception"
-                  + " while shutting down.", t);
-            }
-            return;
-          }
-        }
-      }
-    }
-    LOG.error("StoragePolicySatisfier thread received runtime exception. "
-        + "Stopping Storage policy satisfier work", t);
-    return;
-  }
-
-  private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
-      BlockCollection blockCollection) {
-    BlocksMovingAnalysis.Status status =
-        BlocksMovingAnalysis.Status.BLOCKS_ALREADY_SATISFIED;
-    byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
-    BlockStoragePolicy existingStoragePolicy =
-        blockManager.getStoragePolicy(existingStoragePolicyID);
-    if (!blockCollection.getLastBlock().isComplete()) {
-      // Postpone, as the file is currently under construction.
-      // So, should we add it back, or leave that to the user?
-      LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
-          + " this to the next retry iteration", blockCollection.getId());
-      return new BlocksMovingAnalysis(
-          BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
-          new ArrayList<>());
-    }
-
-    BlockInfo[] blocks = blockCollection.getBlocks();
-    if (blocks.length == 0) {
-      LOG.info("BlockCollectionID: {} file does not have any blocks."
-          + " So, skipping the analysis.", blockCollection.getId());
-      return new BlocksMovingAnalysis(
-          BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
-          new ArrayList<>());
-    }
-    List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
-
-    for (int i = 0; i < blocks.length; i++) {
-      BlockInfo blockInfo = blocks[i];
-      List<StorageType> expectedStorageTypes;
-      if (blockInfo.isStriped()) {
-        if (ErasureCodingPolicyManager
-            .checkStoragePolicySuitableForECStripedMode(
-                existingStoragePolicyID)) {
-          expectedStorageTypes = existingStoragePolicy
-              .chooseStorageTypes((short) blockInfo.getCapacity());
-        } else {
-          // Currently we support only a limited set of policies (HOT, COLD,
-          // ALLSSD) for EC striped mode files. SPS will not move the blocks
-          // if the storage policy is not one supported for EC striped mode.
-          LOG.warn("The storage policy " + existingStoragePolicy.getName()
-              + " is not suitable for Striped EC files. "
-              + "So, not moving the blocks");
-          return new BlocksMovingAnalysis(
-              BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
-              new ArrayList<>());
-        }
-      } else {
-        expectedStorageTypes = existingStoragePolicy
-            .chooseStorageTypes(blockInfo.getReplication());
-      }
-
-      DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
-      StorageType[] storageTypes = new StorageType[storages.length];
-      for (int j = 0; j < storages.length; j++) {
-        DatanodeStorageInfo datanodeStorageInfo = storages[j];
-        StorageType storageType = datanodeStorageInfo.getStorageType();
-        storageTypes[j] = storageType;
-      }
-      List<StorageType> existing =
-          new LinkedList<StorageType>(Arrays.asList(storageTypes));
-      if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
-          existing, true)) {
-        boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos,
-            blockInfo, expectedStorageTypes, existing, storages);
-        if (blocksPaired) {
-          status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
-        } else {
-          // None of the blocks found eligible targets for satisfying the
-          // storage policy.
-          status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
-        }
-      } else {
-        if (blockManager.hasLowRedundancyBlocks(blockCollection)) {
-          status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
-        }
-      }
-    }
-
-    List<Block> assignedBlockIds = new ArrayList<Block>();
-    for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
-      // Check whether at least one block storage movement has been chosen.
-      if (blkMovingInfo.getTarget() != null) {
-        // assign block storage movement task to the target node
-        ((DatanodeDescriptor) blkMovingInfo.getTarget())
-            .addBlocksToMoveStorage(blkMovingInfo);
-        LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
-        assignedBlockIds.add(blkMovingInfo.getBlock());
-        blockCount++;
-      }
-    }
-    return new BlocksMovingAnalysis(status, assignedBlockIds);
-  }
-
-  /**
-   * Compute the list of block moving information corresponding to the given
-   * blockId. This checks whether each block location of the given block
-   * satisfies the expected storage policy. If a block location does not
-   * satisfy the policy, it finds a target node with the expected storage
-   * type to satisfy the storage policy.
-   *
-   * @param blockMovingInfos
-   *          - list of block source and target node pair
-   * @param blockInfo
-   *          - block details
-   * @param expectedStorageTypes
-   *          - list of expected storage type to satisfy the storage policy
-   * @param existing
-   *          - list to get existing storage types
-   * @param storages
-   *          - available storages
-   * @return false if some of the block locations failed to find target node to
-   *         satisfy the storage policy, true otherwise
-   */
-  private boolean computeBlockMovingInfos(
-      List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
-      List<StorageType> expectedStorageTypes, List<StorageType> existing,
-      DatanodeStorageInfo[] storages) {
-    boolean foundMatchingTargetNodesForBlock = true;
-    if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
-        existing, true)) {
-      List<StorageTypeNodePair> sourceWithStorageMap =
-          new ArrayList<StorageTypeNodePair>();
-      List<DatanodeStorageInfo> existingBlockStorages =
-          new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
-      // If an expected type already exists on the source node, a local
-      // movement is possible, so let's find such sources first.
-      Iterator<DatanodeStorageInfo> iterator = existingBlockStorages.iterator();
-      while (iterator.hasNext()) {
-        DatanodeStorageInfo datanodeStorageInfo = iterator.next();
-        if (checkSourceAndTargetTypeExists(
-            datanodeStorageInfo.getDatanodeDescriptor(), existing,
-            expectedStorageTypes)) {
-          sourceWithStorageMap
-              .add(new StorageTypeNodePair(datanodeStorageInfo.getStorageType(),
-                  datanodeStorageInfo.getDatanodeDescriptor()));
-          iterator.remove();
-          existing.remove(datanodeStorageInfo.getStorageType());
-        }
-      }
-
-      // Let's find sources for the remaining existing types.
-      for (StorageType existingType : existing) {
-        iterator = existingBlockStorages.iterator();
-        while (iterator.hasNext()) {
-          DatanodeStorageInfo datanodeStorageInfo = iterator.next();
-          StorageType storageType = datanodeStorageInfo.getStorageType();
-          if (storageType == existingType) {
-            iterator.remove();
-            sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
-                datanodeStorageInfo.getDatanodeDescriptor()));
-            break;
-          }
-        }
-      }
-
-      StorageTypeNodeMap locsForExpectedStorageTypes =
-          findTargetsForExpectedStorageTypes(expectedStorageTypes);
-
-      foundMatchingTargetNodesForBlock &= findSourceAndTargetToMove(
-          blockMovingInfos, blockInfo, sourceWithStorageMap,
-          expectedStorageTypes, locsForExpectedStorageTypes);
-    }
-    return foundMatchingTargetNodesForBlock;
-  }
-
-  /**
-   * Find a good target node for each source node whose block storage is
-   * misplaced.
-   *
-   * @param blockMovingInfos
-   *          - list of block source and target node pairs
-   * @param blockInfo
-   *          - Block
-   * @param sourceWithStorageList
-   *          - source datanodes with their storages
-   * @param expected
-   *          - expected storage types to move to
-   * @param locsForExpectedStorageTypes
-   *          - available DNs for the expected storage types
-   * @return false if any of the block locations failed to find a target node
-   *         to satisfy the storage policy
-   */
-  private boolean findSourceAndTargetToMove(
-      List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
-      List<StorageTypeNodePair> sourceWithStorageList,
-      List<StorageType> expected,
-      StorageTypeNodeMap locsForExpectedStorageTypes) {
-    boolean foundMatchingTargetNodesForBlock = true;
-    List<DatanodeDescriptor> excludeNodes = new ArrayList<>();
-
-    // Loop over all the source node locations and choose a target storage
-    // within the same node if possible. This is done as a separate pass to
-    // avoid choosing a target which already has this block.
-    for (int i = 0; i < sourceWithStorageList.size(); i++) {
-      StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
-
-      // Check whether the block replica is already placed on the expected
-      // storage type on this source datanode.
-      if (!expected.contains(existingTypeNodePair.storageType)) {
-        StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
-            blockInfo, existingTypeNodePair.dn, expected);
-        if (chosenTarget != null) {
-          if (blockInfo.isStriped()) {
-            buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
-                existingTypeNodePair.storageType, chosenTarget.dn,
-                chosenTarget.storageType, blockMovingInfos);
-          } else {
-            buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
-                existingTypeNodePair.storageType, chosenTarget.dn,
-                chosenTarget.storageType, blockMovingInfos);
-          }
-          expected.remove(chosenTarget.storageType);
-          // TODO: We can increment scheduled block count for this node?
-        }
-      }
-      // Add to excludeNodes to avoid choosing this node as a target later.
-      excludeNodes.add(existingTypeNodePair.dn);
-    }
-
-    // Loop over all the source node locations and choose a remote target
-    // storage node if none was found within the same node.
-    for (int i = 0; i < sourceWithStorageList.size(); i++) {
-      StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
-      StorageTypeNodePair chosenTarget = null;
-      // A target storage was already chosen within the same datanode, so just
-      // skip this source node.
-      if (checkIfAlreadyChosen(blockMovingInfos, existingTypeNodePair.dn)) {
-        continue;
-      }
-      if (chosenTarget == null && blockManager.getDatanodeManager()
-          .getNetworkTopology().isNodeGroupAware()) {
-        chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
-            expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes,
-            excludeNodes);
-      }
-
-      // Then, match nodes on the same rack
-      if (chosenTarget == null) {
-        chosenTarget =
-            chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
-                Matcher.SAME_RACK, locsForExpectedStorageTypes, excludeNodes);
-      }
-
-      if (chosenTarget == null) {
-        chosenTarget =
-            chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
-                Matcher.ANY_OTHER, locsForExpectedStorageTypes, excludeNodes);
-      }
-      if (null != chosenTarget) {
-        if (blockInfo.isStriped()) {
-          buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
-              existingTypeNodePair.storageType, chosenTarget.dn,
-              chosenTarget.storageType, blockMovingInfos);
-        } else {
-          buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
-              existingTypeNodePair.storageType, chosenTarget.dn,
-              chosenTarget.storageType, blockMovingInfos);
-        }
-
-        expected.remove(chosenTarget.storageType);
-        excludeNodes.add(chosenTarget.dn);
-        // TODO: We can increment scheduled block count for this node?
-      } else {
-        LOG.warn(
-            "Failed to choose target datanode for the required"
-                + " storage types {}, block:{}, existing storage type:{}",
-            expected, blockInfo, existingTypeNodePair.storageType);
-      }
-    }
-
-    if (expected.size() > 0) {
-      foundMatchingTargetNodesForBlock = false;
-    }
-
-    return foundMatchingTargetNodesForBlock;
-  }
-
-  private boolean checkIfAlreadyChosen(List<BlockMovingInfo> blockMovingInfos,
-      DatanodeDescriptor dn) {
-    for (BlockMovingInfo blockMovingInfo : blockMovingInfos) {
-      if (blockMovingInfo.getSource().equals(dn)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void buildContinuousBlockMovingInfos(BlockInfo blockInfo,
-      DatanodeInfo sourceNode, StorageType sourceStorageType,
-      DatanodeInfo targetNode, StorageType targetStorageType,
-      List<BlockMovingInfo> blkMovingInfos) {
-    Block blk = new Block(blockInfo.getBlockId(), blockInfo.getNumBytes(),
-        blockInfo.getGenerationStamp());
-    BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
-        targetNode, sourceStorageType, targetStorageType);
-    blkMovingInfos.add(blkMovingInfo);
-  }
-
-  private void buildStripedBlockMovingInfos(BlockInfo blockInfo,
-      DatanodeInfo sourceNode, StorageType sourceStorageType,
-      DatanodeInfo targetNode, StorageType targetStorageType,
-      List<BlockMovingInfo> blkMovingInfos) {
-    // For a striped block, an internal block needs to be constructed at each
-    // index of the block group. Here we iterate over all the block indices
-    // and construct the internal blocks, which can then be considered for
-    // block movement.
-    BlockInfoStriped sBlockInfo = (BlockInfoStriped) blockInfo;
-    for (StorageAndBlockIndex si : sBlockInfo.getStorageAndIndexInfos()) {
-      if (si.getBlockIndex() >= 0) {
-        DatanodeDescriptor dn = si.getStorage().getDatanodeDescriptor();
-        if (sourceNode.equals(dn)) {
-          // construct internal block
-          long blockId = blockInfo.getBlockId() + si.getBlockIndex();
-          long numBytes = StripedBlockUtil.getInternalBlockLength(
-              sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(),
-              sBlockInfo.getDataBlockNum(), si.getBlockIndex());
-          Block blk = new Block(blockId, numBytes,
-              blockInfo.getGenerationStamp());
-          BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
-              targetNode, sourceStorageType, targetStorageType);
-          blkMovingInfos.add(blkMovingInfo);
-        }
-      }
-    }
-  }
-
-  /**
-   * Choose a target storage within the same datanode if possible.
-   *
-   * @param block
-   *          - block info
-   * @param source
-   *          - source datanode
-   * @param targetTypes
-   *          - list of target storage types
-   * @return the chosen storage type and node pair, or null if none was found
-   */
-  private StorageTypeNodePair chooseTargetTypeInSameNode(Block block,
-      DatanodeDescriptor source, List<StorageType> targetTypes) {
-    for (StorageType t : targetTypes) {
-      DatanodeStorageInfo chooseStorage4Block =
-          source.chooseStorage4Block(t, block.getNumBytes());
-      if (chooseStorage4Block != null) {
-        return new StorageTypeNodePair(t, source);
-      }
-    }
-    return null;
-  }
-
-  private StorageTypeNodePair chooseTarget(Block block,
-      DatanodeDescriptor source, List<StorageType> targetTypes, Matcher matcher,
-      StorageTypeNodeMap locsForExpectedStorageTypes,
-      List<DatanodeDescriptor> excludeNodes) {
-    for (StorageType t : targetTypes) {
-      List<DatanodeDescriptor> nodesWithStorages =
-          locsForExpectedStorageTypes.getNodesWithStorages(t);
-      if (nodesWithStorages == null || nodesWithStorages.isEmpty()) {
-        continue; // no target nodes with the required storage type.
-      }
-      Collections.shuffle(nodesWithStorages);
-      for (DatanodeDescriptor target : nodesWithStorages) {
-        if (!excludeNodes.contains(target) && matcher.match(
-            blockManager.getDatanodeManager().getNetworkTopology(), source,
-            target)) {
-          if (null != target.chooseStorage4Block(t, block.getNumBytes())) {
-            return new StorageTypeNodePair(t, target);
-          }
-        }
-      }
-    }
-    return null;
-  }
-
-  private static class StorageTypeNodePair {
-    private StorageType storageType = null;
-    private DatanodeDescriptor dn = null;
-
-    StorageTypeNodePair(StorageType storageType, DatanodeDescriptor dn) {
-      this.storageType = storageType;
-      this.dn = dn;
-    }
-  }
-
-  private StorageTypeNodeMap findTargetsForExpectedStorageTypes(
-      List<StorageType> expected) {
-    StorageTypeNodeMap targetMap = new StorageTypeNodeMap();
-    List<DatanodeDescriptor> reports = blockManager.getDatanodeManager()
-        .getDatanodeListForReport(DatanodeReportType.LIVE);
-    for (DatanodeDescriptor dn : reports) {
-      StorageReport[] storageReports = dn.getStorageReports();
-      for (StorageReport storageReport : storageReports) {
-        StorageType t = storageReport.getStorage().getStorageType();
-        if (expected.contains(t)) {
-          final long maxRemaining = getMaxRemaining(dn.getStorageReports(), t);
-          if (maxRemaining > 0L) {
-            targetMap.add(t, dn);
-          }
-        }
-      }
-    }
-    return targetMap;
-  }
-
-  private static long getMaxRemaining(StorageReport[] storageReports,
-      StorageType t) {
-    long max = 0L;
-    for (StorageReport r : storageReports) {
-      if (r.getStorage().getStorageType() == t) {
-        if (r.getRemaining() > max) {
-          max = r.getRemaining();
-        }
-      }
-    }
-    return max;
-  }
-
-  private boolean checkSourceAndTargetTypeExists(DatanodeDescriptor dn,
-      List<StorageType> existing, List<StorageType> expectedStorageTypes) {
-    DatanodeStorageInfo[] allDNStorageInfos = dn.getStorageInfos();
-    boolean isExpectedTypeAvailable = false;
-    boolean isExistingTypeAvailable = false;
-    for (DatanodeStorageInfo dnInfo : allDNStorageInfos) {
-      StorageType storageType = dnInfo.getStorageType();
-      if (existing.contains(storageType)) {
-        isExistingTypeAvailable = true;
-      }
-      if (expectedStorageTypes.contains(storageType)) {
-        isExpectedTypeAvailable = true;
-      }
-    }
-    return isExistingTypeAvailable && isExpectedTypeAvailable;
-  }
-
-  private static class StorageTypeNodeMap {
-    private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap =
-        new EnumMap<StorageType, List<DatanodeDescriptor>>(StorageType.class);
-
-    private void add(StorageType t, DatanodeDescriptor dn) {
-      List<DatanodeDescriptor> nodesWithStorages = getNodesWithStorages(t);
-      LinkedList<DatanodeDescriptor> value = null;
-      if (nodesWithStorages == null) {
-        value = new LinkedList<DatanodeDescriptor>();
-        value.add(dn);
-        typeNodeMap.put(t, value);
-      } else {
-        nodesWithStorages.add(dn);
-      }
-    }
-
-    /**
-     * @param type
-     *          - storage type
-     * @return datanodes which have the given storage type
-     */
-    private List<DatanodeDescriptor> getNodesWithStorages(StorageType type) {
-      return typeNodeMap.get(type);
-    }
-  }
-
-  /**
-   * Receives the report of blocks whose storage movement attempts have
-   * finished.
-   *
-   * @param moveAttemptFinishedBlks
-   *          set of blocks whose storage movement attempts have finished.
-   */
-  void handleStorageMovementAttemptFinishedBlks(
-      BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) {
-    if (moveAttemptFinishedBlks.getBlocks().length <= 0) {
-      return;
-    }
-    storageMovementsMonitor
-        .addReportedMovedBlocks(moveAttemptFinishedBlks.getBlocks());
-  }
-
-  @VisibleForTesting
-  BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
-    return storageMovementsMonitor;
-  }
-
-  /**
-   * Clear the storage-movement-needed queues and the items tracked in the
-   * storage movement monitor.
-   */
-  public void clearQueues() {
-    LOG.warn("Clearing all the queues from StoragePolicySatisfier. So, "
-        + "user requests on satisfying block storages would be discarded.");
-    storageMovementNeeded.clearAll();
-  }
-
-  /**
-   * Queue the file inode whose blocks need storage movement.
-   *
-   * @param inodeId
-   *          - file inode/blockcollection id.
-   */
-  public void satisfyStoragePolicy(Long inodeId) {
-    // For a file, startId and trackId are the same.
-    storageMovementNeeded.add(new ItemInfo(inodeId, inodeId));
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Added track info for inode {} to block "
-          + "storageMovementNeeded queue", inodeId);
-    }
-  }
-
-  public void addInodeToPendingDirQueue(long id) {
-    storageMovementNeeded.addToPendingDirQueue(id);
-  }
-
-  /**
-   * Clear the queues for the given track id.
-   */
-  public void clearQueue(long trackId) {
-    storageMovementNeeded.clearQueue(trackId);
-  }
-
-  /**
-   * ItemInfo is a file info object describing a file for which the storage
-   * policy needs to be satisfied.
-   */
-  public static class ItemInfo {
-    private long startId;
-    private long trackId;
-    private int retryCount;
-
-    public ItemInfo(long startId, long trackId) {
-      this.startId = startId;
-      this.trackId = trackId;
-      // Set to 0 when the item is added to the queue for the first time.
-      this.retryCount = 0;
-    }
-
-    public ItemInfo(long startId, long trackId, int retryCount) {
-      this.startId = startId;
-      this.trackId = trackId;
-      this.retryCount = retryCount;
-    }
-
-    /**
-     * Return the start inode id for the current track id.
-     */
-    public long getStartId() {
-      return startId;
-    }
-
-    /**
-     * Return the file inode id for which the policy needs to be satisfied.
-     */
-    public long getTrackId() {
-      return trackId;
-    }
-
-    /**
-     * Returns true if the tracking path is a directory, false otherwise.
-     */
-    public boolean isDir() {
-      return (startId != trackId);
-    }
-
-    /**
-     * Get the number of retries attempted so far to satisfy the policy.
-     */
-    public int getRetryCount() {
-      return retryCount;
-    }
-  }
-
-  /**
-   * This class contains the information of attempted blocks and their last
-   * attempted or reported time stamp. This is used by
-   * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
-   */
-  final static class AttemptedItemInfo extends ItemInfo {
-    private long lastAttemptedOrReportedTime;
-    private final List<Block> blocks;
-
-    /**
-     * AttemptedItemInfo constructor.
-     *
-     * @param rootId
-     *          rootId for the trackId
-     * @param trackId
-     *          trackId for the file
-     * @param lastAttemptedOrReportedTime
-     *          last attempted or reported time
-     * @param blocks
-     *          blocks chosen for movement
-     * @param retryCount
-     *          number of retries attempted so far
-     */
-    AttemptedItemInfo(long rootId, long trackId,
-        long lastAttemptedOrReportedTime,
-        List<Block> blocks, int retryCount) {
-      super(rootId, trackId, retryCount);
-      this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
-      this.blocks = blocks;
-    }
-
-    /**
-     * @return last attempted or reported time stamp.
-     */
-    long getLastAttemptedOrReportedTime() {
-      return lastAttemptedOrReportedTime;
-    }
-
-    /**
-     * Update lastAttemptedOrReportedTime so that the expiration time is
-     * postponed into the future.
-     */
-    void touchLastReportedTimeStamp() {
-      this.lastAttemptedOrReportedTime = monotonicNow();
-    }
-
-    List<Block> getBlocks() {
-      return this.blocks;
-    }
-
-  }
-
-  public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
-      String path) throws IOException {
-    INode inode = namesystem.getFSDirectory().getINode(path);
-    return storageMovementNeeded.getStatus(inode.getId());
-  }
-}

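As context for the hunk above: once a same-node move has been ruled out, the
removed StoragePolicySatisfier code (now living under the new sps package)
chooses a remote target for a misplaced replica by widening the network scope
step by step — same node group (only when the topology is node-group aware),
then same rack, then any other node. The sketch below is a minimal,
self-contained illustration of that fallback shape; the names Scope, Topology
and TargetChooser are hypothetical stand-ins, not the Hadoop API.

import java.util.List;

enum Scope { SAME_NODE_GROUP, SAME_RACK, ANY_OTHER }

interface Topology {
  // True if 'candidate' lies within 'scope' of 'source' in the cluster tree.
  boolean matches(Scope scope, String source, String candidate);
}

final class TargetChooser {
  private final Topology topology;

  TargetChooser(Topology topology) {
    this.topology = topology;
  }

  // Try each scope in widening order; return the first non-excluded candidate
  // that matches, or null when no scope yields a target (the real code then
  // logs a warning and gives up on that source).
  String chooseTarget(String source, List<String> candidates,
      List<String> excluded) {
    for (Scope scope : Scope.values()) {
      for (String candidate : candidates) {
        if (!excluded.contains(candidate)
            && topology.matches(scope, source, candidate)) {
          return candidate;
        }
      }
    }
    return null;
  }
}
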
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b71ea21/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
new file mode 100644
index 0000000..b044f30
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A monitor class for checking whether block storage movement attempts have
+ * completed. If it receives a block storage movement attempt status (either
+ * success or failure) from a DN, it just removes the corresponding entries
+ * from tracking. If no DN reports a finished movement attempt for a longer
+ * time period, such items are retried automatically after the timeout. The
+ * default timeout is 5 minutes.
+ */
+public class BlockStorageMovementAttemptedItems {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
+
+  /**
+   * Holds the items which have already been taken up for block movement
+   * processing and sent to DNs.
+   */
+  private final List<AttemptedItemInfo> storageMovementAttemptedItems;
+  private final List<Block> movementFinishedBlocks;
+  private volatile boolean monitorRunning = true;
+  private Daemon timerThread = null;
+  //
+  // It might take anywhere between 5 and 10 minutes before
+  // a request is timed out.
+  //
+  private long selfRetryTimeout = 5 * 60 * 1000;
+
+  //
+  // It might take anywhere between 1 and 2 minutes before
+  // a request is timed out.
+  //
+  private long minCheckTimeout = 1 * 60 * 1000; // minimum value
+  private BlockStorageMovementNeeded blockStorageMovementNeeded;
+
+  public BlockStorageMovementAttemptedItems(long recheckTimeout,
+      long selfRetryTimeout,
+      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
+    if (recheckTimeout > 0) {
+      this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
+    }
+
+    this.selfRetryTimeout = selfRetryTimeout;
+    this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
+    storageMovementAttemptedItems = new ArrayList<>();
+    movementFinishedBlocks = new ArrayList<>();
+  }
+
+  /**
+   * Add an item to the block storage movement attempted items list, which
+   * tracks each attempted tracking/blockCollection id along with its time
+   * stamp.
+   *
+   * @param itemInfo
+   *          - tracking info
+   */
+  public void add(AttemptedItemInfo itemInfo) {
+    synchronized (storageMovementAttemptedItems) {
+      storageMovementAttemptedItems.add(itemInfo);
+    }
+  }
+
+  /**
+   * Add the blocks whose storage movement attempts have finished to
+   * movementFinishedBlocks.
+   *
+   * @param moveAttemptFinishedBlks
+   *          blocks whose storage movement attempts have finished
+   */
+  public void addReportedMovedBlocks(Block[] moveAttemptFinishedBlks) {
+    if (moveAttemptFinishedBlks.length == 0) {
+      return;
+    }
+    synchronized (movementFinishedBlocks) {
+      movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks));
+    }
+  }
+
+  /**
+   * Starts the monitor thread.
+   */
+  public synchronized void start() {
+    monitorRunning = true;
+    timerThread = new Daemon(new BlocksStorageMovementAttemptMonitor());
+    timerThread.setName("BlocksStorageMovementAttemptMonitor");
+    timerThread.start();
+  }
+
+  /**
+   * Sets the running flag to false, interrupts the monitor thread and clears
+   * all the queued-up tasks.
+   */
+  public synchronized void stop() {
+    monitorRunning = false;
+    if (timerThread != null) {
+      timerThread.interrupt();
+    }
+    this.clearQueues();
+  }
+
+  /**
+   * Timed wait to stop the monitor thread.
+   */
+  synchronized void stopGracefully() {
+    if (timerThread == null) {
+      return;
+    }
+    if (monitorRunning) {
+      stop();
+    }
+    try {
+      timerThread.join(3000);
+    } catch (InterruptedException ie) {
+      // Ignore; the monitor is being stopped anyway.
+    }
+  }
+
+  /**
+   * A monitor class that periodically checks block storage movement attempt
+   * status and retries long-waiting items.
+   */
+  private class BlocksStorageMovementAttemptMonitor implements Runnable {
+    @Override
+    public void run() {
+      while (monitorRunning) {
+        try {
+          blockStorageMovementReportedItemsCheck();
+          blocksStorageMovementUnReportedItemsCheck();
+          Thread.sleep(minCheckTimeout);
+        } catch (InterruptedException ie) {
+          LOG.info("BlocksStorageMovementAttemptMonitor thread "
+              + "is interrupted.", ie);
+        } catch (IOException ie) {
+          LOG.warn("BlocksStorageMovementAttemptMonitor thread "
+              + "received exception and exiting.", ie);
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  void blocksStorageMovementUnReportedItemsCheck() {
+    synchronized (storageMovementAttemptedItems) {
+      Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems
+          .iterator();
+      long now = monotonicNow();
+      while (iter.hasNext()) {
+        AttemptedItemInfo itemInfo = iter.next();
+        if (now > itemInfo.getLastAttemptedOrReportedTime()
+            + selfRetryTimeout) {
+          Long blockCollectionID = itemInfo.getTrackId();
+          synchronized (movementFinishedBlocks) {
+            ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
+                blockCollectionID, itemInfo.getRetryCount() + 1);
+            blockStorageMovementNeeded.add(candidate);
+            iter.remove();
+            LOG.info("TrackID: {} becomes timed out and moved to needed "
+                + "retries queue for next iteration.", blockCollectionID);
+          }
+        }
+      }
+
+    }
+  }
+
+  @VisibleForTesting
+  void blockStorageMovementReportedItemsCheck() throws IOException {
+    synchronized (movementFinishedBlocks) {
+      Iterator<Block> finishedBlksIter = movementFinishedBlocks.iterator();
+      while (finishedBlksIter.hasNext()) {
+        Block blk = finishedBlksIter.next();
+        synchronized (storageMovementAttemptedItems) {
+          Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems
+              .iterator();
+          while (iterator.hasNext()) {
+            AttemptedItemInfo attemptedItemInfo = iterator.next();
+            attemptedItemInfo.getBlocks().remove(blk);
+            if (attemptedItemInfo.getBlocks().isEmpty()) {
+              // TODO: try adding this at the front of the queue, so that this
+              // element gets processed first and can be removed from the queue
+              // quickly, as all its movements are already done.
+              blockStorageMovementNeeded.add(new ItemInfo(attemptedItemInfo
+                  .getStartId(), attemptedItemInfo.getTrackId(),
+                  attemptedItemInfo.getRetryCount() + 1));
+              iterator.remove();
+            }
+          }
+        }
+        // Remove attempted blocks from movementFinishedBlocks list.
+        finishedBlksIter.remove();
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public int getMovementFinishedBlocksCount() {
+    return movementFinishedBlocks.size();
+  }
+
+  @VisibleForTesting
+  public int getAttemptedItemsCount() {
+    return storageMovementAttemptedItems.size();
+  }
+
+  public void clearQueues() {
+    movementFinishedBlocks.clear();
+    storageMovementAttemptedItems.clear();
+  }
+}


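The new BlockStorageMovementAttemptedItems above implements a simple
timeout-driven retry loop: a daemon thread wakes up every minCheckTimeout,
drops tracking entries whose blocks have all been reported as moved, and
requeues any item whose last attempted/reported time is older than
selfRetryTimeout. The following is a simplified, self-contained sketch of that
pattern; the names RetryMonitor and Attempt are hypothetical, not the class's
real API.

import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class RetryMonitor implements Runnable {
  private static final long SELF_RETRY_TIMEOUT_MS = 5 * 60 * 1000L;
  private static final long CHECK_INTERVAL_MS = 60 * 1000L;

  static final class Attempt {
    final long trackId;
    volatile long lastReportedMs;

    Attempt(long trackId, long lastReportedMs) {
      this.trackId = trackId;
      this.lastReportedMs = lastReportedMs;
    }
  }

  private final Queue<Attempt> attempted = new ConcurrentLinkedQueue<>();
  private final Queue<Long> needsRetry = new ConcurrentLinkedQueue<>();
  private volatile boolean running = true;

  void add(Attempt a) { attempted.add(a); }

  void stop() { running = false; }

  @Override
  public void run() {
    while (running) {
      long now = System.currentTimeMillis();
      // Requeue every attempt that has not been reported within the timeout.
      for (Iterator<Attempt> it = attempted.iterator(); it.hasNext();) {
        Attempt a = it.next();
        if (now > a.lastReportedMs + SELF_RETRY_TIMEOUT_MS) {
          needsRetry.add(a.trackId);
          it.remove();
        }
      }
      try {
        Thread.sleep(CHECK_INTERVAL_MS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return; // stop() interrupts the thread; exit the loop
      }
    }
  }
}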