Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2017/07/11 16:25:09 UTC

[23/50] [abbrv] hadoop git commit: HDFS-10954. [SPS]: Provide mechanism to send blocks movement result back to NN from coordinator DN. Contributed by Rakesh R

HDFS-10954. [SPS]: Provide mechanism to send blocks movement result back to NN from coordinator DN. Contributed by Rakesh R


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e126a7be
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e126a7be
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e126a7be

Branch: refs/heads/HDFS-10285
Commit: e126a7be8d2a3f9afbc1495adec682e738b060cb
Parents: dd1740d
Author: Rakesh Radhakrishnan <ra...@apache.org>
Authored: Thu Nov 3 09:39:14 2016 +0530
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Tue Jul 11 18:10:28 2017 +0530

----------------------------------------------------------------------
 .../DatanodeProtocolClientSideTranslatorPB.java |  9 ++-
 .../DatanodeProtocolServerSideTranslatorPB.java |  4 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 52 ++++++++++++++
 .../server/blockmanagement/BlockManager.java    |  4 ++
 .../hdfs/server/datanode/BPServiceActor.java    | 25 ++++++-
 .../datanode/StoragePolicySatisfyWorker.java    | 75 ++++++++++++++++----
 .../hdfs/server/namenode/FSNamesystem.java      | 10 ++-
 .../hdfs/server/namenode/NameNodeRpcServer.java |  7 +-
 .../server/namenode/StoragePolicySatisfier.java | 23 ++++++
 .../protocol/BlocksStorageMovementResult.java   | 64 +++++++++++++++++
 .../hdfs/server/protocol/DatanodeProtocol.java  |  5 +-
 .../src/main/proto/DatanodeProtocol.proto       | 14 ++++
 .../TestNameNodePrunesMissingStorages.java      |  3 +-
 .../datanode/InternalDataNodeTestUtils.java     |  4 +-
 .../server/datanode/TestBPOfferService.java     |  8 ++-
 .../hdfs/server/datanode/TestBlockRecovery.java |  4 +-
 .../server/datanode/TestDataNodeLifeline.java   |  7 +-
 .../TestDatanodeProtocolRetryPolicy.java        |  4 +-
 .../server/datanode/TestFsDatasetCache.java     |  4 +-
 .../TestStoragePolicySatisfyWorker.java         | 13 ++--
 .../hdfs/server/datanode/TestStorageReport.java |  4 +-
 .../server/namenode/NNThroughputBenchmark.java  |  9 +--
 .../hdfs/server/namenode/NameNodeAdapter.java   |  4 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |  5 +-
 .../namenode/TestStoragePolicySatisfier.java    | 50 +++++++++++++
 25 files changed, 368 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
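
In short: the coordinator datanode's StoragePolicySatisfyWorker now records one
BlocksStorageMovementResult per trackId in a completion handler, BPServiceActor
piggybacks the accumulated results on the next heartbeat, and FSNamesystem hands
them to the StoragePolicySatisfier. A minimal sketch of that path, using only
signatures introduced or touched by this commit (illustrative, not part of the
patch; it assumes the same package-level access to the handler that
BPServiceActor has, and the surrounding wiring is simplified):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker;
import org.apache.hadoop.hdfs.server.protocol.*;

class HeartbeatReportingSketch {
  static HeartbeatResponse sendHeartbeatWithMovementResults(
      DatanodeProtocol namenode, DatanodeRegistration reg,
      StorageReport[] reports, StoragePolicySatisfyWorker worker)
      throws IOException {
    // Snapshot the results the coordinator DN has collected so far.
    List<BlocksStorageMovementResult> pending =
        worker.getBlocksMovementsCompletionHandler().getBlksMovementResults();
    BlocksStorageMovementResult[] results =
        pending.toArray(new BlocksStorageMovementResult[pending.size()]);

    // Piggyback the results on the regular heartbeat (the new 12th argument).
    HeartbeatResponse response = namenode.sendHeartbeat(reg, reports,
        0L, 0L, 0, 0, 0, null, true,
        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, results);

    // Drop the reported results only after the RPC succeeds; if the
    // heartbeat fails, the same results are retried on the next beat.
    worker.getBlocksMovementsCompletionHandler().remove(results);
    return response;
  }
}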


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 9cc4516..9dd87d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlock
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -138,7 +139,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks) throws IOException {
+      @Nonnull SlowDiskReports slowDisks,
+      BlocksStorageMovementResult[] blksMovementResults) throws IOException {
     HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
         .setRegistration(PBHelper.convert(registration))
         .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@@ -161,6 +163,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements
     if (slowDisks.haveSlowDisks()) {
       builder.addAllSlowDisks(PBHelper.convertSlowDiskInfo(slowDisks));
     }
+
+    // Adding blocks movement results to the heartbeat request.
+    builder.addAllBlksMovementResults(
+        PBHelper.convertBlksMovResults(blksMovementResults));
+
     HeartbeatResponseProto resp;
     try {
       resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index 5cba284..40458ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -122,7 +122,9 @@ public class DatanodeProtocolServerSideTranslatorPB implements
           request.getXceiverCount(), request.getFailedVolumes(),
           volumeFailureSummary, request.getRequestFullBlockReportLease(),
           PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
-          PBHelper.convertSlowDiskInfo(request.getSlowDisksList()));
+          PBHelper.convertSlowDiskInfo(request.getSlowDisksList()),
+          PBHelper.convertBlksMovResults(
+              request.getBlksMovementResultsList()));
     } catch (IOException e) {
       throw new ServiceException(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 83f3454..156c9c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailur
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMovementResultProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@@ -98,6 +99,8 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStr
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@@ -952,6 +955,55 @@ public class PBHelper {
     return SlowDiskReports.create(slowDisksMap);
   }
 
+  public static BlocksStorageMovementResult[] convertBlksMovResults(
+      List<BlocksStorageMovementResultProto> protos) {
+    BlocksStorageMovementResult[] results =
+        new BlocksStorageMovementResult[protos.size()];
+    for (int i = 0; i < protos.size(); i++) {
+      BlocksStorageMovementResultProto resultProto = protos.get(i);
+      BlocksStorageMovementResult.Status status;
+      switch (resultProto.getStatus()) {
+      case SUCCESS:
+        status = Status.SUCCESS;
+        break;
+      case FAILURE:
+        status = Status.FAILURE;
+        break;
+      default:
+        throw new AssertionError("Unknown status: " + resultProto.getStatus());
+      }
+      results[i] = new BlocksStorageMovementResult(resultProto.getTrackID(),
+          status);
+    }
+    return results;
+  }
+
+  public static List<BlocksStorageMovementResultProto> convertBlksMovResults(
+      BlocksStorageMovementResult[] blocksMovementResults) {
+    List<BlocksStorageMovementResultProto> blocksMovementResultsProto =
+        new ArrayList<>();
+    BlocksStorageMovementResultProto.Builder builder =
+        BlocksStorageMovementResultProto.newBuilder();
+    for (int i = 0; i < blocksMovementResults.length; i++) {
+      BlocksStorageMovementResult report = blocksMovementResults[i];
+      builder.setTrackID(report.getTrackId());
+      BlocksStorageMovementResultProto.Status status;
+      switch (report.getStatus()) {
+      case SUCCESS:
+        status = BlocksStorageMovementResultProto.Status.SUCCESS;
+        break;
+      case FAILURE:
+        status = BlocksStorageMovementResultProto.Status.FAILURE;
+        break;
+      default:
+        throw new AssertionError("Unknown status: " + report.getStatus());
+      }
+      builder.setStatus(status);
+      blocksMovementResultsProto.add(builder.build());
+    }
+    return blocksMovementResultsProto;
+  }
+
   public static JournalInfo convert(JournalInfoProto info) {
     int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
     int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;
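
A quick way to sanity-check the two converters above is a round trip: trackID
and status should survive both directions. A minimal sketch (illustrative, not
part of the patch; run with -ea so the assertions fire):

import java.util.List;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMovementResultProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;

public class BlksMovResultsRoundTrip {
  public static void main(String[] args) {
    BlocksStorageMovementResult[] in = {
        new BlocksStorageMovementResult(1L, Status.SUCCESS),
        new BlocksStorageMovementResult(2L, Status.FAILURE)};

    // Java -> protobuf (datanode/heartbeat-request side) ...
    List<BlocksStorageMovementResultProto> protos =
        PBHelper.convertBlksMovResults(in);
    // ... and protobuf -> Java (namenode side).
    BlocksStorageMovementResult[] out = PBHelper.convertBlksMovResults(protos);

    assert out[0].getTrackId() == 1L && out[0].getStatus() == Status.SUCCESS;
    assert out[1].getTrackId() == 2L && out[1].getStatus() == Status.FAILURE;
  }
}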

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index ed416b2..3fee9b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -4833,4 +4833,8 @@ public class BlockManager implements BlockStatsMXBean {
   public void satisfyStoragePolicy(long id) {
     storageMovementNeeded.add(id);
   }
+
+  public StoragePolicySatisfier getStoragePolicySatisfier() {
+    return sps;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index a94d2df..0f93fb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -511,6 +512,10 @@ class BPServiceActor implements Runnable {
         outliersReportDue && dn.getDiskMetrics() != null ?
             SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
             SlowDiskReports.EMPTY_REPORT;
+
+    BlocksStorageMovementResult[] blksMovementResults =
+        getBlocksMovementResults();
+
     HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
         reports,
         dn.getFSDataset().getCacheCapacity(),
@@ -521,15 +526,33 @@ class BPServiceActor implements Runnable {
         volumeFailureSummary,
         requestBlockReportLease,
         slowPeers,
-        slowDisks);
+        slowDisks,
+        blksMovementResults);
 
     if (outliersReportDue) {
       // If the report was due and successfully sent, schedule the next one.
       scheduler.scheduleNextOutlierReport();
     }
+
+    // Remove the blocks movement results after they have been successfully
+    // transferred to the namenode.
+    dn.getStoragePolicySatisfyWorker().getBlocksMovementsCompletionHandler()
+        .remove(blksMovementResults);
+
     return response;
   }
 
+  private BlocksStorageMovementResult[] getBlocksMovementResults() {
+    List<BlocksStorageMovementResult> trackIdVsMovementStatus = dn
+        .getStoragePolicySatisfyWorker().getBlocksMovementsCompletionHandler()
+        .getBlksMovementResults();
+    BlocksStorageMovementResult[] blksMovementResult =
+        new BlocksStorageMovementResult[trackIdVsMovementStatus.size()];
+    trackIdVsMovementStatus.toArray(blksMovementResult);
+
+    return blksMovementResult;
+  }
+
   @VisibleForTesting
   void sendLifelineForTests() throws IOException {
     lifelineSender.sendLifeline();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 604fb4a..1bd851e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -29,6 +29,7 @@ import java.io.OutputStream;
 import java.net.Socket;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
@@ -218,7 +220,8 @@ public class StoragePolicySatisfyWorker {
         OutputStream unbufOut = sock.getOutputStream();
         InputStream unbufIn = sock.getInputStream();
         Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
-            extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
+            extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
+            new StorageType[]{targetStorageType}, new String[0]);
 
         DataEncryptionKeyFactory keyFactory = datanode
             .getDataEncryptionKeyFactoryForBlock(extendedBlock);
@@ -257,7 +260,7 @@ public class StoragePolicySatisfyWorker {
         Token<BlockTokenIdentifier> accessToken, DatanodeInfo srcDn,
         StorageType destinStorageType) throws IOException {
       new Sender(out).replaceBlock(eb, destinStorageType, accessToken,
-          srcDn.getDatanodeUuid(), srcDn);
+          srcDn.getDatanodeUuid(), srcDn, null);
     }
 
     /** Receive a reportedBlock copy response from the input stream. */
@@ -276,7 +279,7 @@ public class StoragePolicySatisfyWorker {
   /**
    * Block movement status code.
    */
-  enum BlockMovementStatus {
+  public static enum BlockMovementStatus {
     /** Success. */
     DN_BLK_STORAGE_MOVEMENT_SUCCESS(0),
     /**
@@ -343,26 +346,72 @@ public class StoragePolicySatisfyWorker {
 
   /**
    * Blocks movements completion handler, which is used to collect details of
-   * the completed list of block movements and notify the namenode about the
-   * success or failures.
+   * the completed list of block movements; their status (success or failure)
+   * is sent to the namenode via heartbeat.
    */
   static class BlocksMovementsCompletionHandler {
-    private final List<BlockMovementResult> completedBlocks = new ArrayList<>();
+    private final List<BlocksStorageMovementResult> trackIdVsMovementStatus =
+        new ArrayList<>();
 
     /**
-     * Collect all the block movement results and notify namenode.
+     * Collect all the block movement results. They are later sent to the
+     * namenode via heartbeat.
      *
      * @param results
      *          result of all the block movements per trackId
      */
-    void handle(List<BlockMovementResult> results) {
-      completedBlocks.addAll(results);
-      // TODO: notify namenode about the success/failures.
+    void handle(List<BlockMovementResult> resultsPerTrackId) {
+      BlocksStorageMovementResult.Status status =
+          BlocksStorageMovementResult.Status.SUCCESS;
+      long trackId = -1;
+      for (BlockMovementResult blockMovementResult : resultsPerTrackId) {
+        trackId = blockMovementResult.getTrackId();
+        if (blockMovementResult.status ==
+            BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE) {
+          status = BlocksStorageMovementResult.Status.FAILURE;
+          // If any of the block movements failed, mark the result as failure
+          // so that the namenode can decide to retry the blocks associated
+          // with the given trackId.
+          break;
+        }
+      }
+
+      // Add to the tracking results list. It is later sent to the namenode
+      // via the datanode heartbeat.
+      synchronized (trackIdVsMovementStatus) {
+        trackIdVsMovementStatus.add(
+            new BlocksStorageMovementResult(trackId, status));
+      }
+    }
+
+    /**
+     * @return unmodifiable list of blocks storage movement results.
+     */
+    List<BlocksStorageMovementResult> getBlksMovementResults() {
+      synchronized (trackIdVsMovementStatus) {
+        if (trackIdVsMovementStatus.size() <= 0) {
+          return new ArrayList<>();
+        }
+        List<BlocksStorageMovementResult> results = Collections
+            .unmodifiableList(trackIdVsMovementStatus);
+        return results;
+      }
     }
 
-    @VisibleForTesting
-    List<BlockMovementResult> getCompletedBlocks() {
-      return completedBlocks;
+    /**
+     * Remove the blocks storage movement results.
+     *
+     * @param results
+     *          set of blocks storage movement results
+     */
+    void remove(BlocksStorageMovementResult[] results) {
+      if (results != null) {
+        synchronized (trackIdVsMovementStatus) {
+          for (BlocksStorageMovementResult blocksMovementResult : results) {
+            trackIdVsMovementStatus.remove(blocksMovementResult);
+          }
+        }
+      }
     }
   }
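
The aggregation rule in handle() deserves emphasis: a single failed block
movement marks the whole trackId as FAILURE, so the namenode can re-schedule
every block of that track. A hypothetical calling sequence (illustrative only;
handler is the worker's BlocksMovementsCompletionHandler, resultsPerTrackId
stands for the per-track List<BlockMovementResult> produced by the mover
threads, and the same package-level access that BPServiceActor has is assumed):

// One call per completed trackId; a mixed success/failure list collapses
// into a single FAILURE result for that trackId.
handler.handle(resultsPerTrackId);

// BPServiceActor later snapshots the pending results for the heartbeat ...
List<BlocksStorageMovementResult> pending = handler.getBlksMovementResults();

// ... and clears them only after the heartbeat RPC has succeeded, so an
// unreachable namenode simply sees the same results on the next beat.
handler.remove(pending.toArray(new BlocksStorageMovementResult[pending.size()]));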
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 9872cd7..69dbaf4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -258,6 +258,7 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
 import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
 import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
 import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -3721,7 +3722,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks) throws IOException {
+      @Nonnull SlowDiskReports slowDisks,
+      BlocksStorageMovementResult[] blksMovementResults) throws IOException {
     readLock();
     try {
       //get datanode commands
@@ -3735,6 +3737,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if (requestFullBlockReportLease) {
         blockReportLeaseId =  blockManager.requestBlockReportLeaseId(nodeReg);
       }
+
+      // TODO: Handle blocks movement results sent by the coordinator datanode.
+      // This has to be revisited as part of HDFS-11029.
+      blockManager.getStoragePolicySatisfier()
+          .handleBlocksStorageMovementResults(blksMovementResults);
+
       //create ha status
       final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
           haContext.getState().getServiceState(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 39d93df..0028d65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -147,6 +147,7 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -1450,13 +1451,15 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       int failedVolumes, VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks) throws IOException {
+      @Nonnull SlowDiskReports slowDisks,
+      BlocksStorageMovementResult[] blkMovementStatus) throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     return namesystem.handleHeartbeat(nodeReg, report,
         dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
         failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
-        slowPeers, slowDisks);
+        slowPeers, slowDisks,
+        blkMovementStatus);
   }
 
   @Override // DatanodeProtocol

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index b5aed37..fbe686a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -39,11 +39,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Setting storagePolicy on a file after the file write will only update the new
  * storage policy type in Namespace, but physical block storage movement will
@@ -394,4 +397,24 @@ public class StoragePolicySatisfier implements Runnable {
       return typeNodeMap.get(type);
     }
   }
+
+  // TODO: Temporarily keeping the results for assertion. This has to be
+  // revisited as part of HDFS-11029.
+  @VisibleForTesting
+  List<BlocksStorageMovementResult> results = new ArrayList<>();
+
+  /**
+   * Receives the movement results of a collection of blocks associated with
+   * a trackId.
+   *
+   * @param blksMovementResults
+   *          movement status of the set of blocks associated with a trackId.
+   */
+  void handleBlocksStorageMovementResults(
+      BlocksStorageMovementResult[] blksMovementResults) {
+    if (blksMovementResults.length <= 0) {
+      return;
+    }
+    results.addAll(Arrays.asList(blksMovementResults));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
new file mode 100644
index 0000000..1afba34
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+/**
+ * This class represents the movement status of a set of blocks associated
+ * with a trackId.
+ */
+public class BlocksStorageMovementResult {
+
+  private final long trackId;
+  private final Status status;
+
+  /**
+   * SUCCESS - All the blocks associated with the trackId have been moved
+   * successfully, or the maximum possible movements have been done.
+   *
+   * <p>
+   * FAILURE - One or more of the trackId's block movements failed and the
+   * failed movements need to be retried. For example, a selected target node
+   * is no longer running or has no space; retrying with a newly selected
+   * target node might succeed.
+   */
+  public static enum Status {
+    SUCCESS, FAILURE;
+  }
+
+  /**
+   * BlocksStorageMovementResult constructor.
+   *
+   * @param trackId
+   *          tracking identifier
+   * @param status
+   *          block movement status
+   */
+  public BlocksStorageMovementResult(long trackId, Status status) {
+    this.trackId = trackId;
+    this.status = status;
+  }
+
+  public long getTrackId() {
+    return trackId;
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+}
\ No newline at end of file
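
BlocksStorageMovementResult itself is just an immutable (trackId, status) pair.
A trivial namenode-side interpretation sketch (illustrative only; real retry
handling is deferred to HDFS-11029):

import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;

public class ResultInterpretationSketch {
  public static void main(String[] args) {
    BlocksStorageMovementResult result =
        new BlocksStorageMovementResult(42L, Status.FAILURE);
    if (result.getStatus() == Status.FAILURE) {
      // FAILURE asks the namenode to retry trackId 42, e.g. by choosing
      // new target nodes for the failed block movements.
      System.out.println("retry trackId " + result.getTrackId());
    }
  }
}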

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 283f367..858f59b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -111,6 +111,8 @@ public interface DatanodeProtocol {
    * @param slowPeers Details of peer DataNodes that were detected as being
    *                  slow to respond to packet writes. Empty report if no
    *                  slow peers were detected by the DataNode.
+   * @param blksMovementResults array of movement status of a set of blocks
+   *                            associated with a trackId.
    * @throws IOException on error
    */
   @Idempotent
@@ -124,7 +126,8 @@ public interface DatanodeProtocol {
                                        VolumeFailureSummary volumeFailureSummary,
                                        boolean requestFullBlockReportLease,
                                        @Nonnull SlowPeerReports slowPeers,
-                                       @Nonnull SlowDiskReports slowDisks)
+                                       @Nonnull SlowDiskReports slowDisks,
+                                       BlocksStorageMovementResult[] blksMovementResults)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 8e19809..77b0f86 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -177,6 +177,18 @@ message BlockStorageMovementProto {
 }
 
 /**
+ * Movement status of the set of blocks associated with a trackId.
+ */
+message BlocksStorageMovementResultProto {
+  enum Status {
+    SUCCESS = 1; // block movement succeeded
+    FAILURE = 2; // block movement failed and needs to retry
+  }
+  required uint64 trackID = 1;
+  required Status status = 2;
+}
+
+/**
  * registration - Information of the datanode registering with the namenode
  */
 message RegisterDatanodeRequestProto {
@@ -219,6 +231,7 @@ message VolumeFailureSummaryProto {
  * volumeFailureSummary - info about volume failures
  * slowPeers - info about peer DataNodes that are suspected to be slow.
  * slowDisks - info about DataNode disks that are suspected to be slow.
+ * blksMovementResults - status of the scheduled blocks movements
  */
 message HeartbeatRequestProto {
   required DatanodeRegistrationProto registration = 1; // Datanode info
@@ -232,6 +245,7 @@ message HeartbeatRequestProto {
   optional bool requestFullBlockReportLease = 9 [ default = false ];
   repeated SlowPeerReportProto slowPeers = 10;
   repeated SlowDiskReportProto slowDisks = 11;
+  repeated BlocksStorageMovementResultProto blksMovementResults = 12;
 }
 
 /**
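
On the wire, each result travels as one entry of the repeated
blksMovementResults field (id 12) of HeartbeatRequestProto. Building a single
entry with the generated protobuf API (a sketch; the accessor names follow the
message definition above and their use in PBHelper):

import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMovementResultProto;

public class ProtoBuildSketch {
  public static void main(String[] args) {
    BlocksStorageMovementResultProto proto =
        BlocksStorageMovementResultProto.newBuilder()
            .setTrackID(42L)  // required trackID field
            .setStatus(BlocksStorageMovementResultProto.Status.FAILURE)
            .build();
    System.out.println(proto);
  }
}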

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index 948a8fb..2d58732 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -116,7 +117,7 @@ public class TestNameNodePrunesMissingStorages {
       cluster.stopDataNode(0);
       cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
           0, null, true, SlowPeerReports.EMPTY_REPORT,
-          SlowDiskReports.EMPTY_REPORT);
+          SlowDiskReports.EMPTY_REPORT, new BlocksStorageMovementResult[0]);
 
       // Check that the missing storage was pruned.
       assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
index 876a854..33cf391 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -140,7 +141,8 @@ public class InternalDataNodeTestUtils {
             Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
             Mockito.anyBoolean(),
             Mockito.any(SlowPeerReports.class),
-            Mockito.any(SlowDiskReports.class))).thenReturn(
+            Mockito.any(SlowDiskReports.class),
+            Mockito.any(BlocksStorageMovementResult[].class))).thenReturn(
         new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
             HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
             .nextLong() | 1L));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index aa47eeb..f8c8557 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -122,6 +123,8 @@ public class TestBPOfferService {
     Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
         .when(mockDn).getMetrics();
+    Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn)
+        .getStoragePolicySatisfyWorker();
 
     // Set up a simulated dataset with our fake BP
     mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
@@ -156,7 +159,8 @@ public class TestBPOfferService {
           Mockito.any(VolumeFailureSummary.class),
           Mockito.anyBoolean(),
           Mockito.any(SlowPeerReports.class),
-          Mockito.any(SlowDiskReports.class));
+          Mockito.any(SlowDiskReports.class),
+          Mockito.any(BlocksStorageMovementResult[].class));
     mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
     datanodeCommands[nnIdx] = new DatanodeCommand[0];
     return mock;
@@ -346,6 +350,8 @@ public class TestBPOfferService {
     Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
       when(mockDn).getMetrics();
+    Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn)
+        .getStoragePolicySatisfyWorker();
     final AtomicInteger count = new AtomicInteger();
     Mockito.doAnswer(new Answer<Void>() {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 311d5a6..02e8dc7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -221,7 +222,8 @@ public class TestBlockRecovery {
             Mockito.any(VolumeFailureSummary.class),
             Mockito.anyBoolean(),
             Mockito.any(SlowPeerReports.class),
-            Mockito.any(SlowDiskReports.class)))
+            Mockito.any(SlowDiskReports.class),
+            Mockito.any(BlocksStorageMovementResult[].class)))
         .thenReturn(new HeartbeatResponse(
             new DatanodeCommand[0],
             new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
index 28427bc..b15b530 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -172,7 +173,8 @@ public class TestDataNodeLifeline {
             any(VolumeFailureSummary.class),
             anyBoolean(),
             any(SlowPeerReports.class),
-            any(SlowDiskReports.class));
+            any(SlowDiskReports.class),
+            any(BlocksStorageMovementResult[].class));
 
     // Intercept lifeline to trigger latch count-down on each call.
     doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@@ -237,7 +239,8 @@ public class TestDataNodeLifeline {
             any(VolumeFailureSummary.class),
             anyBoolean(),
             any(SlowPeerReports.class),
-            any(SlowDiskReports.class));
+            any(SlowDiskReports.class),
+            any(BlocksStorageMovementResult[].class));
 
     // While waiting on the latch for the expected number of heartbeat messages,
     // poll DataNode tracking information.  We expect that the DataNode always

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index bb1d9ef..d7ac3f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -222,7 +223,8 @@ public class TestDatanodeProtocolRetryPolicy {
            Mockito.any(VolumeFailureSummary.class),
            Mockito.anyBoolean(),
            Mockito.any(SlowPeerReports.class),
-           Mockito.any(SlowDiskReports.class));
+           Mockito.any(SlowDiskReports.class),
+           Mockito.any(BlocksStorageMovementResult[].class));
 
     dn = new DataNode(conf, locations, null, null) {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index 2dbd5b9..b9f21a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.Page
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -208,7 +209,8 @@ public class TestFsDatasetCache {
           (StorageReport[]) any(), anyLong(), anyLong(),
           anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
           anyBoolean(), any(SlowPeerReports.class),
-          any(SlowDiskReports.class));
+          any(SlowDiskReports.class),
+          (BlocksStorageMovementResult[]) any());
     } finally {
       lock.writeLock().unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index ea3eec3..1eb44e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -34,10 +34,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
@@ -191,12 +190,12 @@ public class TestStoragePolicySatisfyWorker {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        List<BlockMovementResult> completedBlocks = worker
-            .getBlocksMovementsCompletionHandler().getCompletedBlocks();
+        List<BlocksStorageMovementResult> completedBlocks = worker
+            .getBlocksMovementsCompletionHandler().getBlksMovementResults();
         int failedCount = 0;
-        for (BlockMovementResult blockMovementResult : completedBlocks) {
-          if (BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE ==
-              blockMovementResult.getStatus()) {
+        for (BlocksStorageMovementResult blkMovementResult : completedBlocks) {
+          if (blkMovementResult.getStatus() ==
+              BlocksStorageMovementResult.Status.FAILURE) {
             failedCount++;
           }
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index 5f62ddb..df120ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -110,7 +111,8 @@ public class TestStorageReport {
         anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
         Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
         Mockito.any(SlowPeerReports.class),
-        Mockito.any(SlowDiskReports.class));
+        Mockito.any(SlowDiskReports.class),
+        Mockito.any(BlocksStorageMovementResult[].class));
 
     StorageReport[] reports = captor.getValue();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index 3a3c471..1e016f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -956,8 +957,8 @@ public class NNThroughputBenchmark implements Tool {
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
           0L, 0L, 0, 0, 0, null, true,
-          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
-          .getCommands();
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
+          new BlocksStorageMovementResult[0]).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1007,8 +1008,8 @@ public class NNThroughputBenchmark implements Tool {
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null, true,
-          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
-          .getCommands();
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
+          new BlocksStorageMovementResult[0]).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 242e8f5..ba29c82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -126,7 +127,8 @@ public class NameNodeAdapter {
     return namesystem.handleHeartbeat(nodeReg,
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
         dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
-        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
+        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
+        new BlocksStorageMovementResult[0]);
   }
 
   public static boolean setReplication(final FSNamesystem ns,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 6df8fcf..c773fde 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -135,8 +136,8 @@ public class TestDeadDatanode {
         false, 0, 0, 0, 0, 0) };
     DatanodeCommand[] cmd =
         dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
-            SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
-            .getCommands();
+            SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
+            new BlocksStorageMovementResult[0]).getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e126a7be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 37664b5..cbfdfc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -146,6 +148,54 @@ public class TestStoragePolicySatisfier {
     }
   }
 
+  /**
+   * Tests to verify that block storage movement results are propagated
+   * to the Namenode via the datanode heartbeat.
+   */
+  @Test(timeout = 300000)
+  public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
+    try {
+      // Change policy to ONE_SSD
+      distributedFS.setStoragePolicy(new Path(file), "ONE_SSD");
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode(file);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
+
+      // Make sure SSD-based datanodes are added to the cluster.
+      startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
+          storagesPerDatanode, capacity, hdfsCluster);
+      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      hdfsCluster.triggerHeartbeats();
+
+      // Wait till the block is moved to SSD areas
+      waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000);
+      waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000);
+
+      // TODO: Temporarily using the results from StoragePolicySatisfier class.
+      // This has to be revisited as part of HDFS-11029.
+      waitForBlocksMovementResult(1, 30000);
+    } finally {
+      hdfsCluster.shutdown();
+    }
+  }
+
+  private void waitForBlocksMovementResult(int expectedResultsCount,
+      int timeout) throws TimeoutException, InterruptedException {
+    BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
+    final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LOG.info("expectedResultsCount={} actualResultsCount={}",
+            expectedResultsCount, sps.results.size());
+        return expectedResultsCount == sps.results.size();
+      }
+    }, 100, timeout);
+  }
+
   private void writeContent(final DistributedFileSystem dfs,
       final String fileName) throws IOException {
     // write to DISK

