You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2016/12/29 07:22:14 UTC

hadoop git commit: HDFS-11248: [SPS]: Handle partial block location movements. Contributed by Rakesh R

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-10285 c5876ee04 -> 57193f703


HDFS-11248: [SPS]: Handle partial block location movements. Contributed by Rakesh R


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/57193f70
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/57193f70
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/57193f70

Branch: refs/heads/HDFS-10285
Commit: 57193f7032c4ed771383063052df287b031cf434
Parents: c5876ee
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Wed Dec 28 23:21:07 2016 -0800
Committer: Uma Maheswara Rao G <um...@intel.com>
Committed: Wed Dec 28 23:21:07 2016 -0800

----------------------------------------------------------------------
 .../datanode/BlockStorageMovementTracker.java   |  15 --
 .../datanode/StoragePolicySatisfyWorker.java    |  15 +-
 .../BlockStorageMovementAttemptedItems.java     | 206 +++++++++++++-----
 .../server/namenode/StoragePolicySatisfier.java | 215 +++++++++++++------
 .../TestBlockStorageMovementAttemptedItems.java | 101 ++++++++-
 .../namenode/TestStoragePolicySatisfier.java    |  63 +++++-
 6 files changed, 454 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/57193f70/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
index 2de88fc..bd35b09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
@@ -28,7 +28,6 @@ import java.util.concurrent.Future;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus;
 import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -109,20 +108,6 @@ public class BlockStorageMovementTracker implements Runnable {
     }
   }
 
