You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/01/24 07:32:30 UTC

[35/50] [abbrv] hadoop git commit: HDFS-12570: [SPS]: Refactor Co-ordinator datanode logic to track the block storage movements. Contributed by Rakesh R.

HDFS-12570: [SPS]: Refactor Co-ordinator datanode logic to track the block storage movements. Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8fb4a3d8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8fb4a3d8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8fb4a3d8

Branch: refs/heads/HDFS-10285
Commit: 8fb4a3d84d825502fd776033a955ed928f0fc651
Parents: a2808be
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Thu Oct 12 17:17:51 2017 -0700
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Wed Jan 24 11:23:17 2018 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   8 +-
 .../DatanodeProtocolClientSideTranslatorPB.java |  12 +-
 .../DatanodeProtocolServerSideTranslatorPB.java |   4 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 150 +++-----
 .../blockmanagement/DatanodeDescriptor.java     |  50 ++-
 .../server/blockmanagement/DatanodeManager.java | 104 ++++--
 .../hdfs/server/datanode/BPOfferService.java    |   3 +-
 .../hdfs/server/datanode/BPServiceActor.java    |  33 +-
 .../datanode/BlockStorageMovementTracker.java   |  80 ++---
 .../datanode/StoragePolicySatisfyWorker.java    | 214 ++++--------
 .../BlockStorageMovementAttemptedItems.java     | 299 ++++------------
 .../BlockStorageMovementInfosBatch.java         |  61 ----
 .../hdfs/server/namenode/FSNamesystem.java      |  11 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |   7 +-
 .../server/namenode/StoragePolicySatisfier.java | 343 ++++++++++---------
 .../protocol/BlockStorageMovementCommand.java   |  99 ++----
 .../BlocksStorageMoveAttemptFinished.java       |  48 +++
 .../protocol/BlocksStorageMovementResult.java   |  74 ----
 .../hdfs/server/protocol/DatanodeProtocol.java  |   5 +-
 .../src/main/proto/DatanodeProtocol.proto       |  30 +-
 .../src/main/resources/hdfs-default.xml         |  21 +-
 .../src/site/markdown/ArchivalStorage.md        |   6 +-
 .../TestNameNodePrunesMissingStorages.java      |   5 +-
 .../datanode/InternalDataNodeTestUtils.java     |   4 +-
 .../server/datanode/TestBPOfferService.java     |   4 +-
 .../hdfs/server/datanode/TestBlockRecovery.java |   4 +-
 .../server/datanode/TestDataNodeLifeline.java   |   6 +-
 .../TestDatanodeProtocolRetryPolicy.java        |   4 +-
 .../server/datanode/TestFsDatasetCache.java     |   4 +-
 .../TestStoragePolicySatisfyWorker.java         |  52 ++-
 .../hdfs/server/datanode/TestStorageReport.java |   4 +-
 .../server/namenode/NNThroughputBenchmark.java  |   6 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |   4 +-
 .../TestBlockStorageMovementAttemptedItems.java | 145 ++++----
 .../hdfs/server/namenode/TestDeadDatanode.java  |   4 +-
 .../namenode/TestStoragePolicySatisfier.java    | 115 ++++++-
 ...stStoragePolicySatisfierWithStripedFile.java |  20 +-
 37 files changed, 908 insertions(+), 1135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb4a3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 38244fd..2b6aa46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -613,11 +613,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY =
       "dfs.storage.policy.satisfier.recheck.timeout.millis";
   public static final int DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT =
-      5 * 60 * 1000;
+      1 * 60 * 1000;
   public static final String DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY =
       "dfs.storage.policy.satisfier.self.retry.timeout.millis";
   public static final int DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT =
-      20 * 60 * 1000;
+      5 * 60 * 1000;
+  public static final String DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY =
+      "dfs.storage.policy.satisfier.low.max-streams.preference";
+  public static final boolean DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT =
+      false;
 
   public static final String  DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
   public static final int     DFS_DATANODE_DEFAULT_PORT = 9866;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb4a3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 9dd87d0..dcc0705 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlock
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -140,7 +140,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
       @Nonnull SlowDiskReports slowDisks,
-      BlocksStorageMovementResult[] blksMovementResults) throws IOException {
+      BlocksStorageMoveAttemptFinished storageMovementFinishedBlks)
+          throws IOException {
     HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
         .setRegistration(PBHelper.convert(registration))
         .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@@ -165,8 +166,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements
     }
 
     // Adding blocks movement results to the heart beat request.
-    builder.addAllBlksMovementResults(
-        PBHelper.convertBlksMovResults(blksMovementResults));
+    if (storageMovementFinishedBlks != null
+        && storageMovementFinishedBlks.getBlocks() != null) {
+      builder.setStorageMoveAttemptFinishedBlks(
+          PBHelper.convertBlksMovReport(storageMovementFinishedBlks));
+    }
 
     HeartbeatResponseProto resp;
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb4a3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index 40458ef..b5bb80a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -123,8 +123,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements
           volumeFailureSummary, request.getRequestFullBlockReportLease(),
           PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
           PBHelper.convertSlowDiskInfo(request.getSlowDisksList()),
