You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/17 01:58:30 UTC
[02/50] hadoop git commit: HDFS-8146. Protobuf changes for
BlockECRecoveryCommand and its fields for making it ready for transfer to DN
(Contributed by Uma Maheswara Rao G)
HDFS-8146. Protobuf changes for BlockECRecoveryCommand and its fields for making it ready for transfer to DN (Contributed by Uma Maheswara Rao G)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dc40603b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dc40603b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dc40603b
Branch: refs/heads/HDFS-7285
Commit: dc40603b259fd2eda0fdb361a8fa256c7b0d4520
Parents: 1ef6400
Author: Vinayakumar B <vi...@apache.org>
Authored: Sat Apr 18 23:20:45 2015 +0530
Committer: Jing Zhao <ji...@apache.org>
Committed: Sat May 16 15:12:48 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 +
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 137 ++++++++++++++++++-
.../blockmanagement/DatanodeDescriptor.java | 31 +----
.../server/blockmanagement/DatanodeManager.java | 4 +-
.../server/protocol/BlockECRecoveryCommand.java | 80 ++++++++++-
.../hdfs/server/protocol/DatanodeProtocol.java | 2 +-
.../src/main/proto/DatanodeProtocol.proto | 8 ++
.../src/main/proto/erasurecoding.proto | 13 ++
.../hadoop/hdfs/protocolPB/TestPBHelper.java | 88 ++++++++++++
.../namenode/TestRecoverStripedBlocks.java | 10 +-
10 files changed, 335 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc40603b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 0ed61cd..40517e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -87,3 +87,6 @@
startup. (Hui Zheng via szetszwo)
HDFS-8167. BlockManager.addBlockCollectionWithCheck should check if the block is a striped block. (Hui Zheng via zhz).
+
+ HDFS-8146. Protobuf changes for BlockECRecoveryCommand and its fields for
+ making it ready for transfer to DN (Uma Maheswara Rao G via vinayakumarb)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc40603b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 9ca73ae..c127b5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@@ -100,7 +101,7 @@ import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.AclEntryTyp
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.FsActionProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
+import org.apache.hadoop.hdfs.protocol.proto.*;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
@@ -121,6 +122,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmI
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
@@ -132,11 +134,11 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDele
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ECInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ECSchemaOptionEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ECSchemaProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ECZoneInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
@@ -184,7 +186,6 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StripedBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.InotifyProtos;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsResponseProto;
@@ -204,8 +205,10 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -3150,4 +3153,132 @@ public class PBHelper {
return new ECZoneInfo(ecZoneInfoProto.getDir(),
convertECSchema(ecZoneInfoProto.getSchema()));
}
+
+ public static BlockECRecoveryInfo convertBlockECRecoveryInfo(
+ BlockECRecoveryInfoProto blockEcRecoveryInfoProto) {
+ ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock();
+ ExtendedBlock block = convert(blockProto);
+
+ DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto
+ .getSourceDnInfos();
+ DatanodeInfo[] sourceDnInfos = convert(sourceDnInfosProto);
+
+ DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto
+ .getTargetDnInfos();
+ DatanodeInfo[] targetDnInfos = convert(targetDnInfosProto);
+
+ StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto
+ .getTargetStorageUuids();
+ String[] targetStorageUuids = convert(targetStorageUuidsProto);
+
+ StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto
+ .getTargetStorageTypes();
+ StorageType[] convertStorageTypes = convertStorageTypes(
+ targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto
+ .getStorageTypesList().size());
+
+ List<Integer> liveBlockIndicesList = blockEcRecoveryInfoProto
+ .getLiveBlockIndicesList();
+ short[] liveBlkIndices = new short[liveBlockIndicesList.size()];
+ for (int i = 0; i < liveBlockIndicesList.size(); i++) {
+ liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue();
+ }
+
+ return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
+ targetStorageUuids, convertStorageTypes, liveBlkIndices);
+ }
+
+ public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
+ BlockECRecoveryInfo blockEcRecoveryInfo) {
+ BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto
+ .newBuilder();
+ builder.setBlock(convert(blockEcRecoveryInfo.getExtendedBlock()));
+
+ DatanodeInfo[] sourceDnInfos = blockEcRecoveryInfo.getSourceDnInfos();
+ builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
+
+ DatanodeInfo[] targetDnInfos = blockEcRecoveryInfo.getTargetDnInfos();
+ builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
+
+ String[] targetStorageIDs = blockEcRecoveryInfo.getTargetStorageIDs();
+ builder.setTargetStorageUuids(convertStorageIDs(targetStorageIDs));
+
+ StorageType[] targetStorageTypes = blockEcRecoveryInfo
+ .getTargetStorageTypes();
+ builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
+
+ short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
+ builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
+
+ return builder.build();
+ }
+
+ private static List<Integer> convertIntArray(short[] liveBlockIndices) {
+ List<Integer> liveBlockIndicesList = new ArrayList<Integer>();
+ for (short s : liveBlockIndices) {
+ liveBlockIndicesList.add((int) s);
+ }
+ return liveBlockIndicesList;
+ }
+
+ private static StorageTypesProto convertStorageTypesProto(
+ StorageType[] targetStorageTypes) {
+ StorageTypesProto.Builder builder = StorageTypesProto.newBuilder();
+ for (StorageType storageType : targetStorageTypes) {
+ builder.addStorageTypes(convertStorageType(storageType));
+ }
+ return builder.build();
+ }
+
+ private static StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) {
+ StorageUuidsProto.Builder builder = StorageUuidsProto.newBuilder();
+ for (String storageUuid : targetStorageIDs) {
+ builder.addStorageUuids(storageUuid);
+ }
+ return builder.build();
+ }
+
+ private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) {
+ DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder();
+ for (DatanodeInfo datanodeInfo : dnInfos) {
+ builder.addDatanodes(convert(datanodeInfo));
+ }
+ return builder.build();
+ }
+
+ private static String[] convert(StorageUuidsProto targetStorageUuidsProto) {
+ List<String> storageUuidsList = targetStorageUuidsProto
+ .getStorageUuidsList();
+ String[] storageUuids = new String[storageUuidsList.size()];
+ for (int i = 0; i < storageUuidsList.size(); i++) {
+ storageUuids[i] = storageUuidsList.get(i);
+ }
+ return storageUuids;
+ }
+
+ public static BlockECRecoveryCommandProto convert(
+ BlockECRecoveryCommand blkECRecoveryCmd) {
+ BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto
+ .newBuilder();
+ Collection<BlockECRecoveryInfo> blockECRecoveryInfos = blkECRecoveryCmd
+ .getECTasks();
+ for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) {
+ builder
+ .addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo));
+ }
+ return builder.build();
+ }
+
+ public static BlockECRecoveryCommand convert(
+ BlockECRecoveryCommandProto blkECRecoveryCmdProto) {
+ Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<BlockECRecoveryInfo>();
+ List<BlockECRecoveryInfoProto> blockECRecoveryinfoList = blkECRecoveryCmdProto
+ .getBlockECRecoveryinfoList();
+ for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) {
+ blkECRecoveryInfos
+ .add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto));
+ }
+ return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
+ blkECRecoveryInfos);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc40603b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 7ec71a2..35cc31b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -32,8 +32,8 @@ import java.util.Set;
import java.util.Arrays;
import com.google.common.annotations.VisibleForTesting;
-
import com.google.common.collect.ImmutableList;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -99,34 +100,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
}
- /** Block and targets pair */
- @InterfaceAudience.Private
- @InterfaceStability.Evolving
- public static class BlockECRecoveryInfo {
- public final ExtendedBlock block;
- public final DatanodeDescriptor[] sources;
- public final DatanodeStorageInfo[] targets;
- public final short[] liveBlockIndices;
-
- BlockECRecoveryInfo(ExtendedBlock block, DatanodeDescriptor[] sources,
- DatanodeStorageInfo[] targets, short[] liveBlockIndices) {
- this.block = block;
- this.sources = sources;
- this.targets = targets;
- this.liveBlockIndices = liveBlockIndices;
- }
-
- @Override
- public String toString() {
- return new StringBuilder().append("BlockECRecoveryInfo(\n ").
- append("Recovering ").append(block).
- append(" From: ").append(Arrays.asList(sources)).
- append(" To: ").append(Arrays.asList(targets)).append(")\n").
- append(" Block Indices: ").append(Arrays.asList(liveBlockIndices)).
- toString();
- }
- }
-
/** A BlockTargetPair queue. */
private static class BlockQueue<E> {
private final Queue<E> blockq = new LinkedList<E>();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc40603b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index fb5e904..e6ffd77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -34,12 +34,12 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.*;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
@@ -1441,7 +1441,7 @@ public class DatanodeManager {
List<BlockECRecoveryInfo> pendingECList =
nodeinfo.getErasureCodeCommand(maxTransfers);
if (pendingECList != null) {
- cmds.add(new BlockECRecoveryCommand(DatanodeProtocol.DNA_CODEC,
+ cmds.add(new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
pendingECList));
}
//check block invalidation
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc40603b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
index f7f02fd..9a387dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
@@ -18,10 +18,15 @@
package org.apache.hadoop.hdfs.server.protocol;
import com.google.common.base.Joiner;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import java.util.Arrays;
import java.util.Collection;
/**
@@ -60,4 +65,77 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
sb.append("\n)");
return sb.toString();
}
+
+ /** Block and targets pair */
+ @InterfaceAudience.Private
+ @InterfaceStability.Evolving
+ public static class BlockECRecoveryInfo {
+ private final ExtendedBlock block;
+ private final DatanodeInfo[] sources;
+ private DatanodeInfo[] targets;
+ private String[] targetStorageIDs;
+ private StorageType[] targetStorageTypes;
+ private final short[] liveBlockIndices;
+
+ public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
+ DatanodeStorageInfo[] targetDnStorageInfo, short[] liveBlockIndices) {
+ this.block = block;
+ this.sources = sources;
+ this.targets = DatanodeStorageInfo.toDatanodeInfos(targetDnStorageInfo);
+ this.targetStorageIDs = DatanodeStorageInfo
+ .toStorageIDs(targetDnStorageInfo);
+ this.targetStorageTypes = DatanodeStorageInfo
+ .toStorageTypes(targetDnStorageInfo);
+ this.liveBlockIndices = liveBlockIndices;
+ }
+
+ public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
+ DatanodeInfo[] targets, String[] targetStorageIDs,
+ StorageType[] targetStorageTypes, short[] liveBlockIndices) {
+ this.block = block;
+ this.sources = sources;
+ this.targets = targets;
+ this.targetStorageIDs = targetStorageIDs;
+ this.targetStorageTypes = targetStorageTypes;
+ this.liveBlockIndices = liveBlockIndices;
+ }
+
+ public ExtendedBlock getExtendedBlock() {
+ return block;
+ }
+
+ public DatanodeInfo[] getSourceDnInfos() {
+ return sources;
+ }
+
+ public DatanodeInfo[] getTargetDnInfos() {
+ return targets;
+ }
+
+ public String[] getTargetStorageIDs() {
+ return targetStorageIDs;
+ }
+
+ public StorageType[] getTargetStorageTypes() {
+ return targetStorageTypes;
+ }
+
+ public short[] getLiveBlockIndices() {
+ return liveBlockIndices;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("BlockECRecoveryInfo(\n ")
+ .append("Recovering ").append(block).append(" From: ")
+ .append(Arrays.asList(sources)).append(" To: [")
+ .append(Arrays.asList(targets)).append(")\n")
+ .append(" Block Indices: ").append(Arrays.asList(liveBlockIndices))
+ .toString();
+ }
+ }
+
+ public Collection<BlockECRecoveryInfo> getECTasks() {
+ return this.ecTasks;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc40603b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index b8ac165..1411fa9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -76,7 +76,7 @@ public interface DatanodeProtocol {
final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
final static int DNA_CACHE = 9; // cache blocks
final static int DNA_UNCACHE = 10; // uncache blocks
- final static int DNA_CODEC = 11; // uncache blocks
+ final static int DNA_ERASURE_CODING_RECOVERY = 11; // erasure coding recovery command
/**
* Register Datanode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc40603b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 3083dc9..ac9ab46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -33,6 +33,7 @@ package hadoop.hdfs.datanode;
import "HAServiceProtocol.proto";
import "hdfs.proto";
+import "erasurecoding.proto";
/**
* Information to identify a datanode to a namenode
@@ -145,6 +146,13 @@ message RegisterCommandProto {
}
/**
+ * Block Erasure coding recovery command
+ */
+message BlockECRecoveryCommandProto {
+ repeated BlockECRecoveryInfoProto blockECRecoveryinfo = 1;
+}
+
+/**
* registration - Information of the datanode registering with the namenode
*/
message RegisterDatanodeRequestProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc40603b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto
index d888f71..59bd949 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/erasurecoding.proto
@@ -21,6 +21,7 @@ option java_outer_classname = "ErasureCodingProtos";
option java_generate_equals_and_hash = true;
package hadoop.hdfs;
+import "hdfs.proto";
/**
* ECSchema options entry
@@ -86,4 +87,16 @@ message GetECZoneInfoRequestProto {
message GetECZoneInfoResponseProto {
optional ECZoneInfoProto ECZoneInfo = 1;
+}
+
+/**
+ * Block erasure coding recovery info
+ */
+message BlockECRecoveryInfoProto {
+ required ExtendedBlockProto block = 1;
+ required DatanodeInfosProto sourceDnInfos = 2;
+ required DatanodeInfosProto targetDnInfos = 3;
+ required StorageUuidsProto targetStorageUuids = 4;
+ required StorageTypesProto targetStorageTypes = 5;
+ repeated uint32 liveBlockIndices = 6;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc40603b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index 4b42f4c..4ec4ea5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.permission.AclEntry;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
@@ -63,15 +66,20 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -639,4 +647,84 @@ public class TestPBHelper {
.build();
Assert.assertEquals(s, PBHelper.convert(PBHelper.convert(s)));
}
+
+ @Test
+ public void testBlockECRecoveryCommand() {
+ DatanodeInfo[] dnInfos0 = new DatanodeInfo[] {
+ DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() };
+ DatanodeStorageInfo targetDnInfos_0 = BlockManagerTestUtil
+ .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
+ new DatanodeStorage("s00"));
+ DatanodeStorageInfo targetDnInfos_1 = BlockManagerTestUtil
+ .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
+ new DatanodeStorage("s01"));
+ DatanodeStorageInfo[] targetDnInfos0 = new DatanodeStorageInfo[] {
+ targetDnInfos_0, targetDnInfos_1 };
+ short[] liveBlkIndices0 = new short[2];
+ BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo(
+ new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0,
+ liveBlkIndices0);
+ DatanodeInfo[] dnInfos1 = new DatanodeInfo[] {
+ DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() };
+ DatanodeStorageInfo targetDnInfos_2 = BlockManagerTestUtil
+ .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
+ new DatanodeStorage("s02"));
+ DatanodeStorageInfo targetDnInfos_3 = BlockManagerTestUtil
+ .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
+ new DatanodeStorage("s03"));
+ DatanodeStorageInfo[] targetDnInfos1 = new DatanodeStorageInfo[] {
+ targetDnInfos_2, targetDnInfos_3 };
+ short[] liveBlkIndices1 = new short[2];
+ BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo(
+ new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1,
+ liveBlkIndices1);
+ List<BlockECRecoveryInfo> blkRecoveryInfosList = new ArrayList<BlockECRecoveryInfo>();
+ blkRecoveryInfosList.add(blkECRecoveryInfo0);
+ blkRecoveryInfosList.add(blkECRecoveryInfo1);
+ BlockECRecoveryCommand blkECRecoveryCmd = new BlockECRecoveryCommand(
+ DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, blkRecoveryInfosList);
+ BlockECRecoveryCommandProto blkECRecoveryCmdProto = PBHelper
+ .convert(blkECRecoveryCmd);
+ blkECRecoveryCmd = PBHelper.convert(blkECRecoveryCmdProto);
+ Iterator<BlockECRecoveryInfo> iterator = blkECRecoveryCmd.getECTasks()
+ .iterator();
+ assertBlockECRecoveryInfoEquals(blkECRecoveryInfo0, iterator.next());
+ assertBlockECRecoveryInfoEquals(blkECRecoveryInfo1, iterator.next());
+ }
+
+ private void assertBlockECRecoveryInfoEquals(
+ BlockECRecoveryInfo blkECRecoveryInfo1,
+ BlockECRecoveryInfo blkECRecoveryInfo2) {
+ assertEquals(blkECRecoveryInfo1.getExtendedBlock(),
+ blkECRecoveryInfo2.getExtendedBlock());
+
+ DatanodeInfo[] sourceDnInfos1 = blkECRecoveryInfo1.getSourceDnInfos();
+ DatanodeInfo[] sourceDnInfos2 = blkECRecoveryInfo2.getSourceDnInfos();
+ assertDnInfosEqual(sourceDnInfos1, sourceDnInfos2);
+
+ DatanodeInfo[] targetDnInfos1 = blkECRecoveryInfo1.getTargetDnInfos();
+ DatanodeInfo[] targetDnInfos2 = blkECRecoveryInfo2.getTargetDnInfos();
+ assertDnInfosEqual(targetDnInfos1, targetDnInfos2);
+
+ String[] targetStorageIDs1 = blkECRecoveryInfo1.getTargetStorageIDs();
+ String[] targetStorageIDs2 = blkECRecoveryInfo2.getTargetStorageIDs();
+ assertEquals(targetStorageIDs1.length, targetStorageIDs2.length);
+ for (int i = 0; i < targetStorageIDs1.length; i++) {
+ assertEquals(targetStorageIDs1[i], targetStorageIDs2[i]);
+ }
+
+ short[] liveBlockIndices1 = blkECRecoveryInfo1.getLiveBlockIndices();
+ short[] liveBlockIndices2 = blkECRecoveryInfo2.getLiveBlockIndices();
+ for (int i = 0; i < liveBlockIndices1.length; i++) {
+ assertEquals(liveBlockIndices1[i], liveBlockIndices2[i]);
+ }
+ }
+
+ private void assertDnInfosEqual(DatanodeInfo[] dnInfos1,
+ DatanodeInfo[] dnInfos2) {
+ assertEquals(dnInfos1.length, dnInfos2.length);
+ for (int i = 0; i < dnInfos1.length; i++) {
+ compare(dnInfos1[i], dnInfos2[i]);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc40603b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
index ea18c3e..ca4fbbc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -29,9 +29,9 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -115,10 +115,10 @@ public class TestRecoverStripedBlocks {
last.getNumberOfBlocksToBeErasureCoded());
List<BlockECRecoveryInfo> recovery = last.getErasureCodeCommand(numBlocks);
for (BlockECRecoveryInfo info : recovery) {
- assertEquals(1, info.targets.length);
- assertEquals(last, info.targets[0].getDatanodeDescriptor());
- assertEquals(GROUP_SIZE - 1, info.sources.length);
- assertEquals(GROUP_SIZE - 1, info.liveBlockIndices.length);
+ assertEquals(1, info.getTargetDnInfos().length);
+ assertEquals(last, info.getTargetDnInfos()[0]);
+ assertEquals(GROUP_SIZE - 1, info.getSourceDnInfos().length);
+ assertEquals(GROUP_SIZE - 1, info.getLiveBlockIndices().length);
}
}
}