-  /**
-   * Mark as block movement failure for the given trackId and blockId.
-   *
-   * @param trackId tracking id
-   * @param blockId block id
-   */
-  void markBlockMovementFailure(long trackId, long blockId) {
-    LOG.debug("Mark as block movement failure for the given "
-        + "trackId:{} and blockId:{}", trackId, blockId);
-    BlockMovementResult result = new BlockMovementResult(trackId, blockId, null,
-        BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE);
-    addMovementResultToTrackIdList(result);
-  }
-
   private List<BlockMovementResult> addMovementResultToTrackIdList(
       BlockMovementResult result) {
     long trackId = result.getTrackId();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/57193f70/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index ded010e..1648d8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -154,18 +154,9 @@ public class StoragePolicySatisfyWorker {
       Collection<BlockMovingInfo> blockMovingInfos) {
     LOG.debug("Received BlockMovingTasks {}", blockMovingInfos);
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
-      // Iterating backwards. This is to ensure that all the block src location
-      // which doesn't have a target node will be marked as failure before
-      // scheduling the block movement to valid target nodes.
-      for (int i = blkMovingInfo.getSources().length - 1; i >= 0; i--) {
-        if (i >= blkMovingInfo.getTargets().length) {
-          // Since there is no target selected for scheduling the block,
-          // just mark this block storage movement as failure. Later, namenode
-          // can take action on this.
-          movementTracker.markBlockMovementFailure(trackID,
-              blkMovingInfo.getBlock().getBlockId());
-          continue;
-        }
+      assert blkMovingInfo.getSources().length == blkMovingInfo
+          .getTargets().length;
+      for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
         DatanodeInfo target = blkMovingInfo.getTargets()[i];
         BlockMovingTask blockMovingTask = new BlockMovingTask(
             trackID, blockPoolID, blkMovingInfo.getBlock(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/57193f70/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index bb26082..ce97075 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -43,11 +43,14 @@ import com.google.common.annotations.VisibleForTesting;
  * automatically after timeout. The default timeout would be 30mins.
  */
 public class BlockStorageMovementAttemptedItems {
-  public static final Logger LOG =
+  private static final Logger LOG =
       LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
-  // A map holds the items which are already taken for blocks movements
-  // processing and sent to DNs.
-  private final Map<Long, Long> storageMovementAttemptedItems;
+
+  /**
+   * Map from each trackID (block collection id) already taken for block
+   * movement processing and sent to DNs, to its attempt information.
+   */
+  private final Map<Long, ItemInfo> storageMovementAttemptedItems;
   private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
   private volatile boolean monitorRunning = true;
   private Daemon timerThread = null;
@@ -83,10 +86,16 @@ public class BlockStorageMovementAttemptedItems {
    *
    * @param blockCollectionID
    *          - tracking id / block collection id
+   * @param allBlockLocsAttemptedToSatisfy
+   *          - true if matching target nodes were found to satisfy the storage
+   *          type for all the block locations of the given blockCollectionID
    */
-  public void add(Long blockCollectionID) {
+  public void add(Long blockCollectionID,
+      boolean allBlockLocsAttemptedToSatisfy) {
     synchronized (storageMovementAttemptedItems) {
-      storageMovementAttemptedItems.put(blockCollectionID, monotonicNow());
+      ItemInfo itemInfo = new ItemInfo(monotonicNow(),
+          allBlockLocsAttemptedToSatisfy);
+      storageMovementAttemptedItems.put(blockCollectionID, itemInfo);
     }
   }
 
@@ -121,15 +130,62 @@ public class BlockStorageMovementAttemptedItems {
    */
   public synchronized void stop() {
     monitorRunning = false;
-    timerThread.interrupt();
-    try {
-      timerThread.join(3000);
-    } catch (InterruptedException ie) {
+    if (timerThread != null) {
+      timerThread.interrupt();
+      try {
+        timerThread.join(3000);
+      } catch (InterruptedException ie) {
+      }
     }
     this.clearQueues();
   }
 
   /**
+   * This class contains information of an attempted trackID: (a) the last
+   * attempted time stamp, and (b) whether matching target nodes were found
+   * and block movements scheduled for all the block locations in the trackID
+   * to satisfy the storage policy. This is used by
+   * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
+   */
+  private final static class ItemInfo {
+    private final long lastAttemptedTimeStamp;
+    private final boolean allBlockLocsAttemptedToSatisfy;
+
+    /**
+     * ItemInfo constructor.
+     *
+     * @param lastAttemptedTimeStamp
+     *          last attempted time stamp
+     * @param allBlockLocsAttemptedToSatisfy
+     *          whether all the blocks in the trackID were attempted and blocks
+     *          movement has been scheduled to satisfy storage policy
+     */
+    private ItemInfo(long lastAttemptedTimeStamp,
+        boolean allBlockLocsAttemptedToSatisfy) {
+      this.lastAttemptedTimeStamp = lastAttemptedTimeStamp;
+      this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy;
+    }
+
+    /**
+     * @return last attempted time stamp.
+     */
+    private long getLastAttemptedTimeStamp() {
+      return lastAttemptedTimeStamp;
+    }
+
+    /**
+     * @return true if all the block locations under the trackID have found
+     *         matching target nodes to satisfy the storage policy; false if
+     *         the trackID needs retries to satisfy the storage policy for
+     *         some of the block locations, for which no matching target node
+     *         was found when the trackID was last attempted.
+     */
+    private boolean isAllBlockLocsAttemptedToSatisfy() {
+      return allBlockLocsAttemptedToSatisfy;
+    }
+  }
+
+  /**
    * A monitor class for checking block storage movement result and long waiting
    * items periodically.
    */
@@ -147,76 +203,108 @@ public class BlockStorageMovementAttemptedItems {
         }
       }
     }
+  }
 
-    private void blocksStorageMovementUnReportedItemsCheck() {
-      synchronized (storageMovementAttemptedItems) {
-        Iterator<Entry<Long, Long>> iter =
-            storageMovementAttemptedItems.entrySet().iterator();
-        long now = monotonicNow();
-        while (iter.hasNext()) {
-          Entry<Long, Long> entry = iter.next();
-          if (now > entry.getValue() + selfRetryTimeout) {
-            Long blockCollectionID = entry.getKey();
-            synchronized (storageMovementAttemptedResults) {
-              boolean exist = isExistInResult(blockCollectionID);
-              if (!exist) {
-                blockStorageMovementNeeded.add(blockCollectionID);
-              } else {
-                LOG.info("Blocks storage movement results for the"
-                    + " tracking id : " + blockCollectionID
-                    + " is reported from one of the co-ordinating datanode."
-                    + " So, the result will be processed soon.");
-              }
+  @VisibleForTesting
+  void blocksStorageMovementUnReportedItemsCheck() {
+    synchronized (storageMovementAttemptedItems) {
+      Iterator<Entry<Long, ItemInfo>> iter = storageMovementAttemptedItems
+          .entrySet().iterator();
+      long now = monotonicNow();
+      while (iter.hasNext()) {
+        Entry<Long, ItemInfo> entry = iter.next();
+        ItemInfo itemInfo = entry.getValue();
+        if (now > itemInfo.getLastAttemptedTimeStamp() + selfRetryTimeout) {
+          Long blockCollectionID = entry.getKey();
+          synchronized (storageMovementAttemptedResults) {
+            if (!isExistInResult(blockCollectionID)) {
+              blockStorageMovementNeeded.add(blockCollectionID);
               iter.remove();
+              LOG.info("TrackID: {} becomes timed out and moved to needed "
+                  + "retries queue for next iteration.", blockCollectionID);
+            } else {
+              LOG.info("Blocks storage movement results for the"
+                  + " tracking id : " + blockCollectionID
+                  + " is reported from one of the co-ordinating datanode."
+                  + " So, the result will be processed soon.");
             }
           }
         }
-
       }
+
     }
+  }
 
-    private boolean isExistInResult(Long blockCollectionID) {
-      Iterator<BlocksStorageMovementResult> iter =
-          storageMovementAttemptedResults.iterator();
-      while (iter.hasNext()) {
-        BlocksStorageMovementResult storageMovementAttemptedResult =
-            iter.next();
-        if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) {
-          return true;
-        }
+  private boolean isExistInResult(Long blockCollectionID) {
+    Iterator<BlocksStorageMovementResult> iter = storageMovementAttemptedResults
+        .iterator();
+    while (iter.hasNext()) {
+      BlocksStorageMovementResult storageMovementAttemptedResult = iter.next();
+      if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) {
+        return true;
       }
-      return false;
     }
+    return false;
+  }
 
-    private void blockStorageMovementResultCheck() {
-      synchronized (storageMovementAttemptedResults) {
-        Iterator<BlocksStorageMovementResult> iter =
-            storageMovementAttemptedResults.iterator();
-        while (iter.hasNext()) {
-          BlocksStorageMovementResult storageMovementAttemptedResult =
-              iter.next();
+  @VisibleForTesting
+  void blockStorageMovementResultCheck() {
+    synchronized (storageMovementAttemptedResults) {
+      Iterator<BlocksStorageMovementResult> resultsIter =
+          storageMovementAttemptedResults.iterator();
+      while (resultsIter.hasNext()) {
+        // A trackID needs to be retried in the following cases:
+        // 1) All or some of the scheduled block movements failed.
+        // 2) All the scheduled block movements succeeded, but some block
+        // movements in this trackID were never scheduled, i.e. some of the
+        // blocks in the trackID couldn't find any matching target node for
+        // scheduling block movement in the previous SPS iteration.
+        BlocksStorageMovementResult storageMovementAttemptedResult = resultsIter
+            .next();
+        synchronized (storageMovementAttemptedItems) {
           if (storageMovementAttemptedResult
               .getStatus() == BlocksStorageMovementResult.Status.FAILURE) {
             blockStorageMovementNeeded
                 .add(storageMovementAttemptedResult.getTrackId());
-            LOG.warn("Blocks storage movement results for the tracking id : "
-                + storageMovementAttemptedResult.getTrackId()
+            LOG.warn("Blocks storage movement results for the tracking id: {}"
                 + " is reported from co-ordinating datanode, but result"
-                + " status is FAILURE. So, added for retry");
+                + " status is FAILURE. So, added for retry",
+                storageMovementAttemptedResult.getTrackId());
           } else {
-            synchronized (storageMovementAttemptedItems) {
-              storageMovementAttemptedItems
-                  .remove(storageMovementAttemptedResult.getTrackId());
+            ItemInfo itemInfo = storageMovementAttemptedItems
+                .get(storageMovementAttemptedResult.getTrackId());
+
+            // ItemInfo could be null. One case is, before the blocks movements
+            // result arrives the attempted trackID became timed out and then
+            // removed the trackID from the storageMovementAttemptedItems list.
+            // TODO: Need to ensure that trackID is added to the
+            // 'blockStorageMovementNeeded' queue for retries to handle the
+            // following condition. If all the block locations under the trackID
+            // are attempted and failed to find matching target nodes to satisfy
+            // storage policy in previous SPS iteration.
+            if (itemInfo != null
+                && !itemInfo.isAllBlockLocsAttemptedToSatisfy()) {
+              blockStorageMovementNeeded
+                  .add(storageMovementAttemptedResult.getTrackId());
+              LOG.warn("Blocks storage movement is SUCCESS for the track id: {}"
+                  + " reported from co-ordinating datanode. But adding trackID"
+                  + " back to retry queue as some of the blocks couldn't find"
+                  + " matching target nodes in previous SPS iteration.",
+                  storageMovementAttemptedResult.getTrackId());
+            } else {
+              // itemInfo is null, or all block locations were scheduled.
+              LOG.info("Blocks storage movement is SUCCESS for the track id: {}"
+                  + " reported from co-ordinating datanode.",
+                  storageMovementAttemptedResult.getTrackId());
             }
-            LOG.info("Blocks storage movement results for the tracking id : "
-                + storageMovementAttemptedResult.getTrackId()
-                + " is reported from co-ordinating datanode. "
-                + "The result status is SUCCESS.");
           }
-          iter.remove(); // remove from results as processed above
+          // Remove trackID from the attempted list, if any.
+          storageMovementAttemptedItems
+              .remove(storageMovementAttemptedResult.getTrackId());
         }
+        // Remove trackID from results as processed above.
+        resultsIter.remove();
       }
-
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/57193f70/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 56a531f..26e0775 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -162,8 +162,15 @@ public class StoragePolicySatisfier implements Runnable {
       try {
         Long blockCollectionID = storageMovementNeeded.get();
         if (blockCollectionID != null) {
-          computeAndAssignStorageMismatchedBlocksToDNs(blockCollectionID);
-          this.storageMovementsMonitor.add(blockCollectionID);
+          BlockCollection blockCollection =
+              namesystem.getBlockCollection(blockCollectionID);
+          // Process the trackID only if its block collection still exists.
+          if (blockCollection != null) {
+            boolean allBlockLocsAttemptedToSatisfy =
+                computeAndAssignStorageMismatchedBlocksToDNs(blockCollection);
+            this.storageMovementsMonitor.add(blockCollectionID,
+                allBlockLocsAttemptedToSatisfy);
+          }
         }
         // TODO: We can think to make this as configurable later, how frequently
         // we want to check block movements.
@@ -192,20 +199,17 @@ public class StoragePolicySatisfier implements Runnable {
     }
   }
 
-  private void computeAndAssignStorageMismatchedBlocksToDNs(
-      long blockCollectionID) {
-    BlockCollection blockCollection =
-        namesystem.getBlockCollection(blockCollectionID);
-    if (blockCollection == null) {
-      return;
-    }
+  private boolean computeAndAssignStorageMismatchedBlocksToDNs(
+      BlockCollection blockCollection) {
     byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
     BlockStoragePolicy existingStoragePolicy =
         blockManager.getStoragePolicy(existingStoragePolicyID);
     if (!blockCollection.getLastBlock().isComplete()) {
       // Postpone, currently file is under construction
       // So, should we add back? or leave it to user
-      return;
+      LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
+          + " this to the next retry iteration", blockCollection.getId());
+      return true;
     }
 
     // First datanode will be chosen as the co-ordinator node for storage
@@ -213,61 +217,87 @@ public class StoragePolicySatisfier implements Runnable {
     DatanodeDescriptor coordinatorNode = null;
     BlockInfo[] blocks = blockCollection.getBlocks();
     List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
+
+    // True if SPS found matching target nodes to satisfy the storage type for
+    // all the block locations of the given blockCollection; false if some
+    // block locations need retries. Combined with non-short-circuit '&=' so
+    // that block moving infos are still computed for every block.
+    boolean foundMatchingTargetNodesForAllBlocks = true;
+
     for (int i = 0; i < blocks.length; i++) {
       BlockInfo blockInfo = blocks[i];
-      List<StorageType> expectedStorageTypes =
-          existingStoragePolicy.chooseStorageTypes(blockInfo.getReplication());
-      DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
-      StorageType[] storageTypes = new StorageType[storages.length];
-      for (int j = 0; j < storages.length; j++) {
-        DatanodeStorageInfo datanodeStorageInfo = storages[j];
-        StorageType storageType = datanodeStorageInfo.getStorageType();
-        storageTypes[j] = storageType;
-      }
-      List<StorageType> existing =
-          new LinkedList<StorageType>(Arrays.asList(storageTypes));
-      if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
-          existing, true)) {
-        List<StorageTypeNodePair> sourceWithStorageMap =
-            new ArrayList<StorageTypeNodePair>();
-        List<DatanodeStorageInfo> existingBlockStorages =
-            new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
-        for (StorageType existingType : existing) {
-          Iterator<DatanodeStorageInfo> iterator =
-              existingBlockStorages.iterator();
-          while (iterator.hasNext()) {
-            DatanodeStorageInfo datanodeStorageInfo = iterator.next();
-            StorageType storageType = datanodeStorageInfo.getStorageType();
-            if (storageType == existingType) {
-              iterator.remove();
-              sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
-                  datanodeStorageInfo.getDatanodeDescriptor()));
-              break;
-            }
-          }
-        }
+      List<StorageType> expectedStorageTypes = existingStoragePolicy
+          .chooseStorageTypes(blockInfo.getReplication());
+      foundMatchingTargetNodesForAllBlocks &= computeBlockMovingInfos(
+          blockMovingInfos, blockInfo, expectedStorageTypes);
+    }
 
-        StorageTypeNodeMap locsForExpectedStorageTypes =
-            findTargetsForExpectedStorageTypes(expectedStorageTypes);
-
-        BlockMovingInfo blockMovingInfo =
-            findSourceAndTargetToMove(blockInfo, existing, sourceWithStorageMap,
-                expectedStorageTypes, locsForExpectedStorageTypes);
-        if (coordinatorNode == null) {
-          // For now, first datanode will be chosen as the co-ordinator. Later
-          // this can be optimized if needed.
-          coordinatorNode =
-              (DatanodeDescriptor) blockMovingInfo.getSources()[0];
+    assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(),
+        blockMovingInfos, coordinatorNode);
+    return foundMatchingTargetNodesForAllBlocks;
+  }
+
+  /**
+   * Compute the list of block moving information for the given block. This
+   * checks whether each location of the block satisfies the expected storage
+   * policy and, for each location that does not, tries to find a target node
+   * with the expected storage type; the resulting source/target pairs are
+   * added to blockMovingInfos.
+   *
+   * @param blockMovingInfos
+   *          - list of block source and target node pair
+   * @param blockInfo
+   *          - block details
+   * @param expectedStorageTypes
+   *          - list of expected storage type to satisfy the storage policy
+   * @return false if some of the block locations failed to find target node to
+   *         satisfy the storage policy, true otherwise
+   */
+  private boolean computeBlockMovingInfos(
+      List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
+      List<StorageType> expectedStorageTypes) {
+    boolean foundMatchingTargetNodesForBlock = true;
+    DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
+    StorageType[] storageTypes = new StorageType[storages.length];
+    for (int j = 0; j < storages.length; j++) {
+      DatanodeStorageInfo datanodeStorageInfo = storages[j];
+      StorageType storageType = datanodeStorageInfo.getStorageType();
+      storageTypes[j] = storageType;
+    }
+    List<StorageType> existing =
+        new LinkedList<StorageType>(Arrays.asList(storageTypes));
+    if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
+        existing, true)) {
+      List<StorageTypeNodePair> sourceWithStorageMap =
+          new ArrayList<StorageTypeNodePair>();
+      List<DatanodeStorageInfo> existingBlockStorages =
+          new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
+      for (StorageType existingType : existing) {
+        Iterator<DatanodeStorageInfo> iterator =
+            existingBlockStorages.iterator();
+        while (iterator.hasNext()) {
+          DatanodeStorageInfo datanodeStorageInfo = iterator.next();
+          StorageType storageType = datanodeStorageInfo.getStorageType();
+          if (storageType == existingType) {
+            iterator.remove();
+            sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
+                datanodeStorageInfo.getDatanodeDescriptor()));
+            break;
+          }
         }
-        blockMovingInfos.add(blockMovingInfo);
       }
-    }
 
-    addBlockMovingInfosToCoordinatorDn(blockCollectionID, blockMovingInfos,
-        coordinatorNode);
+      StorageTypeNodeMap locsForExpectedStorageTypes =
+          findTargetsForExpectedStorageTypes(expectedStorageTypes);
+
+      foundMatchingTargetNodesForBlock &= findSourceAndTargetToMove(
+          blockMovingInfos, blockInfo, existing, sourceWithStorageMap,
+          expectedStorageTypes, locsForExpectedStorageTypes);
+    }
+    return foundMatchingTargetNodesForBlock;
   }
 
-  private void addBlockMovingInfosToCoordinatorDn(long blockCollectionID,
+  private void assignBlockMovingInfosToCoordinatorDn(long blockCollectionID,
       List<BlockMovingInfo> blockMovingInfos,
       DatanodeDescriptor coordinatorNode) {
 
@@ -278,6 +308,11 @@ public class StoragePolicySatisfier implements Runnable {
       return;
     }
 
+    // For now, the first source of the first BlockMovingInfo is chosen as the
+    // co-ordinator (overwriting the coordinatorNode argument).
+    coordinatorNode = (DatanodeDescriptor) blockMovingInfos.get(0)
+        .getSources()[0];
+
     boolean needBlockStorageMovement = false;
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
       // Check for atleast one block storage movement has been chosen
@@ -301,6 +336,8 @@ public class StoragePolicySatisfier implements Runnable {
    * Find the good target node for each source node for which block storages was
    * misplaced.
    *
+   * @param blockMovingInfos
+   *          - list of block source and target node pair
    * @param blockInfo
    *          - Block
    * @param existing
@@ -311,23 +348,49 @@ public class StoragePolicySatisfier implements Runnable {
    *          - Expecting storages to move
    * @param locsForExpectedStorageTypes
    *          - Available DNs for expected storage types
-   * @return list of block source and target node pair
+   * @return false if some of the block locations failed to find target node to
+   *         satisfy the storage policy
    */
-  private BlockMovingInfo findSourceAndTargetToMove(BlockInfo blockInfo,
+  private boolean findSourceAndTargetToMove(
+      List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
       List<StorageType> existing,
       List<StorageTypeNodePair> sourceWithStorageList,
       List<StorageType> expected,
       StorageTypeNodeMap locsForExpectedStorageTypes) {
+    boolean foundMatchingTargetNodesForBlock = true;
     List<DatanodeInfo> sourceNodes = new ArrayList<>();
     List<StorageType> sourceStorageTypes = new ArrayList<>();
     List<DatanodeInfo> targetNodes = new ArrayList<>();
     List<StorageType> targetStorageTypes = new ArrayList<>();
     List<DatanodeDescriptor> chosenNodes = new ArrayList<>();
+
+    // Looping over all the source node locations and choose the target
+    // storage within same node if possible. This is done separately to
+    // avoid choosing a target which already has this block.
     for (int i = 0; i < sourceWithStorageList.size(); i++) {
       StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
       StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
           existingTypeNodePair.dn, expected);
+      if (chosenTarget != null) {
+        sourceNodes.add(existingTypeNodePair.dn);
+        sourceStorageTypes.add(existingTypeNodePair.storageType);
+        targetNodes.add(chosenTarget.dn);
+        targetStorageTypes.add(chosenTarget.storageType);
+        chosenNodes.add(chosenTarget.dn);
+        // TODO: We can increment scheduled block count for this node?
+      }
+    }
 
+    // Looping over all the source node locations. Choose a remote target
+    // storage node if it was not found out within same node.
+    for (int i = 0; i < sourceWithStorageList.size(); i++) {
+      StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
+      StorageTypeNodePair chosenTarget = null;
+      // Chosen the target storage within same datanode. So just skipping this
+      // source node.
+      if (sourceNodes.contains(existingTypeNodePair.dn)) {
+        continue;
+      }
       if (chosenTarget == null && blockManager.getDatanodeManager()
           .getNetworkTopology().isNodeGroupAware()) {
         chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
@@ -359,18 +422,40 @@ public class StoragePolicySatisfier implements Runnable {
             "Failed to choose target datanode for the required"
                 + " storage types {}, block:{}, existing storage type:{}",
             expected, blockInfo, existingTypeNodePair.storageType);
-        sourceNodes.add(existingTypeNodePair.dn);
-        sourceStorageTypes.add(existingTypeNodePair.storageType);
-        // Imp: Not setting the target details, empty targets. Later, this is
-        // used as an indicator for retrying this block movement.
+        foundMatchingTargetNodesForBlock = false;
       }
     }
-    BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blockInfo,
+
+    blockMovingInfos.addAll(getBlockMovingInfos(blockInfo, sourceNodes,
+        sourceStorageTypes, targetNodes, targetStorageTypes));
+    return foundMatchingTargetNodesForBlock;
+  }
+
+  private List<BlockMovingInfo> getBlockMovingInfos(BlockInfo blockInfo,
+      List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
+      List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes) {
+    List<BlockMovingInfo> blkMovingInfos = new ArrayList<>();
+    // Nothing to schedule when no source-target node pair was chosen.
+    if (sourceNodes.isEmpty()) {
+      return blkMovingInfos;
+    }
+    buildBlockMovingInfos(blockInfo, sourceNodes, sourceStorageTypes,
+        targetNodes, targetStorageTypes, blkMovingInfos);
+    return blkMovingInfos;
+  }
+
+  private void buildBlockMovingInfos(BlockInfo blockInfo,
+      List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
+      List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes,
+      List<BlockMovingInfo> blkMovingInfos) {
+    Block block = new Block(blockInfo.getBlockId(),
+        blockInfo.getNumBytes(), blockInfo.getGenerationStamp());
+    BlockMovingInfo movingInfo = new BlockMovingInfo(block,
         sourceNodes.toArray(new DatanodeInfo[sourceNodes.size()]),
         targetNodes.toArray(new DatanodeInfo[targetNodes.size()]),
         sourceStorageTypes.toArray(new StorageType[sourceStorageTypes.size()]),
         targetStorageTypes.toArray(new StorageType[targetStorageTypes.size()]));
-    return blkMovingInfo;
+    blkMovingInfos.add(movingInfo);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/57193f70/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
index 8c70d99..6641134 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -33,13 +33,13 @@ public class TestBlockStorageMovementAttemptedItems {
 
   private BlockStorageMovementAttemptedItems bsmAttemptedItems = null;
   private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null;
+  private final int selfRetryTimeout = 500;
 
   @Before
   public void setup() {
     unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded();
-    bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100, 500,
-        unsatisfiedStorageMovementFiles);
-    bsmAttemptedItems.start();
+    bsmAttemptedItems = new BlockStorageMovementAttemptedItems(
+        100, selfRetryTimeout, unsatisfiedStorageMovementFiles);
   }
 
   @After
@@ -72,8 +72,9 @@ public class TestBlockStorageMovementAttemptedItems {
 
   @Test(timeout = 30000)
   public void testAddResultWithFailureResult() throws Exception {
+    bsmAttemptedItems.start(); // start block movement result monitor thread
     Long item = new Long(1234);
-    bsmAttemptedItems.add(item);
+    bsmAttemptedItems.add(item, true);
     bsmAttemptedItems.addResults(
         new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
             item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
@@ -82,8 +83,9 @@ public class TestBlockStorageMovementAttemptedItems {
 
   @Test(timeout = 30000)
   public void testAddResultWithSucessResult() throws Exception {
+    bsmAttemptedItems.start(); // start block movement result monitor thread
     Long item = new Long(1234);
-    bsmAttemptedItems.add(item);
+    bsmAttemptedItems.add(item, true);
     bsmAttemptedItems.addResults(
         new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
             item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
@@ -92,10 +94,93 @@ public class TestBlockStorageMovementAttemptedItems {
 
   @Test(timeout = 30000)
   public void testNoResultAdded() throws Exception {
+    bsmAttemptedItems.start(); // start block movement result monitor thread
     Long item = new Long(1234);
-    bsmAttemptedItems.add(item);
-    // After selfretry timeout, it should be added back for retry
-    assertTrue(checkItemMovedForRetry(item, 600));
+    bsmAttemptedItems.add(item, true);
+    // After the self retry timeout, it should be added back for retry.
+    assertTrue("Failed to add to the retry list",
+        checkItemMovedForRetry(item, 5000));
+    assertEquals("Failed to remove from the attempted list", 0,
+        bsmAttemptedItems.getAttemptedItemsCount());
   }
 
+  /**
+   * Partial block movement with BlocksStorageMovementResult#SUCCESS. Here,
+   * first occurrence is {@code blockStorageMovementResultCheck()} and then
+   * {@code blocksStorageMovementUnReportedItemsCheck()}.
+   */
+  @Test(timeout = 30000)
+  public void testPartialBlockMovementShouldBeRetried1() throws Exception {
+    Long item = Long.valueOf(1234L);
+    bsmAttemptedItems.add(item, false);
+    bsmAttemptedItems.addResults(
+        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
+            item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
+
+    // start block movement result monitor thread
+    bsmAttemptedItems.start();
+    assertTrue("Failed to add to the retry list",
+        checkItemMovedForRetry(item, 5000));
+    assertEquals("Failed to remove from the attempted list", 0,
+        bsmAttemptedItems.getAttemptedItemsCount());
+  }
+
+  /**
+   * Partial block movement with BlocksStorageMovementResult#SUCCESS. Here,
+   * first occurrence is {@code blocksStorageMovementUnReportedItemsCheck()}
+   * and then {@code blockStorageMovementResultCheck()}.
+   */
+  @Test(timeout = 30000)
+  public void testPartialBlockMovementShouldBeRetried2() throws Exception {
+    Long item = Long.valueOf(1234L);
+    bsmAttemptedItems.add(item, false);
+    bsmAttemptedItems.addResults(
+        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
+            item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
+
+    Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out
+
+    bsmAttemptedItems.blocksStorageMovementUnReportedItemsCheck();
+    bsmAttemptedItems.blockStorageMovementResultCheck();
+
+    assertTrue("Failed to add to the retry list",
+        checkItemMovedForRetry(item, 5000));
+    assertEquals("Failed to remove from the attempted list", 0,
+        bsmAttemptedItems.getAttemptedItemsCount());
+  }
+
+  /**
+   * Partial block movement with only a BlocksStorageMovementResult#FAILURE
+   * result and an empty storageMovementAttemptedItems list.
+   */
+  @Test(timeout = 30000)
+  public void testPartialBlockMovementShouldBeRetried3() throws Exception {
+    Long item = Long.valueOf(1234L);
+    bsmAttemptedItems.addResults(
+        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
+            item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
+    bsmAttemptedItems.blockStorageMovementResultCheck();
+    assertTrue("Failed to add to the retry list",
+        checkItemMovedForRetry(item, 5000));
+    assertEquals("Failed to remove from the attempted list", 0,
+        bsmAttemptedItems.getAttemptedItemsCount());
+  }
+
+  /**
+   * Partial block movement with a BlocksStorageMovementResult#FAILURE result
+   * for an item that is present in storageMovementAttemptedItems.
+   */
+  @Test(timeout = 30000)
+  public void testPartialBlockMovementShouldBeRetried4() throws Exception {
+    Long item = Long.valueOf(1234L);
+    bsmAttemptedItems.add(item, false);
+    bsmAttemptedItems.addResults(
+        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
+            item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
+    bsmAttemptedItems.blockStorageMovementResultCheck();
+    assertTrue("Failed to add to the retry list",
+        checkItemMovedForRetry(item, 5000));
+    assertEquals("Failed to remove from the attempted list", 0,
+        bsmAttemptedItems.getAttemptedItemsCount());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/57193f70/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 179b66b..718dbcb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -527,6 +527,59 @@ public class TestStoragePolicySatisfier {
     waitExpectedStorageType(file1, StorageType.DISK, 2, 30000);
   }
 
+  /**
+   * Tests to verify that for the given path, only a few of the blocks or block
+   * src locations (src nodes) under the given path will be scheduled for block
+   * movement.
+   *
+   * For example, consider a file with two blocks:
+   *
+   * File1 => two blocks and default storage policy(HOT).
+   * blk_1[locations=A(DISK),B(DISK),C(DISK),D(DISK),E(DISK)],
+   * blk_2[locations=A(DISK),B(DISK),C(DISK),D(DISK),E(DISK)].
+   *
+   * Now, set storage policy to COLD.
+   * Only two DNs, say A and E, have the expected storage type ARCHIVE.
+   *
+   * SPS will schedule block movement to the coordinator node with the details,
+   * blk_1[move A(DISK) -> A(ARCHIVE), move E(DISK) -> E(ARCHIVE)],
+   * blk_2[move A(DISK) -> A(ARCHIVE), move E(DISK) -> E(ARCHIVE)].
+   */
+  @Test(timeout = 300000)
+  public void testWhenOnlyFewSourceNodesHaveMatchingTargetNodes()
+      throws Exception {
+    try {
+      int numOfDns = 5;
+      config.setLong("dfs.blocksize", 1024);
+      allDiskTypes =
+          new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.ARCHIVE}};
+      hdfsCluster = startCluster(config, allDiskTypes, numOfDns,
+          storagesPerDatanode, capacity);
+      dfs = hdfsCluster.getFileSystem();
+      writeContent(file, (short) 5);
+
+      // Change policy to COLD
+      dfs.setStoragePolicy(new Path(file), "COLD");
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode(file);
+
+      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      hdfsCluster.triggerHeartbeats();
+      // Wait till StoragePolicySatisfier identifies the blocks to move to
+      // ARCHIVE storage.
+      waitExpectedStorageType(file, StorageType.ARCHIVE, 2, 30000);
+      waitExpectedStorageType(file, StorageType.DISK, 3, 30000);
+
+      waitForBlocksMovementResult(1, 30000);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
   private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
       throws IOException {
     ArrayList<DataNode> dns = hdfsCluster.getDataNodes();
@@ -561,7 +614,7 @@ public class TestStoragePolicySatisfier {
       DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
       favoredNodesCount--;
       if (favoredNodesCount <= 0) {
-        break;// marked favoredNodesCount number of pinned block location
+        break; // marked favoredNodesCount number of pinned block location
       }
     }
     return file1;
@@ -600,8 +653,14 @@ public class TestStoragePolicySatisfier {
   }
 
   private void writeContent(final String fileName) throws IOException {
+    writeContent(fileName, (short) 3); // replication factor 3
+  }
+
+  private void writeContent(final String fileName, short replicatonFactor)
+      throws IOException {
     // write to DISK
-    final FSDataOutputStream out = dfs.create(new Path(fileName));
+    final FSDataOutputStream out = dfs.create(new Path(fileName),
+        replicatonFactor);
     for (int i = 0; i < 1000; i++) {
       out.writeChars("t");
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org