-          PBHelper.convertBlksMovResults(
-              request.getBlksMovementResultsList()));
+          PBHelper.convertBlksMovReport(
+              request.getStorageMoveAttemptFinishedBlks()));
     } catch (IOException e) {
       throw new ServiceException(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb4a3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 996b986..38f72c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBand
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockMovingInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
@@ -56,11 +57,11 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerRepo
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMovementResultProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMoveAttemptFinishedProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ProvidedStorageLocationProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
@@ -104,8 +105,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStr
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@@ -971,59 +971,27 @@ public class PBHelper {
     return SlowDiskReports.create(slowDisksMap);
   }
 
-  public static BlocksStorageMovementResult[] convertBlksMovResults(
-      List<BlocksStorageMovementResultProto> protos) {
-    BlocksStorageMovementResult[] results =
-        new BlocksStorageMovementResult[protos.size()];
-    for (int i = 0; i < protos.size(); i++) {
-      BlocksStorageMovementResultProto resultProto = protos.get(i);
-      BlocksStorageMovementResult.Status status;
-      switch (resultProto.getStatus()) {
-      case SUCCESS:
-        status = Status.SUCCESS;
-        break;
-      case FAILURE:
-        status = Status.FAILURE;
-        break;
-      case IN_PROGRESS:
-        status = Status.IN_PROGRESS;
-        break;
-      default:
-        throw new AssertionError("Unknown status: " + resultProto.getStatus());
-      }
-      results[i] = new BlocksStorageMovementResult(resultProto.getTrackID(),
-          status);
+  public static BlocksStorageMoveAttemptFinished convertBlksMovReport(
+      BlocksStorageMoveAttemptFinishedProto proto) {
+
+    List<BlockProto> blocksList = proto.getBlocksList();
+    Block[] blocks = new Block[blocksList.size()];
+    for (int i = 0; i < blocksList.size(); i++) {
+      BlockProto blkProto = blocksList.get(i);
+      blocks[i] = PBHelperClient.convert(blkProto);
     }
-    return results;
+    return new BlocksStorageMoveAttemptFinished(blocks);
   }
 
-  public static List<BlocksStorageMovementResultProto> convertBlksMovResults(
-      BlocksStorageMovementResult[] blocksMovementResults) {
-    List<BlocksStorageMovementResultProto> blocksMovementResultsProto =
-        new ArrayList<>();
-    BlocksStorageMovementResultProto.Builder builder =
-        BlocksStorageMovementResultProto.newBuilder();
-    for (int i = 0; i < blocksMovementResults.length; i++) {
-      BlocksStorageMovementResult report = blocksMovementResults[i];
-      builder.setTrackID(report.getTrackId());
-      BlocksStorageMovementResultProto.Status status;
-      switch (report.getStatus()) {
-      case SUCCESS:
-        status = BlocksStorageMovementResultProto.Status.SUCCESS;
-        break;
-      case FAILURE:
-        status = BlocksStorageMovementResultProto.Status.FAILURE;
-        break;
-      case IN_PROGRESS:
-        status = BlocksStorageMovementResultProto.Status.IN_PROGRESS;
-        break;
-      default:
-        throw new AssertionError("Unknown status: " + report.getStatus());
-      }
-      builder.setStatus(status);
-      blocksMovementResultsProto.add(builder.build());
+  public static BlocksStorageMoveAttemptFinishedProto convertBlksMovReport(
+      BlocksStorageMoveAttemptFinished blocksMoveAttemptFinished) {
+    BlocksStorageMoveAttemptFinishedProto.Builder builder =
+        BlocksStorageMoveAttemptFinishedProto.newBuilder();
+    Block[] blocks = blocksMoveAttemptFinished.getBlocks();
+    for (Block block : blocks) {
+      builder.addBlocks(PBHelperClient.convert(block));
     }
-    return blocksMovementResultsProto;
+    return builder.build();
   }
 
   public static JournalInfo convert(JournalInfoProto info) {
@@ -1211,34 +1179,34 @@ public class PBHelper {
     BlockStorageMovementCommandProto.Builder builder =
         BlockStorageMovementCommandProto.newBuilder();
 
-    builder.setTrackID(blkStorageMovementCmd.getTrackID());
     builder.setBlockPoolId(blkStorageMovementCmd.getBlockPoolId());
     Collection<BlockMovingInfo> blockMovingInfos = blkStorageMovementCmd
         .getBlockMovingTasks();
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
-      builder.addBlockStorageMovement(
-          convertBlockMovingInfo(blkMovingInfo));
+      builder.addBlockMovingInfo(convertBlockMovingInfo(blkMovingInfo));
     }
     return builder.build();
   }
 
-  private static BlockStorageMovementProto convertBlockMovingInfo(
+  private static BlockMovingInfoProto convertBlockMovingInfo(
       BlockMovingInfo blkMovingInfo) {
-    BlockStorageMovementProto.Builder builder = BlockStorageMovementProto
+    BlockMovingInfoProto.Builder builder = BlockMovingInfoProto
         .newBuilder();
     builder.setBlock(PBHelperClient.convert(blkMovingInfo.getBlock()));
 
-    DatanodeInfo[] sourceDnInfos = blkMovingInfo.getSources();
-    builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
+    DatanodeInfo sourceDnInfo = blkMovingInfo.getSource();
+    builder.setSourceDnInfo(PBHelperClient.convert(sourceDnInfo));
 
-    DatanodeInfo[] targetDnInfos = blkMovingInfo.getTargets();
-    builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
+    DatanodeInfo targetDnInfo = blkMovingInfo.getTarget();
+    builder.setTargetDnInfo(PBHelperClient.convert(targetDnInfo));
 
-    StorageType[] sourceStorageTypes = blkMovingInfo.getSourceStorageTypes();
-    builder.setSourceStorageTypes(convertStorageTypesProto(sourceStorageTypes));
+    StorageType sourceStorageType = blkMovingInfo.getSourceStorageType();
+    builder.setSourceStorageType(
+        PBHelperClient.convertStorageType(sourceStorageType));
 
-    StorageType[] targetStorageTypes = blkMovingInfo.getTargetStorageTypes();
-    builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
+    StorageType targetStorageType = blkMovingInfo.getTargetStorageType();
+    builder.setTargetStorageType(
+        PBHelperClient.convertStorageType(targetStorageType));
 
     return builder.build();
   }
@@ -1246,42 +1214,38 @@ public class PBHelper {
   private static DatanodeCommand convert(
       BlockStorageMovementCommandProto blkStorageMovementCmdProto) {
     Collection<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
-    List<BlockStorageMovementProto> blkSPSatisfyList =
-        blkStorageMovementCmdProto.getBlockStorageMovementList();
-    for (BlockStorageMovementProto blkSPSatisfy : blkSPSatisfyList) {
+    List<BlockMovingInfoProto> blkSPSatisfyList =
+        blkStorageMovementCmdProto.getBlockMovingInfoList();
+    for (BlockMovingInfoProto blkSPSatisfy : blkSPSatisfyList) {
       blockMovingInfos.add(convertBlockMovingInfo(blkSPSatisfy));
     }
     return new BlockStorageMovementCommand(
         DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
-        blkStorageMovementCmdProto.getTrackID(),
         blkStorageMovementCmdProto.getBlockPoolId(), blockMovingInfos);
   }
 
   private static BlockMovingInfo convertBlockMovingInfo(
-      BlockStorageMovementProto blockStoragePolicySatisfyProto) {
-    BlockProto blockProto = blockStoragePolicySatisfyProto.getBlock();
+      BlockMovingInfoProto blockStorageMovingInfoProto) {
+    BlockProto blockProto = blockStorageMovingInfoProto.getBlock();
     Block block = PBHelperClient.convert(blockProto);
 
-    DatanodeInfosProto sourceDnInfosProto = blockStoragePolicySatisfyProto
-        .getSourceDnInfos();
-    DatanodeInfo[] sourceDnInfos = PBHelperClient.convert(sourceDnInfosProto);
-
-    DatanodeInfosProto targetDnInfosProto = blockStoragePolicySatisfyProto
-        .getTargetDnInfos();
-    DatanodeInfo[] targetDnInfos = PBHelperClient.convert(targetDnInfosProto);
-
-    StorageTypesProto srcStorageTypesProto = blockStoragePolicySatisfyProto
-        .getSourceStorageTypes();
-    StorageType[] srcStorageTypes = PBHelperClient.convertStorageTypes(
-        srcStorageTypesProto.getStorageTypesList(),
-        srcStorageTypesProto.getStorageTypesList().size());
-
-    StorageTypesProto targetStorageTypesProto = blockStoragePolicySatisfyProto
-        .getTargetStorageTypes();
-    StorageType[] targetStorageTypes = PBHelperClient.convertStorageTypes(
-        targetStorageTypesProto.getStorageTypesList(),
-        targetStorageTypesProto.getStorageTypesList().size());
-    return new BlockMovingInfo(block, sourceDnInfos, targetDnInfos,
-        srcStorageTypes, targetStorageTypes);
+    DatanodeInfoProto sourceDnInfoProto = blockStorageMovingInfoProto
+        .getSourceDnInfo();
+    DatanodeInfo sourceDnInfo = PBHelperClient.convert(sourceDnInfoProto);
+
+    DatanodeInfoProto targetDnInfoProto = blockStorageMovingInfoProto
+        .getTargetDnInfo();
+    DatanodeInfo targetDnInfo = PBHelperClient.convert(targetDnInfoProto);
+    StorageTypeProto srcStorageTypeProto = blockStorageMovingInfoProto
+        .getSourceStorageType();
+    StorageType srcStorageType = PBHelperClient
+        .convertStorageType(srcStorageTypeProto);
+
+    StorageTypeProto targetStorageTypeProto = blockStorageMovingInfoProto
+        .getTargetStorageType();
+    StorageType targetStorageType = PBHelperClient
+        .convertStorageType(targetStorageTypeProto);
+    return new BlockMovingInfo(block, sourceDnInfo, targetDnInfo,
+        srcStorageType, targetStorageType);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb4a3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 0311b02..f9a76b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
@@ -212,7 +211,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
    * A queue of blocks corresponding to trackID for moving its storage
    * placements by this datanode.
    */
-  private final Queue<BlockStorageMovementInfosBatch> storageMovementBlocks =
+  private final Queue<BlockMovingInfo> storageMovementBlocks =
       new LinkedList<>();
   private volatile boolean dropSPSWork = false;
 
@@ -1079,30 +1078,45 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /**
    * Add the block infos which needs to move its storage locations.
    *
-   * @param trackID
-   *          - unique identifier which will be used for tracking the given set
-   *          of blocks movement completion.
-   * @param storageMismatchedBlocks
-   *          - storage mismatched block infos
+   * @param blkMovingInfo
+   *          - storage mismatched block info
    */
-  public void addBlocksToMoveStorage(long trackID,
-      List<BlockMovingInfo> storageMismatchedBlocks) {
+  public void addBlocksToMoveStorage(BlockMovingInfo blkMovingInfo) {
     synchronized (storageMovementBlocks) {
-      storageMovementBlocks.offer(
-          new BlockStorageMovementInfosBatch(trackID, storageMismatchedBlocks));
+      storageMovementBlocks.offer(blkMovingInfo);
     }
   }
 
   /**
-   * @return block infos which needs to move its storage locations. This returns
-   *         list of blocks under one trackId.
+   * Return the number of blocks queued up for movement.
    */
-  public BlockStorageMovementInfosBatch getBlocksToMoveStorages() {
+  public int getNumberOfBlocksToMoveStorages() {
+    return storageMovementBlocks.size();
+  }
+
+  /**
+   * Get the blocks to move to satisfy the storage media type.
+   *
+   * @param numBlocksToMoveTasks
+   *          total number of blocks which will be sent to this datanode for
+   *          block movement.
+   *
+   * @return block infos which needs to move its storage locations.
+   */
+  public BlockMovingInfo[] getBlocksToMoveStorages(int numBlocksToMoveTasks) {
     synchronized (storageMovementBlocks) {
-      // TODO: Presently returning the list of blocks under one trackId.
-      // Need to limit the list of items into small batches with in trackId
-      // itself if blocks are many(For example: a file contains many blocks).
-      return storageMovementBlocks.poll();
+      List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+      for (; !storageMovementBlocks.isEmpty()
+          && numBlocksToMoveTasks > 0; numBlocksToMoveTasks--) {
+        blockMovingInfos.add(storageMovementBlocks.poll());
+      }
+      BlockMovingInfo[] blkMoveArray = new BlockMovingInfo[blockMovingInfos
+          .size()];
+      blkMoveArray = blockMovingInfos.toArray(blkMoveArray);
+      if (blkMoveArray.length > 0) {
+        return blkMoveArray;
+      }
+      return null;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb4a3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 6187b37..ae5fe22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -48,6 +47,7 @@ import org.apache.hadoop.hdfs.server.protocol.*;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
@@ -208,6 +208,8 @@ public class DatanodeManager {
    */
   private final long timeBetweenResendingCachingDirectivesMs;
 
+  private final boolean blocksToMoveShareEqualRatio;
+
   DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
       final Configuration conf) throws IOException {
     this.namesystem = namesystem;
@@ -332,6 +334,12 @@ public class DatanodeManager {
     this.blocksPerPostponedMisreplicatedBlocksRescan = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY,
         DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT);
+
+    // SPS configuration to decide blocks to move can share equal ratio of
+    // maxtransfers with pending replica and erasure-coded reconstruction tasks
+    blocksToMoveShareEqualRatio = conf.getBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT);
   }
 
   private static long getStaleIntervalFromConf(Configuration conf,
@@ -1094,13 +1102,14 @@ public class DatanodeManager {
           // Sets dropSPSWork flag to true, to ensure that
           // DNA_DROP_SPS_WORK_COMMAND will send to datanode via next heartbeat
           // response immediately after the node registration. This is
-          // to avoid a situation, where multiple trackId responses coming from
-          // different co-odinator datanodes. After SPS monitor time out, it
-          // will retry the files which were scheduled to the disconnected(for
-          // long time more than heartbeat expiry) DN, by finding new
-          // co-ordinator datanode. Now, if the expired datanode reconnects back
-          // after SPS reschedules, it leads to get different movement results
-          // from reconnected and new DN co-ordinators.
+          // to avoid a situation where multiple block attempt finished
+          // responses come from different datanodes. After the SPS monitor
+          // times out, it will retry the files which were scheduled to the
+          // disconnected (for longer than the heartbeat expiry) DN, by
+          // finding a new datanode. Now, if the expired datanode reconnects
+          // after SPS reschedules, SPS would receive different movement
+          // attempt finished reports from the reconnected datanode and from
+          // the new datanode which is attempting the block movement.
           nodeS.setDropSPSWork(true);
 
           // resolve network location
@@ -1680,19 +1689,47 @@ public class DatanodeManager {
     final List<DatanodeCommand> cmds = new ArrayList<>();
     // Allocate _approximately_ maxTransfers pending tasks to DataNode.
     // NN chooses pending tasks based on the ratio between the lengths of
-    // replication and erasure-coded block queues.
+    // replication, erasure-coded block queues and block storage movement
+    // queues.
     int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks();
     int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
+    int totalBlocksToMove = nodeinfo.getNumberOfBlocksToMoveStorages();
     int totalBlocks = totalReplicateBlocks + totalECBlocks;
-    if (totalBlocks > 0) {
-      int numReplicationTasks = (int) Math.ceil(
-          (double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
-      int numECTasks = (int) Math.ceil(
-          (double) (totalECBlocks * maxTransfers) / totalBlocks);
-
+    if (totalBlocks > 0 || totalBlocksToMove > 0) {
+      int numReplicationTasks = 0;
+      int numECTasks = 0;
+      int numBlocksToMoveTasks = 0;
+      // If blocksToMoveShareEqualRatio is true, blocks to move tasks share the
+      // max transfers on the same ratio basis as the other tasks. Otherwise,
+      // high priority is given to the pending_replica/erasure-coded tasks and
+      // only the leftover streams are used for blocks to move tasks.
+      if (blocksToMoveShareEqualRatio) {
+        // add blocksToMove count to total blocks so that will get equal share
+        totalBlocks = totalBlocks + totalBlocksToMove;
+        numReplicationTasks = (int) Math
+            .ceil((double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
+        numECTasks = (int) Math
+            .ceil((double) (totalECBlocks * maxTransfers) / totalBlocks);
+        numBlocksToMoveTasks = (int) Math
+            .ceil((double) (totalBlocksToMove * maxTransfers) / totalBlocks);
+      } else {
+        // Calculate the replica and ec tasks, then pick blocksToMove if there
+        // are any streams available.
+        numReplicationTasks = (int) Math
+            .ceil((double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
+        numECTasks = (int) Math
+            .ceil((double) (totalECBlocks * maxTransfers) / totalBlocks);
+        int numTasks = numReplicationTasks + numECTasks;
+        if (numTasks < maxTransfers) {
+          int remainingMaxTransfers = maxTransfers - numTasks;
+          numBlocksToMoveTasks = Math.min(totalBlocksToMove,
+              remainingMaxTransfers);
+        }
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Pending replication tasks: " + numReplicationTasks
-            + " erasure-coded tasks: " + numECTasks);
+            + " erasure-coded tasks: " + numECTasks + " blocks to move tasks: "
+            + numBlocksToMoveTasks);
       }
       // check pending replication tasks
       List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
@@ -1708,6 +1745,23 @@ public class DatanodeManager {
         cmds.add(new BlockECReconstructionCommand(
             DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
       }
+      // check pending block storage movement tasks
+      if (nodeinfo.shouldDropSPSWork()) {
+        cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
+        // Set back to false to indicate that the new value has been sent to the
+        // datanode.
+        nodeinfo.setDropSPSWork(false);
+      } else {
+        // Get pending block storage movement tasks
+        BlockMovingInfo[] blkStorageMovementInfos = nodeinfo
+            .getBlocksToMoveStorages(numBlocksToMoveTasks);
+
+        if (blkStorageMovementInfos != null) {
+          cmds.add(new BlockStorageMovementCommand(
+              DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, blockPoolId,
+              Arrays.asList(blkStorageMovementInfos)));
+        }
+      }
     }
 
     // check block invalidation
@@ -1751,24 +1805,6 @@ public class DatanodeManager {
       }
     }
 
-    if (nodeinfo.shouldDropSPSWork()) {
-      cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
-      // Set back to false to indicate that the new value has been sent to the
-      // datanode.
-      nodeinfo.setDropSPSWork(false);
-    }
-
-    // check pending block storage movement tasks
-    BlockStorageMovementInfosBatch blkStorageMovementInfosBatch = nodeinfo
-        .getBlocksToMoveStorages();
-
-    if (blkStorageMovementInfosBatch != null) {
-      cmds.add(new BlockStorageMovementCommand(
-          DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
-          blkStorageMovementInfosBatch.getTrackID(), blockPoolId,
-          blkStorageMovementInfosBatch.getBlockMovingInfo()));
-    }
-
     if (!cmds.isEmpty()) {
       return cmds.toArray(new DatanodeCommand[cmds.size()]);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb4a3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 9308471..1656b16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -799,8 +799,7 @@ class BPOfferService {
       LOG.info("DatanodeCommand action: DNA_BLOCK_STORAGE_MOVEMENT");
       BlockStorageMovementCommand blkSPSCmd = (BlockStorageMovementCommand) cmd;
       dn.getStoragePolicySatisfyWorker().processBlockMovingTasks(
-          blkSPSCmd.getTrackID(), blkSPSCmd.getBlockPoolId(),
-          blkSPSCmd.getBlockMovingTasks());
+          blkSPSCmd.getBlockPoolId(), blkSPSCmd.getBlockMovingTasks());
       break;
     case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
       LOG.info("DatanodeCommand action: DNA_DROP_SPS_WORK_COMMAND");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb4a3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index f537f49..b7beda4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -50,7 +51,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -513,8 +514,11 @@ class BPServiceActor implements Runnable {
             SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
             SlowDiskReports.EMPTY_REPORT;
 
-    BlocksStorageMovementResult[] blksMovementResults =
-        getBlocksMovementResults();
+    // Get the blocks whose storage move attempt has finished
+    List<Block> results = dn.getStoragePolicySatisfyWorker()
+        .getBlocksMovementsStatusHandler().getMoveAttemptFinishedBlocks();
+    BlocksStorageMoveAttemptFinished storageMoveAttemptFinishedBlks =
+        getStorageMoveAttemptFinishedBlocks(results);
 
     HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
         reports,
@@ -527,7 +531,7 @@ class BPServiceActor implements Runnable {
         requestBlockReportLease,
         slowPeers,
         slowDisks,
-        blksMovementResults);
+        storageMoveAttemptFinishedBlks);
 
     if (outliersReportDue) {
       // If the report was due and successfully sent, schedule the next one.
@@ -537,20 +541,23 @@ class BPServiceActor implements Runnable {
     // Remove the blocks movement results after successfully transferring
     // to namenode.
     dn.getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
-        .remove(blksMovementResults);
+        .remove(results);
 
     return response;
   }
 
-  private BlocksStorageMovementResult[] getBlocksMovementResults() {
-    List<BlocksStorageMovementResult> trackIdVsMovementStatus = dn
-        .getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
-        .getBlksMovementResults();
-    BlocksStorageMovementResult[] blksMovementResult =
-        new BlocksStorageMovementResult[trackIdVsMovementStatus.size()];
-    trackIdVsMovementStatus.toArray(blksMovementResult);
+  private BlocksStorageMoveAttemptFinished getStorageMoveAttemptFinishedBlocks(
+      List<Block> finishedBlks) {
 
-    return blksMovementResult;
+    if (finishedBlks.isEmpty()) {
+      return null;
+    }
+
+    // Create BlocksStorageMoveAttemptFinished with currently finished
+    // blocks
+    Block[] blockList = new Block[finishedBlks.size()];
+    finishedBlks.toArray(blockList);
+    return new BlocksStorageMoveAttemptFinished(blockList);
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb4a3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
index f3d2bb6..b3b9fd9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
@@ -21,14 +21,14 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementAttemptFinished;
 import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsStatusHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,12 +41,12 @@ import org.slf4j.LoggerFactory;
 public class BlockStorageMovementTracker implements Runnable {
   private static final Logger LOG = LoggerFactory
       .getLogger(BlockStorageMovementTracker.class);
-  private final CompletionService<BlockMovementResult> moverCompletionService;
+  private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
   private final BlocksMovementsStatusHandler blksMovementsStatusHandler;
 
-  // Keeps the information - trackID vs its list of blocks
-  private final Map<Long, List<Future<BlockMovementResult>>> moverTaskFutures;
-  private final Map<Long, List<BlockMovementResult>> movementResults;
+  // Keeps the information - block vs its list of future move tasks
+  private final Map<Block, List<Future<BlockMovementAttemptFinished>>> moverTaskFutures;
+  private final Map<Block, List<BlockMovementAttemptFinished>> movementResults;
 
   private volatile boolean running = true;
 
@@ -59,7 +59,7 @@ public class BlockStorageMovementTracker implements Runnable {
    *          blocks movements status handler
    */
   public BlockStorageMovementTracker(
-      CompletionService<BlockMovementResult> moverCompletionService,
+      CompletionService<BlockMovementAttemptFinished> moverCompletionService,
       BlocksMovementsStatusHandler handler) {
     this.moverCompletionService = moverCompletionService;
     this.moverTaskFutures = new HashMap<>();
@@ -82,32 +82,33 @@ public class BlockStorageMovementTracker implements Runnable {
         }
       }
       try {
-        Future<BlockMovementResult> future = moverCompletionService.take();
+        Future<BlockMovementAttemptFinished> future =
+            moverCompletionService.take();
         if (future != null) {
-          BlockMovementResult result = future.get();
+          BlockMovementAttemptFinished result = future.get();
           LOG.debug("Completed block movement. {}", result);
-          long trackId = result.getTrackId();
-          List<Future<BlockMovementResult>> blocksMoving = moverTaskFutures
-              .get(trackId);
+          Block block = result.getBlock();
+          List<Future<BlockMovementAttemptFinished>> blocksMoving =
+              moverTaskFutures.get(block);
           if (blocksMoving == null) {
-            LOG.warn("Future task doesn't exist for trackId " + trackId);
+            LOG.warn("Future task doesn't exist for block : {} ", block);
             continue;
           }
           blocksMoving.remove(future);
 
-          List<BlockMovementResult> resultPerTrackIdList =
-              addMovementResultToTrackIdList(result);
+          List<BlockMovementAttemptFinished> resultPerTrackIdList =
+              addMovementResultToBlockIdList(result);
 
           // Completed all the scheduled blocks movement under this 'trackId'.
-          if (blocksMoving.isEmpty() || moverTaskFutures.get(trackId) == null) {
+          if (blocksMoving.isEmpty() || moverTaskFutures.get(block) == null) {
             synchronized (moverTaskFutures) {
-              moverTaskFutures.remove(trackId);
+              moverTaskFutures.remove(block);
             }
             if (running) {
               // handle completed or inprogress blocks movements per trackId.
               blksMovementsStatusHandler.handle(resultPerTrackIdList);
             }
-            movementResults.remove(trackId);
+            movementResults.remove(block);
           }
         }
       } catch (InterruptedException e) {
@@ -123,38 +124,39 @@ public class BlockStorageMovementTracker implements Runnable {
     }
   }
 
-  private List<BlockMovementResult> addMovementResultToTrackIdList(
-      BlockMovementResult result) {
-    long trackId = result.getTrackId();
-    List<BlockMovementResult> perTrackIdList;
+  private List<BlockMovementAttemptFinished> addMovementResultToBlockIdList(
+      BlockMovementAttemptFinished result) {
+    Block block = result.getBlock();
+    List<BlockMovementAttemptFinished> perBlockIdList;
     synchronized (movementResults) {
-      perTrackIdList = movementResults.get(trackId);
-      if (perTrackIdList == null) {
-        perTrackIdList = new ArrayList<>();
-        movementResults.put(trackId, perTrackIdList);
+      perBlockIdList = movementResults.get(block);
+      if (perBlockIdList == null) {
+        perBlockIdList = new ArrayList<>();
+        movementResults.put(block, perBlockIdList);
       }
-      perTrackIdList.add(result);
+      perBlockIdList.add(result);
     }
-    return perTrackIdList;
+    return perBlockIdList;
   }
 
   /**
    * Add future task to the tracking list to check the completion status of the
    * block movement.
    *
-   * @param trackID
-   *          tracking Id
+   * @param block
+   *          block identifier
    * @param futureTask
    *          future task used for moving the respective block
    */
-  void addBlock(long trackID, Future<BlockMovementResult> futureTask) {
+  void addBlock(Block block,
+      Future<BlockMovementAttemptFinished> futureTask) {
     synchronized (moverTaskFutures) {
-      List<Future<BlockMovementResult>> futures = moverTaskFutures
-          .get(Long.valueOf(trackID));
+      List<Future<BlockMovementAttemptFinished>> futures =
+          moverTaskFutures.get(block);
       // null for the first task
       if (futures == null) {
         futures = new ArrayList<>();
-        moverTaskFutures.put(trackID, futures);
+        moverTaskFutures.put(block, futures);
       }
       futures.add(futureTask);
       // Notify waiting tracker thread about the newly added tasks.
@@ -175,16 +177,6 @@ public class BlockStorageMovementTracker implements Runnable {
   }
 
   /**
-   * @return the list of trackIds which are still waiting to complete all the
-   *         scheduled blocks movements.
-   */
-  Set<Long> getInProgressTrackIds() {
-    synchronized (moverTaskFutures) {
-      return moverTaskFutures.keySet();
-    }
-  }
-
-  /**
    * Sets running flag to false and clear the pending movement result queues.
    */
   public void stopTracking() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb4a3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 4e57805..47318f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
-import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -32,9 +31,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
@@ -62,7 +59,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
@@ -89,14 +85,11 @@ public class StoragePolicySatisfyWorker {
 
   private final int moverThreads;
   private final ExecutorService moveExecutor;
-  private final CompletionService<BlockMovementResult> moverCompletionService;
+  private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
   private final BlocksMovementsStatusHandler handler;
   private final BlockStorageMovementTracker movementTracker;
   private Daemon movementTrackerThread;
 
-  private long inprogressTrackIdsCheckInterval = 30 * 1000; // 30seconds.
-  private long nextInprogressRecheckTime;
-
   public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
     this.datanode = datanode;
     this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
@@ -111,16 +104,6 @@ public class StoragePolicySatisfyWorker {
     movementTrackerThread = new Daemon(movementTracker);
     movementTrackerThread.setName("BlockStorageMovementTracker");
 
-    // Interval to check that the inprogress trackIds. The time interval is
-    // proportional o the heart beat interval time period.
-    final long heartbeatIntervalSeconds = conf.getTimeDuration(
-        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
-        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
-    inprogressTrackIdsCheckInterval = 5 * heartbeatIntervalSeconds;
-    // update first inprogress recheck time to a future time stamp.
-    nextInprogressRecheckTime = monotonicNow()
-        + inprogressTrackIdsCheckInterval;
-
     // TODO: Needs to manage the number of concurrent moves per DataNode.
   }
 
@@ -186,30 +169,26 @@ public class StoragePolicySatisfyWorker {
    * separate thread. Each task will move the block replica to the target node &
    * wait for the completion.
    *
-   * @param trackID
-   *          unique tracking identifier
-   * @param blockPoolID
-   *          block pool ID
+   * @param blockPoolID block pool identifier
+   *
    * @param blockMovingInfos
    *          list of blocks to be moved
    */
-  public void processBlockMovingTasks(long trackID, String blockPoolID,
-      Collection<BlockMovingInfo> blockMovingInfos) {
+  public void processBlockMovingTasks(final String blockPoolID,
+      final Collection<BlockMovingInfo> blockMovingInfos) {
     LOG.debug("Received BlockMovingTasks {}", blockMovingInfos);
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
-      assert blkMovingInfo.getSources().length == blkMovingInfo
-          .getTargets().length;
-      for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
-        DatanodeInfo target = blkMovingInfo.getTargets()[i];
-        BlockMovingTask blockMovingTask = new BlockMovingTask(
-            trackID, blockPoolID, blkMovingInfo.getBlock(),
-            blkMovingInfo.getSources()[i], target,
-            blkMovingInfo.getSourceStorageTypes()[i],
-            blkMovingInfo.getTargetStorageTypes()[i]);
-        Future<BlockMovementResult> moveCallable = moverCompletionService
-            .submit(blockMovingTask);
-        movementTracker.addBlock(trackID, moveCallable);
-      }
+      StorageType sourceStorageType = blkMovingInfo.getSourceStorageType();
+      StorageType targetStorageType = blkMovingInfo.getTargetStorageType();
+      assert sourceStorageType != targetStorageType
+          : "Source and Target storage type shouldn't be same!";
+      BlockMovingTask blockMovingTask = new BlockMovingTask(blockPoolID,
+          blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
+          blkMovingInfo.getTarget(), sourceStorageType, targetStorageType);
+      Future<BlockMovementAttemptFinished> moveCallable = moverCompletionService
+          .submit(blockMovingTask);
+      movementTracker.addBlock(blkMovingInfo.getBlock(),
+          moveCallable);
     }
   }
 
@@ -217,8 +196,7 @@ public class StoragePolicySatisfyWorker {
    * This class encapsulates the process of moving the block replica to the
    * given target and wait for the response.
    */
-  private class BlockMovingTask implements Callable<BlockMovementResult> {
-    private final long trackID;
+  private class BlockMovingTask implements Callable<BlockMovementAttemptFinished> {
     private final String blockPoolID;
     private final Block block;
     private final DatanodeInfo source;
@@ -226,10 +204,9 @@ public class StoragePolicySatisfyWorker {
     private final StorageType srcStorageType;
     private final StorageType targetStorageType;
 
-    BlockMovingTask(long trackID, String blockPoolID, Block block,
+    BlockMovingTask(String blockPoolID, Block block,
         DatanodeInfo source, DatanodeInfo target,
         StorageType srcStorageType, StorageType targetStorageType) {
-      this.trackID = trackID;
       this.blockPoolID = blockPoolID;
       this.block = block;
       this.source = source;
@@ -239,23 +216,26 @@ public class StoragePolicySatisfyWorker {
     }
 
     @Override
-    public BlockMovementResult call() {
+    public BlockMovementAttemptFinished call() {
       BlockMovementStatus status = moveBlock();
-      return new BlockMovementResult(trackID, block.getBlockId(), target,
-          status);
+      return new BlockMovementAttemptFinished(block, source, target, status);
     }
 
     private BlockMovementStatus moveBlock() {
       LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
-              + "storageType, sourceStoragetype:{} and destinStoragetype:{}",
+          + "storageType, sourceStoragetype:{} and destinStoragetype:{}",
           block, source, target, srcStorageType, targetStorageType);
       Socket sock = null;
       DataOutputStream out = null;
       DataInputStream in = null;
       try {
+        datanode.incrementXmitsInProgress();
+
         ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
         DNConf dnConf = datanode.getDnConf();
-        String dnAddr = target.getXferAddr(dnConf.getConnectToDnViaHostname());
+
+        String dnAddr = datanode.getDatanodeId()
+            .getXferAddr(dnConf.getConnectToDnViaHostname());
         sock = datanode.newSocket();
         NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr),
             dnConf.getSocketTimeout());
@@ -297,9 +277,10 @@ public class StoragePolicySatisfyWorker {
         LOG.warn(
             "Failed to move block:{} from src:{} to destin:{} to satisfy "
                 + "storageType:{}",
-            block, source, target, targetStorageType, e);
+                block, source, target, targetStorageType, e);
         return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
       } finally {
+        datanode.decrementXmitsInProgress();
         IOUtils.closeStream(out);
         IOUtils.closeStream(in);
         IOUtils.closeSocket(sock);
@@ -357,29 +338,25 @@ public class StoragePolicySatisfyWorker {
   }
 
   /**
-   * This class represents result from a block movement task. This will have the
+   * This class represents status from a block movement task. This will have the
    * information of the task which was successful or failed due to errors.
    */
-  static class BlockMovementResult {
-    private final long trackId;
-    private final long blockId;
+  static class BlockMovementAttemptFinished {
+    private final Block block;
+    private final DatanodeInfo src;
     private final DatanodeInfo target;
     private final BlockMovementStatus status;
 
-    BlockMovementResult(long trackId, long blockId,
+    BlockMovementAttemptFinished(Block block, DatanodeInfo src,
         DatanodeInfo target, BlockMovementStatus status) {
-      this.trackId = trackId;
-      this.blockId = blockId;
+      this.block = block;
+      this.src = src;
       this.target = target;
       this.status = status;
     }
 
-    long getTrackId() {
-      return trackId;
-    }
-
-    long getBlockId() {
-      return blockId;
+    Block getBlock() {
+      return block;
     }
 
     BlockMovementStatus getStatus() {
@@ -388,99 +365,79 @@ public class StoragePolicySatisfyWorker {
 
     @Override
     public String toString() {
-      return new StringBuilder().append("Block movement result(\n  ")
-          .append("track id: ").append(trackId).append(" block id: ")
-          .append(blockId).append(" target node: ").append(target)
+      return new StringBuilder().append("Block movement attempt finished(\n  ")
+          .append(" block : ")
+          .append(block).append(" src node: ").append(src)
+          .append(" target node: ").append(target)
           .append(" movement status: ").append(status).append(")").toString();
     }
   }
 
   /**
    * Blocks movements status handler, which is used to collect details of the
-   * completed or inprogress list of block movements and this status(success or
-   * failure or inprogress) will be send to the namenode via heartbeat.
+   * completed block movements. These attempt finished (success or failure)
+   * blocks will be sent to the namenode via heartbeat.
    */
-  class BlocksMovementsStatusHandler {
-    private final List<BlocksStorageMovementResult> trackIdVsMovementStatus =
+  public static class BlocksMovementsStatusHandler {
+    private final List<Block> blockIdVsMovementStatus =
         new ArrayList<>();
 
     /**
-     * Collect all the block movement results. Later this will be send to
-     * namenode via heart beat.
+     * Collect all the storage movement attempt finished blocks. Later these
+     * will be sent to the namenode via heartbeat.
      *
-     * @param results
-     *          result of all the block movements per trackId
+     * @param moveAttemptFinishedBlks
+     *          set of storage movement attempt finished blocks
      */
-    void handle(List<BlockMovementResult> resultsPerTrackId) {
-      BlocksStorageMovementResult.Status status =
-          BlocksStorageMovementResult.Status.SUCCESS;
-      long trackId = -1;
-      for (BlockMovementResult blockMovementResult : resultsPerTrackId) {
-        trackId = blockMovementResult.getTrackId();
-        if (blockMovementResult.status ==
-            BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE) {
-          status = BlocksStorageMovementResult.Status.FAILURE;
-          // If any of the block movement is failed, then mark as failure so
-          // that namenode can take a decision to retry the blocks associated to
-          // the given trackId.
-          break;
-        }
-      }
+    void handle(List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
+      List<Block> blocks = new ArrayList<>();
 
-      // Adding to the tracking results list. Later this will be send to
+      for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
+        blocks.add(item.getBlock());
+      }
+      // Adding to the tracking report list. Later this will be sent to
       // namenode via datanode heartbeat.
-      synchronized (trackIdVsMovementStatus) {
-        trackIdVsMovementStatus.add(
-            new BlocksStorageMovementResult(trackId, status));
+      synchronized (blockIdVsMovementStatus) {
+        blockIdVsMovementStatus.addAll(blocks);
       }
     }
 
     /**
-     * @return unmodifiable list of blocks storage movement results.
+     * @return unmodifiable list of storage movement attempt finished blocks.
      */
-    List<BlocksStorageMovementResult> getBlksMovementResults() {
-      List<BlocksStorageMovementResult> movementResults = new ArrayList<>();
-      // 1. Adding all the completed trackids.
-      synchronized (trackIdVsMovementStatus) {
-        if (trackIdVsMovementStatus.size() > 0) {
-          movementResults = Collections
-              .unmodifiableList(trackIdVsMovementStatus);
+    List<Block> getMoveAttemptFinishedBlocks() {
+      List<Block> moveAttemptFinishedBlks = new ArrayList<>();
+      // Adding all the completed block ids.
+      synchronized (blockIdVsMovementStatus) {
+        if (blockIdVsMovementStatus.size() > 0) {
+          moveAttemptFinishedBlks = Collections
+              .unmodifiableList(blockIdVsMovementStatus);
         }
       }
-      // 2. Adding the in progress track ids after those which are completed.
-      Set<Long> inProgressTrackIds = getInProgressTrackIds();
-      for (Long trackId : inProgressTrackIds) {
-        movementResults.add(new BlocksStorageMovementResult(trackId,
-            BlocksStorageMovementResult.Status.IN_PROGRESS));
-      }
-      return movementResults;
+      return moveAttemptFinishedBlks;
     }
 
     /**
-     * Remove the blocks storage movement results.
+     * Remove the storage movement attempt finished blocks from the tracking
+     * list.
      *
-     * @param results
-     *          set of blocks storage movement results
+     * @param moveAttemptFinishedBlks
+     *          set of storage movement attempt finished blocks
      */
-    void remove(BlocksStorageMovementResult[] results) {
-      if (results != null) {
-        synchronized (trackIdVsMovementStatus) {
-          for (BlocksStorageMovementResult blocksMovementResult : results) {
-            trackIdVsMovementStatus.remove(blocksMovementResult);
-          }
-        }
+    void remove(List<Block> moveAttemptFinishedBlks) {
+      if (moveAttemptFinishedBlks != null) {
+        blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
       }
     }
 
     /**
-     * Clear the trackID vs movement status tracking map.
+     * Clear the storage movement attempt finished blocks tracking list.
      */
     void removeAll() {
-      synchronized (trackIdVsMovementStatus) {
-        trackIdVsMovementStatus.clear();
+      synchronized (blockIdVsMovementStatus) {
+        blockIdVsMovementStatus.clear();
       }
     }
-
   }
 
   @VisibleForTesting
@@ -498,23 +455,4 @@ public class StoragePolicySatisfyWorker {
     movementTracker.removeAll();
     handler.removeAll();
   }
-
-  /**
-   * Gets list of trackids which are inprogress. Will do collection periodically
-   * on 'dfs.datanode.storage.policy.satisfier.worker.inprogress.recheck.time.
-   * millis' interval.
-   *
-   * @return collection of trackids which are inprogress
-   */
-  private Set<Long> getInProgressTrackIds() {
-    Set<Long> trackIds = new HashSet<>();
-    long now = monotonicNow();
-    if (nextInprogressRecheckTime >= now) {
-      trackIds = movementTracker.getInProgressTrackIds();
-
-      // schedule next re-check interval
-      nextInprogressRecheckTime = now + inprogressTrackIdsCheckInterval;
-    }
-    return trackIds;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb4a3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index 549819f..cc5b63a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -22,15 +22,12 @@ import static org.apache.hadoop.util.Time.monotonicNow;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;
 import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,14 +35,12 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 
 /**
- * A monitor class for checking whether block storage movements finished or not.
- * If block storage movement results from datanode indicates about the movement
- * success, then it will just remove the entries from tracking. If it reports
- * failure, then it will add back to needed block storage movements list. If it
- * reports in_progress, that means the blocks movement is in progress and the
- * coordinator is still tracking the movement. If no DN reports about movement
- * for longer time, then such items will be retries automatically after timeout.
- * The default timeout would be 30mins.
+ * A monitor class for checking whether block storage movement attempts
+ * have completed. If it receives a block storage movement attempt status
+ * (either success or failure) from a DN, it just removes the entries from
+ * tracking. If no DN reports that the movement attempt has finished for a
+ * long period, such items are retried automatically after the timeout.
+ * The default timeout is 5 minutes.
  */
 public class BlockStorageMovementAttemptedItems {
   private static final Logger LOG =
@@ -55,37 +50,34 @@ public class BlockStorageMovementAttemptedItems {
    * A map holds the items which are already taken for blocks movements
    * processing and sent to DNs.
    */
-  private final Map<Long, AttemptedItemInfo> storageMovementAttemptedItems;
-  private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
+  private final List<AttemptedItemInfo> storageMovementAttemptedItems;
+  private final List<Block> movementFinishedBlocks;
   private volatile boolean monitorRunning = true;
   private Daemon timerThread = null;
-  private final StoragePolicySatisfier sps;
   //
-  // It might take anywhere between 20 to 60 minutes before
+  // It might take anywhere between 5 to 10 minutes before
   // a request is timed out.
   //
-  private long selfRetryTimeout = 20 * 60 * 1000;
+  private long selfRetryTimeout = 5 * 60 * 1000;
 
   //
-  // It might take anywhere between 5 to 10 minutes before
+  // It might take anywhere between 1 to 2 minutes before
   // a request is timed out.
   //
-  private long minCheckTimeout = 5 * 60 * 1000; // minimum value
+  private long minCheckTimeout = 1 * 60 * 1000; // minimum value
   private BlockStorageMovementNeeded blockStorageMovementNeeded;
 
   public BlockStorageMovementAttemptedItems(long recheckTimeout,
       long selfRetryTimeout,
-      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
-      StoragePolicySatisfier sps) {
+      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
     if (recheckTimeout > 0) {
       this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
     }
 
     this.selfRetryTimeout = selfRetryTimeout;
     this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
-    storageMovementAttemptedItems = new HashMap<>();
-    storageMovementAttemptedResults = new ArrayList<>();
-    this.sps = sps;
+    storageMovementAttemptedItems = new ArrayList<>();
+    movementFinishedBlocks = new ArrayList<>();
   }
 
   /**
@@ -94,33 +86,26 @@ public class BlockStorageMovementAttemptedItems {
    *
    * @param itemInfo
    *          - tracking info
-   * @param allBlockLocsAttemptedToSatisfy
-   *          - failed to find matching target nodes to satisfy storage type
-   *          for all the block locations of the given blockCollectionID
    */
-  public void add(ItemInfo itemInfo, boolean allBlockLocsAttemptedToSatisfy) {
+  public void add(AttemptedItemInfo itemInfo) {
     synchronized (storageMovementAttemptedItems) {
-      AttemptedItemInfo attemptedItemInfo = new AttemptedItemInfo(
-          itemInfo.getStartId(), itemInfo.getTrackId(), monotonicNow(),
-          allBlockLocsAttemptedToSatisfy);
-      storageMovementAttemptedItems.put(itemInfo.getTrackId(),
-          attemptedItemInfo);
+      storageMovementAttemptedItems.add(itemInfo);
     }
   }
 
   /**
-   * Add the trackIDBlocksStorageMovementResults to
-   * storageMovementAttemptedResults.
+   * Add the storage movement attempt finished blocks to
+   * movementFinishedBlocks.
    *
-   * @param blksMovementResults
+   * @param moveAttemptFinishedBlks
+   *          storage movement attempt finished blocks
    */
-  public void addResults(BlocksStorageMovementResult[] blksMovementResults) {
-    if (blksMovementResults.length == 0) {
+  public void addReportedMovedBlocks(Block[] moveAttemptFinishedBlks) {
+    if (moveAttemptFinishedBlks.length == 0) {
       return;
     }
-    synchronized (storageMovementAttemptedResults) {
-      storageMovementAttemptedResults
-          .addAll(Arrays.asList(blksMovementResults));
+    synchronized (movementFinishedBlocks) {
+      movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks));
     }
   }
 
@@ -129,8 +114,8 @@ public class BlockStorageMovementAttemptedItems {
    */
   public synchronized void start() {
     monitorRunning = true;
-    timerThread = new Daemon(new BlocksStorageMovementAttemptResultMonitor());
-    timerThread.setName("BlocksStorageMovementAttemptResultMonitor");
+    timerThread = new Daemon(new BlocksStorageMovementAttemptMonitor());
+    timerThread.setName("BlocksStorageMovementAttemptMonitor");
     timerThread.start();
   }
 
@@ -163,82 +148,22 @@ public class BlockStorageMovementAttemptedItems {
   }
 
   /**
-   * This class contains information of an attempted trackID. Information such
-   * as, (a)last attempted or reported time stamp, (b)whether all the blocks in
-   * the trackID were attempted and blocks movement has been scheduled to
-   * satisfy storage policy. This is used by
-   * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
-   */
-  private final static class AttemptedItemInfo extends ItemInfo {
-    private long lastAttemptedOrReportedTime;
-    private final boolean allBlockLocsAttemptedToSatisfy;
-
-    /**
-     * AttemptedItemInfo constructor.
-     *
-     * @param rootId
-     *          rootId for trackId
-     * @param trackId
-     *          trackId for file.
-     * @param lastAttemptedOrReportedTime
-     *          last attempted or reported time
-     * @param allBlockLocsAttemptedToSatisfy
-     *          whether all the blocks in the trackID were attempted and blocks
-     *          movement has been scheduled to satisfy storage policy
-     */
-    private AttemptedItemInfo(long rootId, long trackId,
-        long lastAttemptedOrReportedTime,
-        boolean allBlockLocsAttemptedToSatisfy) {
-      super(rootId, trackId);
-      this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
-      this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy;
-    }
-
-    /**
-     * @return last attempted or reported time stamp.
-     */
-    private long getLastAttemptedOrReportedTime() {
-      return lastAttemptedOrReportedTime;
-    }
-
-    /**
-     * @return true/false. True value represents that, all the block locations
-     *         under the trackID has found matching target nodes to satisfy
-     *         storage policy. False value represents that, trackID needed
-     *         retries to satisfy the storage policy for some of the block
-     *         locations.
-     */
-    private boolean isAllBlockLocsAttemptedToSatisfy() {
-      return allBlockLocsAttemptedToSatisfy;
-    }
-
-    /**
-     * Update lastAttemptedOrReportedTime, so that the expiration time will be
-     * postponed to future.
-     */
-    private void touchLastReportedTimeStamp() {
-      this.lastAttemptedOrReportedTime = monotonicNow();
-    }
-
-  }
-
-  /**
-   * A monitor class for checking block storage movement result and long waiting
-   * items periodically.
+   * A monitor class for checking block storage movement attempt status and long
+   * waiting items periodically.
    */
-  private class BlocksStorageMovementAttemptResultMonitor implements Runnable {
+  private class BlocksStorageMovementAttemptMonitor implements Runnable {
     @Override
     public void run() {
       while (monitorRunning) {
         try {
-          blockStorageMovementResultCheck();
+          blockStorageMovementReportedItemsCheck();
           blocksStorageMovementUnReportedItemsCheck();
           Thread.sleep(minCheckTimeout);
         } catch (InterruptedException ie) {
-          LOG.info("BlocksStorageMovementAttemptResultMonitor thread "
+          LOG.info("BlocksStorageMovementAttemptMonitor thread "
               + "is interrupted.", ie);
         } catch (IOException ie) {
-          LOG.warn("BlocksStorageMovementAttemptResultMonitor thread "
+          LOG.warn("BlocksStorageMovementAttemptMonitor thread "
               + "received exception and exiting.", ie);
         }
       }
@@ -248,29 +173,21 @@ public class BlockStorageMovementAttemptedItems {
   @VisibleForTesting
   void blocksStorageMovementUnReportedItemsCheck() {
     synchronized (storageMovementAttemptedItems) {
-      Iterator<Entry<Long, AttemptedItemInfo>> iter =
-          storageMovementAttemptedItems.entrySet().iterator();
+      Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems
+          .iterator();
       long now = monotonicNow();
       while (iter.hasNext()) {
-        Entry<Long, AttemptedItemInfo> entry = iter.next();
-        AttemptedItemInfo itemInfo = entry.getValue();
+        AttemptedItemInfo itemInfo = iter.next();
         if (now > itemInfo.getLastAttemptedOrReportedTime()
             + selfRetryTimeout) {
-          Long blockCollectionID = entry.getKey();
-          synchronized (storageMovementAttemptedResults) {
-            if (!isExistInResult(blockCollectionID)) {
-              ItemInfo candidate = new ItemInfo(
-                  itemInfo.getStartId(), blockCollectionID);
-              blockStorageMovementNeeded.add(candidate);
-              iter.remove();
-              LOG.info("TrackID: {} becomes timed out and moved to needed "
-                  + "retries queue for next iteration.", blockCollectionID);
-            } else {
-              LOG.info("Blocks storage movement results for the"
-                  + " tracking id : " + blockCollectionID
-                  + " is reported from one of the co-ordinating datanode."
-                  + " So, the result will be processed soon.");
-            }
+          Long blockCollectionID = itemInfo.getTrackId();
+          synchronized (movementFinishedBlocks) {
+            ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
+                blockCollectionID);
+            blockStorageMovementNeeded.add(candidate);
+            iter.remove();
+            LOG.info("TrackID: {} becomes timed out and moved to needed "
+                + "retries queue for next iteration.", blockCollectionID);
           }
         }
       }
@@ -278,118 +195,38 @@ public class BlockStorageMovementAttemptedItems {
     }
   }
 
-  private boolean isExistInResult(Long blockCollectionID) {
-    Iterator<BlocksStorageMovementResult> iter = storageMovementAttemptedResults
-        .iterator();
-    while (iter.hasNext()) {
-      BlocksStorageMovementResult storageMovementAttemptedResult = iter.next();
-      if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   @VisibleForTesting
-  void blockStorageMovementResultCheck() throws IOException {
-    synchronized (storageMovementAttemptedResults) {
-      Iterator<BlocksStorageMovementResult> resultsIter =
-          storageMovementAttemptedResults.iterator();
-      while (resultsIter.hasNext()) {
-        boolean isInprogress = false;
-        // TrackID need to be retried in the following cases:
-        // 1) All or few scheduled block(s) movement has been failed.
-        // 2) All the scheduled block(s) movement has been succeeded but there
-        // are unscheduled block(s) movement in this trackID. Say, some of
-        // the blocks in the trackID couldn't finding any matching target node
-        // for scheduling block movement in previous SPS iteration.
-        BlocksStorageMovementResult storageMovementAttemptedResult = resultsIter
-            .next();
+  void blockStorageMovementReportedItemsCheck() throws IOException {
+    synchronized (movementFinishedBlocks) {
+      Iterator<Block> finishedBlksIter = movementFinishedBlocks.iterator();
+      while (finishedBlksIter.hasNext()) {
+        Block blk = finishedBlksIter.next();
         synchronized (storageMovementAttemptedItems) {
-          Status status = storageMovementAttemptedResult.getStatus();
-          long trackId = storageMovementAttemptedResult.getTrackId();
-          AttemptedItemInfo attemptedItemInfo = storageMovementAttemptedItems
-              .get(trackId);
-          // itemInfo is null means no root for trackId, using trackId only as
-          // root and handling it in
-          // blockStorageMovementNeeded#removeIteamTrackInfo() for cleaning
-          // the xAttr
-          ItemInfo itemInfo = new ItemInfo((attemptedItemInfo != null)
-              ? attemptedItemInfo.getStartId() : trackId, trackId);
-          switch (status) {
-          case FAILURE:
-            if (attemptedItemInfo != null) {
-              blockStorageMovementNeeded.add(itemInfo);
-              LOG.warn("Blocks storage movement results for the tracking id:"
-                  + "{} is reported from co-ordinating datanode, but result"
-                  + " status is FAILURE. So, added for retry", trackId);
-            } else {
-              LOG.info("Blocks storage movement is FAILURE for the track"
-                  + " id {}. But the trackID doesn't exists in"
-                  + " storageMovementAttemptedItems list.", trackId);
-              blockStorageMovementNeeded
-                  .removeItemTrackInfo(itemInfo);
-            }
-            break;
-          case SUCCESS:
-            // ItemInfo could be null. One case is, before the blocks movements
-            // result arrives the attempted trackID became timed out and then
-            // removed the trackID from the storageMovementAttemptedItems list.
-            // TODO: Need to ensure that trackID is added to the
-            // 'blockStorageMovementNeeded' queue for retries to handle the
-            // following condition. If all the block locations under the trackID
-            // are attempted and failed to find matching target nodes to satisfy
-            // storage policy in previous SPS iteration.
-            String msg = "Blocks storage movement is SUCCESS for the track id: "
-                + trackId + " reported from co-ordinating datanode.";
-            if (attemptedItemInfo != null) {
-              if (!attemptedItemInfo.isAllBlockLocsAttemptedToSatisfy()) {
-                blockStorageMovementNeeded
-                    .add(new ItemInfo(attemptedItemInfo.getStartId(), trackId));
-                LOG.warn("{} But adding trackID back to retry queue as some of"
-                    + " the blocks couldn't find matching target nodes in"
-                    + " previous SPS iteration.", msg);
-              } else {
-                LOG.info(msg);
-                blockStorageMovementNeeded
-                    .removeItemTrackInfo(itemInfo);
-              }
-            } else {
-              LOG.info("{} But the trackID doesn't exists in "
-                  + "storageMovementAttemptedItems list", msg);
+          Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems
+              .iterator();
+          while (iterator.hasNext()) {
+            AttemptedItemInfo attemptedItemInfo = iterator.next();
+            attemptedItemInfo.getBlocks().remove(blk);
+            if (attemptedItemInfo.getBlocks().isEmpty()) {
+              // TODO: try adding this at the front of the queue, so that this
+              // element gets its chance first and can be cleaned from the
+              // queue quickly, as all movements are already done.
               blockStorageMovementNeeded
-              .removeItemTrackInfo(itemInfo);
-            }
-            break;
-          case IN_PROGRESS:
-            isInprogress = true;
-            attemptedItemInfo = storageMovementAttemptedItems
-                .get(storageMovementAttemptedResult.getTrackId());
-            if(attemptedItemInfo != null){
-              // update the attempted expiration time to next cycle.
-              attemptedItemInfo.touchLastReportedTimeStamp();
+                  .add(new ItemInfo(attemptedItemInfo.getStartId(),
+                      attemptedItemInfo.getTrackId()));
+              iterator.remove();
             }
-            break;
-          default:
-            LOG.error("Unknown status: {}", status);
-            break;
-          }
-          // Remove trackID from the attempted list if the attempt has been
-          // completed(success or failure), if any.
-          if (!isInprogress) {
-            storageMovementAttemptedItems
-                .remove(storageMovementAttemptedResult.getTrackId());
           }
         }
-        // Remove trackID from results as processed above.
-        resultsIter.remove();
+        // Remove attempted blocks from movementFinishedBlocks list.
+        finishedBlksIter.remove();
       }
     }
   }
 
   @VisibleForTesting
-  public int resultsCount() {
-    return storageMovementAttemptedResults.size();
+  public int getMovementFinishedBlocksCount() {
+    return movementFinishedBlocks.size();
   }
 
   @VisibleForTesting
@@ -398,7 +235,7 @@ public class BlockStorageMovementAttemptedItems {
   }
 
   public void clearQueues() {
-    storageMovementAttemptedResults.clear();
+    movementFinishedBlocks.clear();
     storageMovementAttemptedItems.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb4a3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java
deleted file mode 100644
index a790c13..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import java.util.List;
-
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-
-/**
- * This class represents a batch of blocks under one trackId which needs to move
- * its storage locations to satisfy the storage policy.
- */
-public class BlockStorageMovementInfosBatch {
-  private long trackID;
-  private List<BlockMovingInfo> blockMovingInfos;
-
-  /**
-   * Constructor to create the block storage movement infos batch.
-   *
-   * @param trackID
-   *          - unique identifier which will be used for tracking the given set
-   *          of blocks movement.
-   * @param blockMovingInfos
-   *          - list of block to storage infos.
-   */
-  public BlockStorageMovementInfosBatch(long trackID,
-      List<BlockMovingInfo> blockMovingInfos) {
-    this.trackID = trackID;
-    this.blockMovingInfos = blockMovingInfos;
-  }
-
-  public long getTrackID() {
-    return trackID;
-  }
-
-  public List<BlockMovingInfo> getBlockMovingInfo() {
-    return blockMovingInfos;
-  }
-
-  @Override
-  public String toString() {
-    return new StringBuilder().append("BlockStorageMovementInfosBatch(\n  ")
-        .append("TrackID: ").append(trackID).append("  BlockMovingInfos: ")
-        .append(blockMovingInfos).append(")").toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb4a3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index dfb5e27..5c2f99f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -266,7 +266,7 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
 import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
 import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
 import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -3880,7 +3880,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
       @Nonnull SlowDiskReports slowDisks,
-      BlocksStorageMovementResult[] blksMovementResults) throws IOException {
+      BlocksStorageMoveAttemptFinished blksMovementsFinished)
+          throws IOException {
     readLock();
     try {
       //get datanode commands
@@ -3901,11 +3902,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         if (!sps.isRunning()) {
           if (LOG.isDebugEnabled()) {
             LOG.debug(
-                "Storage policy satisfier is not running. So, ignoring block "
-                    + "storage movement results sent by co-ordinator datanode");
+                "Storage policy satisfier is not running. So, ignoring storage"
+                    + "  movement attempt finished block info sent by DN");
           }
         } else {
-          sps.handleBlocksStorageMovementResults(blksMovementResults);
+          sps.handleStorageMovementAttemptFinishedBlks(blksMovementsFinished);
         }
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org