You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2016/12/15 09:05:10 UTC
[42/50] [abbrv] hadoop git commit: HDFS-10801. [SPS]: Protocol buffer
changes for sending storage movement commands from NN to DN. Contributed by
Rakesh R
HDFS-10801. [SPS]: Protocol buffer changes for sending storage movement commands from NN to DN. Contributed by Rakesh R
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a706199c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a706199c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a706199c
Branch: refs/heads/HDFS-10285
Commit: a706199c79a7ac87f26cd0a0d1a2fc71921660e7
Parents: 3bb1c04
Author: Rakesh Radhakrishnan <ra...@apache.org>
Authored: Tue Oct 11 11:44:06 2016 +0530
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Thu Dec 15 14:24:16 2016 +0530
----------------------------------------------------------------------
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 90 ++++++++++++++++++++
.../blockmanagement/DatanodeDescriptor.java | 15 ----
.../server/blockmanagement/DatanodeManager.java | 13 ++-
.../hdfs/server/datanode/BPOfferService.java | 8 ++
.../hadoop/hdfs/server/datanode/DataNode.java | 7 ++
.../datanode/StoragePolicySatisfyWorker.java | 22 ++++-
.../protocol/BlockStorageMovementCommand.java | 71 ++++++++++++++-
.../hdfs/server/protocol/DatanodeProtocol.java | 1 +
.../src/main/proto/DatanodeProtocol.proto | 22 +++++
.../namenode/TestStoragePolicySatisfier.java | 86 +++++++++++--------
10 files changed, 273 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 4f6a04e..62e86ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDele
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@@ -88,6 +90,8 @@ import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.Block
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@@ -456,6 +460,8 @@ public class PBHelper {
return PBHelper.convert(proto.getBlkIdCmd());
case BlockECReconstructionCommand:
return PBHelper.convert(proto.getBlkECReconstructionCmd());
+ case BlockStorageMovementCommand:
+ return PBHelper.convert(proto.getBlkStorageMovementCmd());
default:
return null;
}
@@ -590,6 +596,11 @@ public class PBHelper {
.setBlkECReconstructionCmd(
convert((BlockECReconstructionCommand) datanodeCommand));
break;
+ case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
+ builder.setCmdType(DatanodeCommandProto.Type.BlockStorageMovementCommand)
+ .setBlkStorageMovementCmd(
+ convert((BlockStorageMovementCommand) datanodeCommand));
+ break;
case DatanodeProtocol.DNA_UNKNOWN: //Not expected
default:
builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);
@@ -962,4 +973,83 @@ public class PBHelper {
DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION,
blkECReconstructionInfos);
}
+
+ private static BlockStorageMovementCommandProto convert(
+ BlockStorageMovementCommand blkStorageMovementCmd) {
+ BlockStorageMovementCommandProto.Builder builder =
+ BlockStorageMovementCommandProto.newBuilder();
+
+ builder.setTrackID(blkStorageMovementCmd.getTrackID());
+ builder.setBlockPoolId(blkStorageMovementCmd.getBlockPoolId());
+ Collection<BlockMovingInfo> blockMovingInfos = blkStorageMovementCmd
+ .getBlockMovingTasks();
+ for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
+ builder.addBlockStorageMovement(
+ convertBlockMovingInfo(blkMovingInfo));
+ }
+ return builder.build();
+ }
+
+ private static BlockStorageMovementProto convertBlockMovingInfo(
+ BlockMovingInfo blkMovingInfo) {
+ BlockStorageMovementProto.Builder builder = BlockStorageMovementProto
+ .newBuilder();
+ builder.setBlock(PBHelperClient.convert(blkMovingInfo.getBlock()));
+
+ DatanodeInfo[] sourceDnInfos = blkMovingInfo.getSources();
+ builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
+
+ DatanodeInfo[] targetDnInfos = blkMovingInfo.getTargets();
+ builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
+
+ StorageType[] sourceStorageTypes = blkMovingInfo.getSourceStorageTypes();
+ builder.setSourceStorageTypes(convertStorageTypesProto(sourceStorageTypes));
+
+ StorageType[] targetStorageTypes = blkMovingInfo.getTargetStorageTypes();
+ builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
+
+ return builder.build();
+ }
+
+ private static DatanodeCommand convert(
+ BlockStorageMovementCommandProto blkStorageMovementCmdProto) {
+ Collection<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+ List<BlockStorageMovementProto> blkSPSatisfyList =
+ blkStorageMovementCmdProto.getBlockStorageMovementList();
+ for (BlockStorageMovementProto blkSPSatisfy : blkSPSatisfyList) {
+ blockMovingInfos.add(convertBlockMovingInfo(blkSPSatisfy));
+ }
+ return new BlockStorageMovementCommand(
+ DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
+ blkStorageMovementCmdProto.getTrackID(),
+ blkStorageMovementCmdProto.getBlockPoolId(), blockMovingInfos);
+ }
+
+ private static BlockMovingInfo convertBlockMovingInfo(
+ BlockStorageMovementProto blockStoragePolicySatisfyProto) {
+ BlockProto blockProto = blockStoragePolicySatisfyProto.getBlock();
+ Block block = PBHelperClient.convert(blockProto);
+
+ DatanodeInfosProto sourceDnInfosProto = blockStoragePolicySatisfyProto
+ .getSourceDnInfos();
+ DatanodeInfo[] sourceDnInfos = PBHelperClient.convert(sourceDnInfosProto);
+
+ DatanodeInfosProto targetDnInfosProto = blockStoragePolicySatisfyProto
+ .getTargetDnInfos();
+ DatanodeInfo[] targetDnInfos = PBHelperClient.convert(targetDnInfosProto);
+
+ StorageTypesProto srcStorageTypesProto = blockStoragePolicySatisfyProto
+ .getSourceStorageTypes();
+ StorageType[] srcStorageTypes = PBHelperClient.convertStorageTypes(
+ srcStorageTypesProto.getStorageTypesList(),
+ srcStorageTypesProto.getStorageTypesList().size());
+
+ StorageTypesProto targetStorageTypesProto = blockStoragePolicySatisfyProto
+ .getTargetStorageTypes();
+ StorageType[] targetStorageTypes = PBHelperClient.convertStorageTypes(
+ targetStorageTypesProto.getStorageTypesList(),
+ targetStorageTypesProto.getStorageTypesList().size());
+ return new BlockMovingInfo(block, sourceDnInfos, targetDnInfos,
+ srcStorageTypes, targetStorageTypes);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 294e2c0..cdf59cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -952,19 +952,4 @@ public class DatanodeDescriptor extends DatanodeInfo {
public List<BlockMovingInfo> getBlocksToMoveStorages() {
return storageMovementBlocks.poll();
}
-
- // TODO: we will remove this method once DN side handling integrated. We can
- // convert the test to check real block movements instead of this ds.
- @VisibleForTesting
- public List<BlockMovingInfo> getStorageMovementPendingItems() {
- List<BlockMovingInfo> flatList = new ArrayList<>();
- Iterator<List<BlockMovingInfo>> iterator = storageMovementBlocks
- .iterator();
- while (iterator.hasNext()) {
- List<BlockMovingInfo> next = iterator.next();
- flatList.addAll(next);
- }
- return flatList;
- }
}
-
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 49b78e1..fa1e5d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1633,10 +1633,17 @@ public class DatanodeManager {
nodeinfo.setBalancerBandwidth(0);
}
- List<BlockMovingInfo> blocksToMoveStorages = nodeinfo
+ // check pending block storage movement tasks
+ List<BlockMovingInfo> pendingBlockMovementList = nodeinfo
.getBlocksToMoveStorages();
- if (blocksToMoveStorages != null) {
- // TODO: create BlockStorageMovementCommand and add here in response.
+ if (pendingBlockMovementList != null) {
+ // TODO: trackID is used to track the block movements sent to the coordinator
+ // datanode. The tracking logic is not implemented yet, so a constant value
+ // of -1 is used temporarily.
+ long trackID = -1;
+ cmds.add(new BlockStorageMovementCommand(
+ DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, trackID, blockPoolId,
+ pendingBlockMovementList));
}
if (!cmds.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 00102eb..f5f6738 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -732,6 +732,13 @@ class BPOfferService {
((BlockECReconstructionCommand) cmd).getECTasks();
dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
break;
+ case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
+ LOG.info("DatanodeCommand action: DNA_BLOCK_STORAGE_MOVEMENT");
+ BlockStorageMovementCommand blkSPSCmd = (BlockStorageMovementCommand) cmd;
+ dn.getStoragePolicySatisfyWorker().processBlockMovingTasks(
+ blkSPSCmd.getTrackID(), blkSPSCmd.getBlockPoolId(),
+ blkSPSCmd.getBlockMovingTasks());
+ break;
default:
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
}
@@ -762,6 +769,7 @@ class BPOfferService {
case DatanodeProtocol.DNA_CACHE:
case DatanodeProtocol.DNA_UNCACHE:
case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
+ case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT:
LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
break;
default:
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 794b1ad..80c7874 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -379,6 +379,7 @@ public class DataNode extends ReconfigurableBase
private String dnUserName = null;
private BlockRecoveryWorker blockRecoveryWorker;
private ErasureCodingWorker ecWorker;
+ private StoragePolicySatisfyWorker storagePolicySatisfyWorker;
private final Tracer tracer;
private final TracerConfigurationManager tracerConfigurationManager;
private static final int NUM_CORES = Runtime.getRuntime()
@@ -1368,6 +1369,8 @@ public class DataNode extends ReconfigurableBase
ecWorker = new ErasureCodingWorker(getConf(), this);
blockRecoveryWorker = new BlockRecoveryWorker(this);
+ storagePolicySatisfyWorker =
+ new StoragePolicySatisfyWorker(getConf(), this);
blockPoolManager = new BlockPoolManager(this);
blockPoolManager.refreshNamenodes(getConf());
@@ -3455,4 +3458,8 @@ public class DataNode extends ReconfigurableBase
void setBlockScanner(BlockScanner blockScanner) {
this.blockScanner = blockScanner;
}
+
+ StoragePolicySatisfyWorker getStoragePolicySatisfyWorker() {
+ return storagePolicySatisfyWorker;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index fa408f6..2c99963 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -28,6 +28,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.Callable;
@@ -126,8 +127,25 @@ public class StoragePolicySatisfyWorker {
return moverThreadPool;
}
+ /**
+ * Handles the given set of block movement tasks. This will iterate over the
+ * block movement list and submit each block movement task asynchronously in a
+ * separate thread. Each task will move the block replica to the target node &
+ * wait for the completion.
+ *
+ * TODO: Presently this function is a blocking call; this has to be refined by
+ * moving the tracking logic to another tracker thread. The HDFS-10884 jira
+ * addresses this.
+ *
+ * @param trackID
+ * unique tracking identifier
+ * @param blockPoolID
+ * block pool ID
+ * @param blockMovingInfos
+ * list of blocks to be moved
+ */
public void processBlockMovingTasks(long trackID, String blockPoolID,
- List<BlockMovingInfo> blockMovingInfos) {
+ Collection<BlockMovingInfo> blockMovingInfos) {
Future<Void> moveCallable = null;
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
assert blkMovingInfo
@@ -143,8 +161,6 @@ public class StoragePolicySatisfyWorker {
}
}
- // TODO: Presently this function act as a blocking call, this has to be
- // refined by moving the tracking logic to another tracker thread.
for (int i = 0; i < moverTaskFutures.size(); i++) {
try {
moveCallable = moverExecutorCompletionService.take();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
index c1ab800..7c97f1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.protocol;
import java.util.Arrays;
+import java.util.Collection;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -33,12 +34,60 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
* {@link org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker}
* service. After the block movement this DataNode sends response back to the
* NameNode about the movement status.
+ *
+ * The coordinator datanode will use 'trackId' identifier to coordinate the block
+ * movement of the given set of blocks. TrackId is a unique identifier that
+ * represents a group of blocks. Namenode will generate this unique value and
+ * send it to the coordinator datanode along with the
+ * BlockStorageMovementCommand. Datanode will monitor the completion of the
+ * block movements that are grouped under this trackId and notify the
+ * Namenode about the completion status.
*/
public class BlockStorageMovementCommand extends DatanodeCommand {
- // TODO: constructor needs to be refined based on the block movement data
- // structure.
- BlockStorageMovementCommand(int action) {
+ private final long trackID;
+ private final String blockPoolId;
+ private final Collection<BlockMovingInfo> blockMovingTasks;
+
+ /**
+ * Block storage movement command constructor.
+ *
+ * @param action
+ * protocol specific action
+ * @param trackID
+ * unique identifier to monitor the given set of block movements
+ * @param blockPoolId
+ * block pool ID
+ * @param blockMovingInfos
+ * block to storage info that will be used for movement
+ */
+ public BlockStorageMovementCommand(int action, long trackID,
+ String blockPoolId, Collection<BlockMovingInfo> blockMovingInfos) {
super(action);
+ this.trackID = trackID;
+ this.blockPoolId = blockPoolId;
+ this.blockMovingTasks = blockMovingInfos;
+ }
+
+ /**
+ * Returns trackID, which will be used to monitor the block movement assigned
+ * to this coordinator datanode.
+ */
+ public long getTrackID() {
+ return trackID;
+ }
+
+ /**
+ * Returns block pool ID.
+ */
+ public String getBlockPoolId() {
+ return blockPoolId;
+ }
+
+ /**
+ * Returns the list of blocks to be moved.
+ */
+ public Collection<BlockMovingInfo> getBlockMovingTasks() {
+ return blockMovingTasks;
}
/**
@@ -47,10 +96,24 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
public static class BlockMovingInfo {
private Block blk;
private DatanodeInfo[] sourceNodes;
- private StorageType[] sourceStorageTypes;
private DatanodeInfo[] targetNodes;
+ private StorageType[] sourceStorageTypes;
private StorageType[] targetStorageTypes;
+ /**
+ * Block to storage info constructor.
+ *
+ * @param block
+ * block
+ * @param sourceDnInfos
+ * nodes that can be the sources of a block move
+ * @param targetDnInfos
+ * target datanode info
+ * @param srcStorageTypes
+ * type of source storage media
+ * @param targetStorageTypes
+ * type of destination storage media
+ */
public BlockMovingInfo(Block block,
DatanodeInfo[] sourceDnInfos, DatanodeInfo[] targetDnInfos,
StorageType[] srcStorageTypes, StorageType[] targetStorageTypes) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 8c4359f..f8b4934 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -77,6 +77,7 @@ public interface DatanodeProtocol {
final static int DNA_CACHE = 9; // cache blocks
final static int DNA_UNCACHE = 10; // uncache blocks
final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // erasure coding reconstruction command
+ final static int DNA_BLOCK_STORAGE_MOVEMENT = 12; // block storage movement command
/**
* Register Datanode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 016eae2..f4aac41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -60,6 +60,7 @@ message DatanodeCommandProto {
NullDatanodeCommand = 7;
BlockIdCommand = 8;
BlockECReconstructionCommand = 9;
+ BlockStorageMovementCommand = 10;
}
required Type cmdType = 1; // Type of the command
@@ -74,6 +75,7 @@ message DatanodeCommandProto {
optional RegisterCommandProto registerCmd = 7;
optional BlockIdCommandProto blkIdCmd = 8;
optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9;
+ optional BlockStorageMovementCommandProto blkStorageMovementCmd = 10;
}
/**
@@ -154,6 +156,26 @@ message BlockECReconstructionCommandProto {
repeated BlockECReconstructionInfoProto blockECReconstructioninfo = 1;
}
+ /**
+ * Block storage movement command
+ */
+message BlockStorageMovementCommandProto {
+ required uint64 trackID = 1;
+ required string blockPoolId = 2;
+ repeated BlockStorageMovementProto blockStorageMovement = 3;
+}
+
+/**
+ * Block storage movement information
+ */
+message BlockStorageMovementProto {
+ required BlockProto block = 1;
+ required DatanodeInfosProto sourceDnInfos = 2;
+ required DatanodeInfosProto targetDnInfos = 3;
+ required StorageTypesProto sourceStorageTypes = 4;
+ required StorageTypesProto targetStorageTypes = 5;
+}
+
/**
* registration - Information of the datanode registering with the namenode
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a706199c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index b61814d..37664b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -18,9 +18,6 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -29,8 +26,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -74,9 +70,6 @@ public class TestStoragePolicySatisfier {
try {
// Change policy to ALL_SSD
distributedFS.setStoragePolicy(new Path(file), "COLD");
- Set<DatanodeDescriptor> previousNodes =
- hdfsCluster.getNameNode().getNamesystem().getBlockManager()
- .getDatanodeManager().getDatanodes();
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
@@ -91,8 +84,8 @@ public class TestStoragePolicySatisfier {
hdfsCluster.triggerHeartbeats();
// Wait till namenode notified about the block location details
- waitExpectedStorageType(StorageType.ARCHIVE, distributedFS, previousNodes,
- 6, 30000);
+ waitExpectedStorageType(file, StorageType.ARCHIVE, distributedFS, 3,
+ 30000);
} finally {
hdfsCluster.shutdown();
}
@@ -104,9 +97,6 @@ public class TestStoragePolicySatisfier {
try {
// Change policy to ALL_SSD
distributedFS.setStoragePolicy(new Path(file), "ALL_SSD");
- Set<DatanodeDescriptor> previousNodes =
- hdfsCluster.getNameNode().getNamesystem().getBlockManager()
- .getDatanodeManager().getDatanodes();
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
@@ -123,8 +113,34 @@ public class TestStoragePolicySatisfier {
hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier Identified that block to move to SSD
// areas
- waitExpectedStorageType(StorageType.SSD, distributedFS, previousNodes, 6,
- 30000);
+ waitExpectedStorageType(file, StorageType.SSD, distributedFS, 3, 30000);
+ } finally {
+ hdfsCluster.shutdown();
+ }
+ }
+
+ @Test(timeout = 300000)
+ public void testWhenStoragePolicySetToONESSD()
+ throws Exception {
+ try {
+ // Change policy to ONE_SSD
+ distributedFS.setStoragePolicy(new Path(file), "ONE_SSD");
+ FSNamesystem namesystem = hdfsCluster.getNamesystem();
+ INode inode = namesystem.getFSDirectory().getINode(file);
+
+ StorageType[][] newtypes =
+ new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
+
+ // Making sure SSD based nodes are added to the cluster. Adding SSD based
+ // datanodes.
+ startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
+ storagesPerDatanode, capacity, hdfsCluster);
+ namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ hdfsCluster.triggerHeartbeats();
+ // Wait till StoragePolicySatisfier has identified the block to move to SSD
+ // areas
+ waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000);
+ waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000);
} finally {
hdfsCluster.shutdown();
}
@@ -174,35 +190,31 @@ public class TestStoragePolicySatisfier {
return cluster;
}
- // TODO: this assertion can be changed to end to end based assertion later
- // when DN side processing work integrated to this work.
- private void waitExpectedStorageType(final StorageType expectedStorageType,
- final DistributedFileSystem dfs,
- final Set<DatanodeDescriptor> previousNodes, int expectedArchiveCount,
- int timeout) throws Exception {
+ // Check whether the Block movement has been successfully completed to satisfy
+ // the storage policy for the given file.
+ private void waitExpectedStorageType(final String fileName,
+ final StorageType expectedStorageType, final DistributedFileSystem dfs,
+ int expectedStorageCount, int timeout) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
- Iterator<DatanodeDescriptor> iterator = previousNodes.iterator();
- int archiveCount = 0;
- while (iterator.hasNext()) {
- DatanodeDescriptor dn = iterator.next();
- List<BlockMovingInfo> pendingItemsToMove =
- dn.getStorageMovementPendingItems();
- for (BlockMovingInfo blkInfoToMoveStorage : pendingItemsToMove) {
- StorageType[] targetStorageTypes =
- blkInfoToMoveStorage.getTargetStorageTypes();
- for (StorageType storageType : targetStorageTypes) {
- if (storageType == expectedStorageType) {
- archiveCount++;
- }
- }
+ LocatedBlock lb = null;
+ try {
+ lb = dfs.getClient().getLocatedBlocks(fileName, 0).get(0);
+ } catch (IOException e) {
+ LOG.error("Exception while getting located blocks", e);
+ return false;
+ }
+ int actualStorageCount = 0;
+ for (StorageType storageType : lb.getStorageTypes()) {
+ if (expectedStorageType == storageType) {
+ actualStorageCount++;
}
}
LOG.info(
expectedStorageType + " replica count, expected={} and actual={}",
- expectedArchiveCount, archiveCount);
- return expectedArchiveCount == archiveCount;
+ expectedStorageType, actualStorageCount);
+ return expectedStorageCount == actualStorageCount;
}
}, 100, timeout);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org