Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2016/12/15 09:05:10 UTC

[42/50] [abbrv] hadoop git commit: HDFS-10801. [SPS]: Protocol buffer changes for sending storage movement commands from NN to DN. Contributed by Rakesh R



Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a706199c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a706199c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a706199c

Branch: refs/heads/HDFS-10285
Commit: a706199c79a7ac87f26cd0a0d1a2fc71921660e7
Parents: 3bb1c04
Author: Rakesh Radhakrishnan <ra...@apache.org>
Authored: Tue Oct 11 11:44:06 2016 +0530
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Thu Dec 15 14:24:16 2016 +0530

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 90 ++++++++++++++++++++
 .../blockmanagement/DatanodeDescriptor.java     | 15 ----
 .../server/blockmanagement/DatanodeManager.java | 13 ++-
 .../hdfs/server/datanode/BPOfferService.java    |  8 ++
 .../hadoop/hdfs/server/datanode/DataNode.java   |  7 ++
 .../datanode/StoragePolicySatisfyWorker.java    | 22 ++++-
 .../protocol/BlockStorageMovementCommand.java   | 71 ++++++++++++++-
 .../hdfs/server/protocol/DatanodeProtocol.java  |  1 +
 .../src/main/proto/DatanodeProtocol.proto       | 22 +++++
 .../namenode/TestStoragePolicySatisfier.java    | 86 +++++++++++--------
 10 files changed, 273 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
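
In outline, the flow this patch adds: the NameNode piggybacks a BlockStorageMovementCommand on its heartbeat response, and the receiving DataNode hands the embedded tasks to its StoragePolicySatisfyWorker. A condensed, illustrative sketch of the DataNode-side dispatch, with names taken from the BPOfferService hunk below (not a standalone program):

    // Sketch only, condensed from the BPOfferService change in this patch.
    case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
      BlockStorageMovementCommand sps = (BlockStorageMovementCommand) cmd;
      dn.getStoragePolicySatisfyWorker().processBlockMovingTasks(
          sps.getTrackID(), sps.getBlockPoolId(), sps.getBlockMovingTasks());
      break;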


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 4f6a04e..62e86ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDele
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@@ -88,6 +90,8 @@ import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.Block
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@@ -456,6 +460,8 @@ public class PBHelper {
       return PBHelper.convert(proto.getBlkIdCmd());
     case BlockECReconstructionCommand:
       return PBHelper.convert(proto.getBlkECReconstructionCmd());
+    case BlockStorageMovementCommand:
+      return PBHelper.convert(proto.getBlkStorageMovementCmd());
     default:
       return null;
     }
@@ -590,6 +596,11 @@ public class PBHelper {
           .setBlkECReconstructionCmd(
               convert((BlockECReconstructionCommand) datanodeCommand));
       break;
+    case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
+      builder.setCmdType(DatanodeCommandProto.Type.BlockStorageMovementCommand)
+          .setBlkStorageMovementCmd(
+              convert((BlockStorageMovementCommand) datanodeCommand));
+      break;
     case DatanodeProtocol.DNA_UNKNOWN: //Not expected
     default:
       builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);
@@ -962,4 +973,83 @@ public class PBHelper {
         DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION,
         blkECReconstructionInfos);
   }
+
+  private static BlockStorageMovementCommandProto convert(
+      BlockStorageMovementCommand blkStorageMovementCmd) {
+    BlockStorageMovementCommandProto.Builder builder =
+        BlockStorageMovementCommandProto.newBuilder();
+
+    builder.setTrackID(blkStorageMovementCmd.getTrackID());
+    builder.setBlockPoolId(blkStorageMovementCmd.getBlockPoolId());
+    Collection<BlockMovingInfo> blockMovingInfos = blkStorageMovementCmd
+        .getBlockMovingTasks();
+    for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
+      builder.addBlockStorageMovement(
+          convertBlockMovingInfo(blkMovingInfo));
+    }
+    return builder.build();
+  }
+
+  private static BlockStorageMovementProto convertBlockMovingInfo(
+      BlockMovingInfo blkMovingInfo) {
+    BlockStorageMovementProto.Builder builder = BlockStorageMovementProto
+        .newBuilder();
+    builder.setBlock(PBHelperClient.convert(blkMovingInfo.getBlock()));
+
+    DatanodeInfo[] sourceDnInfos = blkMovingInfo.getSources();
+    builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
+
+    DatanodeInfo[] targetDnInfos = blkMovingInfo.getTargets();
+    builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
+
+    StorageType[] sourceStorageTypes = blkMovingInfo.getSourceStorageTypes();
+    builder.setSourceStorageTypes(convertStorageTypesProto(sourceStorageTypes));
+
+    StorageType[] targetStorageTypes = blkMovingInfo.getTargetStorageTypes();
+    builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
+
+    return builder.build();
+  }
+
+  private static DatanodeCommand convert(
+      BlockStorageMovementCommandProto blkStorageMovementCmdProto) {
+    Collection<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+    List<BlockStorageMovementProto> blkSPSatisfyList =
+        blkStorageMovementCmdProto.getBlockStorageMovementList();
+    for (BlockStorageMovementProto blkSPSatisfy : blkSPSatisfyList) {
+      blockMovingInfos.add(convertBlockMovingInfo(blkSPSatisfy));
+    }
+    return new BlockStorageMovementCommand(
+        DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
+        blkStorageMovementCmdProto.getTrackID(),
+        blkStorageMovementCmdProto.getBlockPoolId(), blockMovingInfos);
+  }
+
+  private static BlockMovingInfo convertBlockMovingInfo(
+      BlockStorageMovementProto blockStoragePolicySatisfyProto) {
+    BlockProto blockProto = blockStoragePolicySatisfyProto.getBlock();
+    Block block = PBHelperClient.convert(blockProto);
+
+    DatanodeInfosProto sourceDnInfosProto = blockStoragePolicySatisfyProto
+        .getSourceDnInfos();
+    DatanodeInfo[] sourceDnInfos = PBHelperClient.convert(sourceDnInfosProto);
+
+    DatanodeInfosProto targetDnInfosProto = blockStoragePolicySatisfyProto
+        .getTargetDnInfos();
+    DatanodeInfo[] targetDnInfos = PBHelperClient.convert(targetDnInfosProto);
+
+    StorageTypesProto srcStorageTypesProto = blockStoragePolicySatisfyProto
+        .getSourceStorageTypes();
+    StorageType[] srcStorageTypes = PBHelperClient.convertStorageTypes(
+        srcStorageTypesProto.getStorageTypesList(),
+        srcStorageTypesProto.getStorageTypesList().size());
+
+    StorageTypesProto targetStorageTypesProto = blockStoragePolicySatisfyProto
+        .getTargetStorageTypes();
+    StorageType[] targetStorageTypes = PBHelperClient.convertStorageTypes(
+        targetStorageTypesProto.getStorageTypesList(),
+        targetStorageTypesProto.getStorageTypesList().size());
+    return new BlockMovingInfo(block, sourceDnInfos, targetDnInfos,
+        srcStorageTypes, targetStorageTypes);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 294e2c0..cdf59cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -952,19 +952,4 @@ public class DatanodeDescriptor extends DatanodeInfo {
   public List<BlockMovingInfo> getBlocksToMoveStorages() {
     return storageMovementBlocks.poll();
   }
-
-  // TODO: we will remove this method once DN side handling integrated. We can
-  // convert the test to check real block movements instead of this ds.
-  @VisibleForTesting
-  public List<BlockMovingInfo> getStorageMovementPendingItems() {
-    List<BlockMovingInfo> flatList = new ArrayList<>();
-    Iterator<List<BlockMovingInfo>> iterator = storageMovementBlocks
-        .iterator();
-    while (iterator.hasNext()) {
-      List<BlockMovingInfo> next = iterator.next();
-      flatList.addAll(next);
-    }
-    return flatList;
-  }
 }
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 49b78e1..fa1e5d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1633,10 +1633,17 @@ public class DatanodeManager {
       nodeinfo.setBalancerBandwidth(0);
     }
 
-    List<BlockMovingInfo> blocksToMoveStorages = nodeinfo
+    // check pending block storage movement tasks
+    List<BlockMovingInfo> pendingBlockMovementList = nodeinfo
         .getBlocksToMoveStorages();
-    if (blocksToMoveStorages != null) {
-      // TODO: create BlockStorageMovementCommand and add here in response.
+    if (pendingBlockMovementList != null) {
+      // TODO: trackID is used to track the block movements sent to the
+      // coordinator datanode. Tracking logic is yet to be implemented;
+      // temporarily, using the constant value -1.
+      long trackID = -1;
+      cmds.add(new BlockStorageMovementCommand(
+          DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, trackID, blockPoolId,
+          pendingBlockMovementList));
     }
 
     if (!cmds.isEmpty()) {
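
The TODO in this hunk ships a constant trackID of -1. Whatever the eventual tracking logic looks like, the contract only requires a value that is unique per command batch; a purely hypothetical generator, not part of this patch, could be as simple as:

    // Hypothetical illustration only; the patch itself uses the constant -1.
    // AtomicLong is java.util.concurrent.atomic.AtomicLong.
    private final AtomicLong nextTrackId = new AtomicLong();

    long generateTrackId() {
      return nextTrackId.incrementAndGet(); // unique per BlockStorageMovementCommand
    }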

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 00102eb..f5f6738 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -732,6 +732,13 @@ class BPOfferService {
           ((BlockECReconstructionCommand) cmd).getECTasks();
       dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
       break;
+    case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
+      LOG.info("DatanodeCommand action: DNA_BLOCK_STORAGE_MOVEMENT");
+      BlockStorageMovementCommand blkSPSCmd = (BlockStorageMovementCommand) cmd;
+      dn.getStoragePolicySatisfyWorker().processBlockMovingTasks(
+          blkSPSCmd.getTrackID(), blkSPSCmd.getBlockPoolId(),
+          blkSPSCmd.getBlockMovingTasks());
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }
@@ -762,6 +769,7 @@ class BPOfferService {
     case DatanodeProtocol.DNA_CACHE:
     case DatanodeProtocol.DNA_UNCACHE:
     case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
+    case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
       LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
       break;
     default:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 794b1ad..80c7874 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -379,6 +379,7 @@ public class DataNode extends ReconfigurableBase
   private String dnUserName = null;
   private BlockRecoveryWorker blockRecoveryWorker;
   private ErasureCodingWorker ecWorker;
+  private StoragePolicySatisfyWorker storagePolicySatisfyWorker;
   private final Tracer tracer;
   private final TracerConfigurationManager tracerConfigurationManager;
   private static final int NUM_CORES = Runtime.getRuntime()
@@ -1368,6 +1369,8 @@ public class DataNode extends ReconfigurableBase
 
     ecWorker = new ErasureCodingWorker(getConf(), this);
     blockRecoveryWorker = new BlockRecoveryWorker(this);
+    storagePolicySatisfyWorker =
+        new StoragePolicySatisfyWorker(getConf(), this);
 
     blockPoolManager = new BlockPoolManager(this);
     blockPoolManager.refreshNamenodes(getConf());
@@ -3455,4 +3458,8 @@ public class DataNode extends ReconfigurableBase
   void setBlockScanner(BlockScanner blockScanner) {
     this.blockScanner = blockScanner;
   }
+
+  StoragePolicySatisfyWorker getStoragePolicySatisfyWorker() {
+    return storagePolicySatisfyWorker;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index fa408f6..2c99963 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -28,6 +28,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -126,8 +127,25 @@ public class StoragePolicySatisfyWorker {
     return moverThreadPool;
   }
 
+  /**
+   * Handles the given set of block movement tasks. This will iterate over the
+   * block movement list and submit each block movement task asynchronously in
+   * a separate thread. Each task moves the block replica to the target node
+   * and waits for its completion.
+   *
+   * TODO: Presently this function is a blocking call; this has to be refined
+   * by moving the tracking logic to a separate tracker thread. The HDFS-10884
+   * jira addresses this.
+   *
+   * @param trackID
+   *          unique tracking identifier
+   * @param blockPoolID
+   *          block pool ID
+   * @param blockMovingInfos
+   *          list of blocks to be moved
+   */
   public void processBlockMovingTasks(long trackID, String blockPoolID,
-      List<BlockMovingInfo> blockMovingInfos) {
+      Collection<BlockMovingInfo> blockMovingInfos) {
     Future<Void> moveCallable = null;
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
       assert blkMovingInfo
@@ -143,8 +161,6 @@ public class StoragePolicySatisfyWorker {
       }
     }
 
-    // TODO: Presently this function act as a blocking call, this has to be
-    // refined by moving the tracking logic to another tracker thread.
     for (int i = 0; i < moverTaskFutures.size(); i++) {
       try {
         moveCallable = moverExecutorCompletionService.take();
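
The blocking behaviour flagged in the javadoc TODO above is the standard ExecutorCompletionService pattern: submit one Callable per block move, then take() the same number of futures. A self-contained, JDK-only sketch of that pattern (no HDFS types; the sleep stands in for an actual replica transfer):

    import java.util.concurrent.*;

    public class CompletionServiceSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<Void> ecs = new ExecutorCompletionService<>(pool);
        int submitted = 0;
        for (int i = 0; i < 8; i++) {       // one task per block move
          final int id = i;
          ecs.submit(() -> {                // analogous to a block moving task
            Thread.sleep(50);               // stand-in for the replica transfer
            System.out.println("moved block " + id);
            return null;
          });
          submitted++;
        }
        for (int i = 0; i < submitted; i++) {
          ecs.take().get();                 // blocks until each move completes
        }
        pool.shutdown();
      }
    }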

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
index c1ab800..7c97f1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.protocol;
 
 import java.util.Arrays;
+import java.util.Collection;
 
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -33,12 +34,60 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  * {@link org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker}
  * service. After the block movement this DataNode sends response back to the
  * NameNode about the movement status.
+ *
+ * The coordinator datanode uses the 'trackId' identifier to coordinate the
+ * movement of the given set of blocks. TrackId is a unique identifier that
+ * represents a group of blocks. The Namenode generates this unique value and
+ * sends it to the coordinator datanode along with the
+ * BlockStorageMovementCommand. The datanode monitors the completion of the
+ * block movements grouped under this trackId and notifies the Namenode about
+ * the completion status.
  */
 public class BlockStorageMovementCommand extends DatanodeCommand {
-  // TODO: constructor needs to be refined based on the block movement data
-  // structure.
-  BlockStorageMovementCommand(int action) {
+  private final long trackID;
+  private final String blockPoolId;
+  private final Collection<BlockMovingInfo> blockMovingTasks;
+
+  /**
+   * Block storage movement command constructor.
+   *
+   * @param action
+   *          protocol specific action
+   * @param trackID
+   *          unique identifier to monitor the given set of block movements
+   * @param blockPoolId
+   *          block pool ID
+   * @param blockMovingInfos
+   *          block to storage info that will be used for movement
+   */
+  public BlockStorageMovementCommand(int action, long trackID,
+      String blockPoolId, Collection<BlockMovingInfo> blockMovingInfos) {
     super(action);
+    this.trackID = trackID;
+    this.blockPoolId = blockPoolId;
+    this.blockMovingTasks = blockMovingInfos;
+  }
+
+  /**
+   * Returns trackID, which will be used to monitor the block movement assigned
+   * to this coordinator datanode.
+   */
+  public long getTrackID() {
+    return trackID;
+  }
+
+  /**
+   * Returns block pool ID.
+   */
+  public String getBlockPoolId() {
+    return blockPoolId;
+  }
+
+  /**
+   * Returns the list of blocks to be moved.
+   */
+  public Collection<BlockMovingInfo> getBlockMovingTasks() {
+    return blockMovingTasks;
   }
 
   /**
@@ -47,10 +96,24 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
   public static class BlockMovingInfo {
     private Block blk;
     private DatanodeInfo[] sourceNodes;
-    private StorageType[] sourceStorageTypes;
     private DatanodeInfo[] targetNodes;
+    private StorageType[] sourceStorageTypes;
     private StorageType[] targetStorageTypes;
 
+    /**
+     * Block to storage info constructor.
+     *
+     * @param block
+     *          block
+     * @param sourceDnInfos
+     *          nodes that can be the sources of the block move
+     * @param targetDnInfos
+     *          target datanode info
+     * @param srcStorageTypes
+     *          type of source storage media
+     * @param targetStorageTypes
+     *          type of destination storage media
+     */
     public BlockMovingInfo(Block block,
         DatanodeInfo[] sourceDnInfos, DatanodeInfo[] targetDnInfos,
         StorageType[] srcStorageTypes, StorageType[] targetStorageTypes) {
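
To make the new command's shape concrete, here is a hypothetical construction of a one-task command; every value is invented for illustration, and srcDn, targetDn and blockPoolId are assumed to be in scope (the Block arguments are blockId, numBytes and generation stamp):

    // All values hypothetical; the constructors mirror those added in this
    // patch. Arrays is java.util.Arrays.
    Block blk = new Block(1073741825L, 134217728L, 1001L);
    BlockMovingInfo task = new BlockMovingInfo(
        blk,
        new DatanodeInfo[] {srcDn},               // sources of the move
        new DatanodeInfo[] {targetDn},            // intended destinations
        new StorageType[] {StorageType.DISK},     // media the replica sits on now
        new StorageType[] {StorageType.ARCHIVE}); // media the policy wants
    BlockStorageMovementCommand cmd = new BlockStorageMovementCommand(
        DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, -1L, blockPoolId,
        Arrays.asList(task));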

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 8c4359f..f8b4934 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -77,6 +77,7 @@ public interface DatanodeProtocol {
   final static int DNA_CACHE = 9;      // cache blocks
   final static int DNA_UNCACHE = 10;   // uncache blocks
   final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // erasure coding reconstruction command
+  final static int DNA_BLOCK_STORAGE_MOVEMENT = 12; // block storage movement command
 
   /** 
    * Register Datanode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 016eae2..f4aac41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -60,6 +60,7 @@ message DatanodeCommandProto {
     NullDatanodeCommand = 7;
     BlockIdCommand = 8;
     BlockECReconstructionCommand = 9;
+    BlockStorageMovementCommand = 10;
   }
 
   required Type cmdType = 1;    // Type of the command
@@ -74,6 +75,7 @@ message DatanodeCommandProto {
   optional RegisterCommandProto registerCmd = 7;
   optional BlockIdCommandProto blkIdCmd = 8;
   optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9;
+  optional BlockStorageMovementCommandProto blkStorageMovementCmd = 10;
 }
 
 /**
@@ -154,6 +156,26 @@ message BlockECReconstructionCommandProto {
   repeated BlockECReconstructionInfoProto blockECReconstructioninfo = 1;
 }
 
+/**
+ * Block storage movement command
+ */
+message BlockStorageMovementCommandProto {
+  required uint64 trackID = 1;
+  required string blockPoolId = 2;
+  repeated BlockStorageMovementProto blockStorageMovement = 3;
+}
+
+/**
+ * Block storage movement information
+ */
+message BlockStorageMovementProto {
+  required BlockProto block = 1;
+  required DatanodeInfosProto sourceDnInfos = 2;
+  required DatanodeInfosProto targetDnInfos = 3;
+  required StorageTypesProto sourceStorageTypes = 4;
+  required StorageTypesProto targetStorageTypes = 5;
+}
+
 /**
  * registration - Information of the datanode registering with the namenode
  */
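
The command rides inside DatanodeCommandProto as the new optional field with tag 10, alongside the matching enum value. A hypothetical round trip through the generated classes, using only the standard protobuf-java surface (the repeated blockStorageMovement entries are omitted for brevity, and the values are placeholders):

    // Getter/setter names match the generated code referenced from PBHelper.
    byte[] wire = BlockStorageMovementCommandProto.newBuilder()
        .setTrackID(-1L)
        .setBlockPoolId("BP-example")
        .build()
        .toByteArray();
    BlockStorageMovementCommandProto parsed =
        BlockStorageMovementCommandProto.parseFrom(wire);
    assert parsed.getTrackID() == -1L;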

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index b61814d..37664b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -18,9 +18,6 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -29,8 +26,7 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -74,9 +70,6 @@ public class TestStoragePolicySatisfier {
     try {
      // Change policy to COLD
       distributedFS.setStoragePolicy(new Path(file), "COLD");
-      Set<DatanodeDescriptor> previousNodes =
-          hdfsCluster.getNameNode().getNamesystem().getBlockManager()
-              .getDatanodeManager().getDatanodes();
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
 
@@ -91,8 +84,8 @@ public class TestStoragePolicySatisfier {
 
       hdfsCluster.triggerHeartbeats();
      // Wait till the namenode is notified about the block location details
-      waitExpectedStorageType(StorageType.ARCHIVE, distributedFS, previousNodes,
-          6, 30000);
+      waitExpectedStorageType(file, StorageType.ARCHIVE, distributedFS, 3,
+          30000);
     } finally {
       hdfsCluster.shutdown();
     }
@@ -104,9 +97,6 @@ public class TestStoragePolicySatisfier {
     try {
       // Change policy to ALL_SSD
       distributedFS.setStoragePolicy(new Path(file), "ALL_SSD");
-      Set<DatanodeDescriptor> previousNodes =
-          hdfsCluster.getNameNode().getNamesystem().getBlockManager()
-              .getDatanodeManager().getDatanodes();
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       INode inode = namesystem.getFSDirectory().getINode(file);
 
@@ -123,8 +113,34 @@ public class TestStoragePolicySatisfier {
       hdfsCluster.triggerHeartbeats();
      // Wait till StoragePolicySatisfier identified the blocks to move to
      // SSD areas
-      waitExpectedStorageType(StorageType.SSD, distributedFS, previousNodes, 6,
-          30000);
+      waitExpectedStorageType(file, StorageType.SSD, distributedFS, 3, 30000);
+    } finally {
+      hdfsCluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 300000)
+  public void testWhenStoragePolicySetToONESSD()
+      throws Exception {
+    try {
+      // Change policy to ONE_SSD
+      distributedFS.setStoragePolicy(new Path(file), "ONE_SSD");
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode(file);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
+
+      // Making sure SSD based nodes are added to the cluster. Adding SSD
+      // based datanodes.
+      startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
+          storagesPerDatanode, capacity, hdfsCluster);
+      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      hdfsCluster.triggerHeartbeats();
+      // Wait till StoragePolicySatisfier identified the blocks to move to
+      // SSD areas
+      waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000);
+      waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000);
     } finally {
       hdfsCluster.shutdown();
     }
@@ -174,35 +190,31 @@ public class TestStoragePolicySatisfier {
     return cluster;
   }
 
-  // TODO: this assertion can be changed to end to end based assertion later
-  // when DN side processing work integrated to this work.
-  private void waitExpectedStorageType(final StorageType expectedStorageType,
-      final DistributedFileSystem dfs,
-      final Set<DatanodeDescriptor> previousNodes, int expectedArchiveCount,
-      int timeout) throws Exception {
+  // Check whether the block movement has been successfully completed to
+  // satisfy the storage policy for the given file.
+  private void waitExpectedStorageType(final String fileName,
+      final StorageType expectedStorageType, final DistributedFileSystem dfs,
+      int expectedStorageCount, int timeout) throws Exception {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        Iterator<DatanodeDescriptor> iterator = previousNodes.iterator();
-        int archiveCount = 0;
-        while (iterator.hasNext()) {
-          DatanodeDescriptor dn = iterator.next();
-          List<BlockMovingInfo> pendingItemsToMove =
-              dn.getStorageMovementPendingItems();
-          for (BlockMovingInfo blkInfoToMoveStorage : pendingItemsToMove) {
-            StorageType[] targetStorageTypes =
-                blkInfoToMoveStorage.getTargetStorageTypes();
-            for (StorageType storageType : targetStorageTypes) {
-              if (storageType == expectedStorageType) {
-                archiveCount++;
-              }
-            }
+        LocatedBlock lb = null;
+        try {
+          lb = dfs.getClient().getLocatedBlocks(fileName, 0).get(0);
+        } catch (IOException e) {
+          LOG.error("Exception while getting located blocks", e);
+          return false;
+        }
+        int actualStorageCount = 0;
+        for (StorageType storageType : lb.getStorageTypes()) {
+          if (expectedStorageType == storageType) {
+            actualStorageCount++;
           }
         }
         LOG.info(
             expectedStorageType + " replica count, expected={} and actual={}",
-            expectedArchiveCount, archiveCount);
-        return expectedArchiveCount == archiveCount;
+            expectedStorageCount, actualStorageCount);
+        return expectedStorageCount == actualStorageCount;
       }
     }, 100, timeout);
   }

