Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/30 17:42:33 UTC
[33/50] [abbrv] hadoop git commit: Merge remote-tracking branch 'apache/trunk' into HDFS-7285
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 53c6cdb,28ea866..8874c4d
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@@ -23,8 -23,8 +23,9 @@@ import java.util.concurrent.TimeUnit
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
import org.apache.hadoop.http.HttpConfig;
@@@ -171,8 -171,8 +172,10 @@@ public class DFSConfigKeys extends Comm
public static final int DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT = 3;
public static final String DFS_NAMENODE_REPLICATION_MIN_KEY = "dfs.namenode.replication.min";
public static final int DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1;
+ public static final String DFS_NAMENODE_STRIPE_MIN_KEY = "dfs.namenode.stripe.min";
+ public static final int DFS_NAMENODE_STRIPE_MIN_DEFAULT = 1;
+ public static final String DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY =
+ "dfs.namenode.safemode.replication.min";
public static final String DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY = "dfs.namenode.replication.pending.timeout-sec";
public static final int DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT = -1;
public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY = "dfs.namenode.replication.max-streams";
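
The DFSConfigKeys hunk above merges two additions: the HDFS-7285 stripe minimum and trunk's safemode replication minimum. A minimal usage sketch follows, with a hypothetical caller; the key/default constants are exactly the ones declared in the hunk, while the fallback for the safemode key (which declares no *_DEFAULT constant in this hunk) is an assumption of the sketch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Hypothetical illustration of reading the merged keys via the usual
// key/default constant pairs.
public class StripeMinSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    int stripeMin = conf.getInt(
        DFSConfigKeys.DFS_NAMENODE_STRIPE_MIN_KEY,
        DFSConfigKeys.DFS_NAMENODE_STRIPE_MIN_DEFAULT);
    // Assumed fallback: reuse dfs.namenode.replication.min when the
    // safemode-specific key is unset (the hunk declares no default for it).
    int safemodeReplMin = conf.getInt(
        DFSConfigKeys.DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY,
        conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
            DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT));
    System.out.println("stripe.min=" + stripeMin
        + " safemode.replication.min=" + safemodeReplMin);
  }
}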
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index ac927ef,f4cf4c2..5bf52c5
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@@ -84,9 -85,7 +85,8 @@@ import org.apache.hadoop.hdfs.protocol.
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
- import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
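
The new ErasureCodingPolicy import above supports the erasure-coding client API that this branch threads through ClientProtocol (see the translator diffs below). A hedged client-side sketch, assuming DistributedFileSystem exposes set/get methods mirroring the setErasureCodingPolicy/getErasureCodingPolicy calls added to the translators in this commit:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;

// Sketch only: the method names are assumed to mirror ClientProtocol's
// EC calls shown later in this commit.
public class EcPolicySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(
        URI.create("hdfs://localhost:9000"), conf);
    Path dir = new Path("/ec-data");
    dfs.mkdirs(dir);
    // Passing null matches the translators' "hasEcPolicy() ? ... : null"
    // handling and selects the system default policy.
    dfs.setErasureCodingPolicy(dir, null);
    ErasureCodingPolicy policy = dfs.getErasureCodingPolicy(dir);
    System.out.println(policy == null ? "(default)" : policy.getName());
  }
}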
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index 3217484,d93277c..1e4b899
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@@ -429,10 -422,10 +429,11 @@@ public class ClientNamenodeProtocolServ
req.getClientName(), flags);
AppendResponseProto.Builder builder = AppendResponseProto.newBuilder();
if (result.getLastBlock() != null) {
- builder.setBlock(PBHelper.convertLocatedBlock(result.getLastBlock()));
- builder.setBlock(PBHelperClient.convert(result.getLastBlock()));
++ builder.setBlock(PBHelperClient.convertLocatedBlock(
++ result.getLastBlock()));
}
if (result.getFileStatus() != null) {
- builder.setStat(PBHelper.convert(result.getFileStatus()));
+ builder.setStat(PBHelperClient.convert(result.getFileStatus()));
}
return builder.build();
} catch (IOException e) {
@@@ -505,7 -498,7 +506,7 @@@
(favor == null || favor.size() == 0) ? null : favor
.toArray(new String[favor.size()]));
return AddBlockResponseProto.newBuilder()
- .setBlock(PBHelper.convertLocatedBlock(result)).build();
- .setBlock(PBHelperClient.convert(result)).build();
++ .setBlock(PBHelperClient.convertLocatedBlock(result)).build();
} catch (IOException e) {
throw new ServiceException(e);
}
@@@ -525,11 -518,11 +526,11 @@@
new DatanodeInfoProto[existingList.size()])),
existingStorageIDsList.toArray(
new String[existingStorageIDsList.size()]),
- PBHelper.convert(excludesList.toArray(
- new DatanodeInfoProto[excludesList.size()])),
+ PBHelperClient.convert(excludesList.toArray(
+ new DatanodeInfoProto[excludesList.size()])),
req.getNumAdditionalNodes(), req.getClientName());
return GetAdditionalDatanodeResponseProto.newBuilder().setBlock(
- PBHelper.convertLocatedBlock(result))
- PBHelperClient.convert(result))
++ PBHelperClient.convertLocatedBlock(result))
.build();
} catch (IOException e) {
throw new ServiceException(e);
@@@ -555,7 -548,7 +556,7 @@@
ReportBadBlocksRequestProto req) throws ServiceException {
try {
List<LocatedBlockProto> bl = req.getBlocksList();
- server.reportBadBlocks(PBHelper.convertLocatedBlocks(
- server.reportBadBlocks(PBHelperClient.convertLocatedBlock(
++ server.reportBadBlocks(PBHelperClient.convertLocatedBlocks(
bl.toArray(new LocatedBlockProto[bl.size()])));
} catch (IOException e) {
throw new ServiceException(e);
@@@ -960,8 -953,8 +961,8 @@@
RpcController controller, UpdateBlockForPipelineRequestProto req)
throws ServiceException {
try {
- LocatedBlockProto result = PBHelper.convertLocatedBlock(
- LocatedBlockProto result = PBHelperClient.convert(server
- .updateBlockForPipeline(PBHelperClient.convert(req.getBlock()),
++ LocatedBlockProto result = PBHelperClient.convertLocatedBlock(
+ server.updateBlockForPipeline(PBHelperClient.convert(req.getBlock()),
req.getClientName()));
return UpdateBlockForPipelineResponseProto.newBuilder().setBlock(result)
.build();
@@@ -1401,20 -1394,6 +1402,20 @@@
}
@Override
+ public SetErasureCodingPolicyResponseProto setErasureCodingPolicy(
+ RpcController controller, SetErasureCodingPolicyRequestProto req)
+ throws ServiceException {
+ try {
- ErasureCodingPolicy ecPolicy = req.hasEcPolicy() ? PBHelper.convertErasureCodingPolicy(req
- .getEcPolicy()) : null;
++ ErasureCodingPolicy ecPolicy = req.hasEcPolicy() ?
++ PBHelperClient.convertErasureCodingPolicy(req.getEcPolicy()) : null;
+ server.setErasureCodingPolicy(req.getSrc(), ecPolicy);
+ return SetErasureCodingPolicyResponseProto.newBuilder().build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
public SetXAttrResponseProto setXAttr(RpcController controller,
SetXAttrRequestProto req) throws ServiceException {
try {
@@@ -1535,35 -1514,4 +1536,35 @@@
throw new ServiceException(e);
}
}
+
+ @Override
+ public GetErasureCodingPoliciesResponseProto getErasureCodingPolicies(RpcController controller,
+ GetErasureCodingPoliciesRequestProto request) throws ServiceException {
+ try {
+ ErasureCodingPolicy[] ecPolicies = server.getErasureCodingPolicies();
+ GetErasureCodingPoliciesResponseProto.Builder resBuilder = GetErasureCodingPoliciesResponseProto
+ .newBuilder();
+ for (ErasureCodingPolicy ecPolicy : ecPolicies) {
- resBuilder.addEcPolicies(PBHelper.convertErasureCodingPolicy(ecPolicy));
++ resBuilder.addEcPolicies(PBHelperClient.convertErasureCodingPolicy(ecPolicy));
+ }
+ return resBuilder.build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public GetErasureCodingPolicyResponseProto getErasureCodingPolicy(RpcController controller,
+ GetErasureCodingPolicyRequestProto request) throws ServiceException {
+ try {
+ ErasureCodingPolicy ecPolicy = server.getErasureCodingPolicy(request.getSrc());
+ GetErasureCodingPolicyResponseProto.Builder builder = GetErasureCodingPolicyResponseProto.newBuilder();
+ if (ecPolicy != null) {
- builder.setEcPolicy(PBHelper.convertErasureCodingPolicy(ecPolicy));
++ builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(ecPolicy));
+ }
+ return builder.build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
}
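
Every server-side hunk above follows the same shape: unwrap the request proto (now via PBHelperClient rather than PBHelper), delegate to the ClientProtocol implementation, wrap the result, and rethrow IOException as ServiceException. A condensed sketch of that shape with invented stand-in types, since the real generated protobuf classes are not reproduced here:

import java.io.IOException;

// Invented stand-ins for generated proto types and the NameNode-side server.
class FooRequestProto { String getSrc() { return "/f"; } }
class FooResponseProto {}
class ServiceException extends Exception {
  ServiceException(Throwable cause) { super(cause); }
}
interface FooServer { FooResponseProto foo(String src) throws IOException; }

public class ServerSideTranslatorShape {
  private final FooServer server;
  ServerSideTranslatorShape(FooServer server) { this.server = server; }

  // Convert in, delegate, convert out, IOException -> ServiceException.
  public FooResponseProto foo(FooRequestProto req) throws ServiceException {
    try {
      // Real code converts via PBHelperClient before and after delegating.
      return server.foo(req.getSrc());
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }
}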
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 8419244,f4ce46d..7b02691
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@@ -338,9 -327,9 +338,9 @@@ public class ClientNamenodeProtocolTran
.build();
try {
AppendResponseProto res = rpcProxy.append(null, req);
- LocatedBlock lastBlock = res.hasBlock() ? PBHelper
+ LocatedBlock lastBlock = res.hasBlock() ? PBHelperClient
- .convert(res.getBlock()) : null;
+ .convertLocatedBlockProto(res.getBlock()) : null;
- HdfsFileStatus stat = (res.hasStat()) ? PBHelper.convert(res.getStat())
+ HdfsFileStatus stat = (res.hasStat()) ? PBHelperClient.convert(res.getStat())
: null;
return new LastBlockWithStatus(lastBlock, stat);
} catch (ServiceException e) {
@@@ -427,8 -416,7 +427,8 @@@
req.addAllFavoredNodes(Arrays.asList(favoredNodes));
}
try {
- return PBHelper.convertLocatedBlockProto(
- return PBHelperClient.convert(rpcProxy.addBlock(null, req.build()).getBlock());
++ return PBHelperClient.convertLocatedBlockProto(
+ rpcProxy.addBlock(null, req.build()).getBlock());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@@@ -453,8 -441,8 +453,8 @@@
.setClientName(clientName)
.build();
try {
- return PBHelper.convertLocatedBlockProto(
- return PBHelperClient.convert(rpcProxy.getAdditionalDatanode(null, req)
- .getBlock());
++ return PBHelperClient.convertLocatedBlockProto(
+ rpcProxy.getAdditionalDatanode(null, req).getBlock());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@@@ -481,7 -469,7 +481,7 @@@
@Override
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
ReportBadBlocksRequestProto req = ReportBadBlocksRequestProto.newBuilder()
- .addAllBlocks(Arrays.asList(PBHelper.convertLocatedBlocks(blocks)))
- .addAllBlocks(Arrays.asList(PBHelperClient.convertLocatedBlock(blocks)))
++ .addAllBlocks(Arrays.asList(PBHelperClient.convertLocatedBlocks(blocks)))
.build();
try {
rpcProxy.reportBadBlocks(null, req);
@@@ -913,7 -901,7 +913,7 @@@
.setClientName(clientName)
.build();
try {
- return PBHelper.convertLocatedBlockProto(
- return PBHelperClient.convert(
++ return PBHelperClient.convertLocatedBlockProto(
rpcProxy.updateBlockForPipeline(null, req).getBlock());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
@@@ -1419,23 -1407,6 +1419,23 @@@
}
@Override
+ public void setErasureCodingPolicy(String src, ErasureCodingPolicy ecPolicy)
+ throws IOException {
+ final SetErasureCodingPolicyRequestProto.Builder builder =
+ SetErasureCodingPolicyRequestProto.newBuilder();
+ builder.setSrc(src);
+ if (ecPolicy != null) {
- builder.setEcPolicy(PBHelper.convertErasureCodingPolicy(ecPolicy));
++ builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(ecPolicy));
+ }
+ SetErasureCodingPolicyRequestProto req = builder.build();
+ try {
+ rpcProxy.setErasureCodingPolicy(null, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
throws IOException {
SetXAttrRequestProto req = SetXAttrRequestProto.newBuilder()
@@@ -1557,37 -1528,4 +1557,37 @@@
throw ProtobufHelper.getRemoteException(e);
}
}
+
+ @Override
+ public ErasureCodingPolicy[] getErasureCodingPolicies() throws IOException {
+ try {
+ GetErasureCodingPoliciesResponseProto response = rpcProxy
+ .getErasureCodingPolicies(null, VOID_GET_EC_POLICIES_REQUEST);
+ ErasureCodingPolicy[] ecPolicies =
+ new ErasureCodingPolicy[response.getEcPoliciesCount()];
+ int i = 0;
+ for (ErasureCodingPolicyProto ecPolicyProto : response.getEcPoliciesList()) {
- ecPolicies[i++] = PBHelper.convertErasureCodingPolicy(ecPolicyProto);
++ ecPolicies[i++] = PBHelperClient.convertErasureCodingPolicy(ecPolicyProto);
+ }
+ return ecPolicies;
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public ErasureCodingPolicy getErasureCodingPolicy(String src) throws IOException {
+ GetErasureCodingPolicyRequestProto req = GetErasureCodingPolicyRequestProto.newBuilder()
+ .setSrc(src).build();
+ try {
+ GetErasureCodingPolicyResponseProto response = rpcProxy.getErasureCodingPolicy(
+ null, req);
+ if (response.hasEcPolicy()) {
- return PBHelper.convertErasureCodingPolicy(response.getEcPolicy());
++ return PBHelperClient.convertErasureCodingPolicy(response.getEcPolicy());
+ }
+ return null;
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
}
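
The client-side translator is the mirror image of the server-side pattern: build a request proto, invoke the rpcProxy stub, convert the response with PBHelperClient, and unwrap ServiceException into the remote IOException (ProtobufHelper.getRemoteException in the real code). Another sketch with invented stand-ins:

import java.io.IOException;

// Invented stand-ins for the generated stub and exception plumbing.
class BarRequestProto {
  static BarRequestProto of(String src) { return new BarRequestProto(); }
}
class BarResponseProto {}
class RpcServiceException extends Exception {
  RpcServiceException(IOException cause) { super(cause); }
}
interface BarRpcProxy {
  BarResponseProto bar(BarRequestProto req) throws RpcServiceException;
}

public class ClientSideTranslatorShape {
  private final BarRpcProxy rpcProxy;
  ClientSideTranslatorShape(BarRpcProxy rpcProxy) { this.rpcProxy = rpcProxy; }

  // Build proto, call stub, convert result, unwrap the remote exception.
  public BarResponseProto bar(String src) throws IOException {
    try {
      // Real code converts the returned proto via PBHelperClient here.
      return rpcProxy.bar(BarRequestProto.of(src));
    } catch (RpcServiceException e) {
      // Real code: throw ProtobufHelper.getRemoteException(e);
      throw (IOException) e.getCause();
    }
  }
}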
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index f20e58a,18f89f8..194e563
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@@ -281,7 -281,7 +281,7 @@@ public class DatanodeProtocolClientSide
ReportBadBlocksRequestProto.Builder builder = ReportBadBlocksRequestProto
.newBuilder();
for (int i = 0; i < blocks.length; i++) {
- builder.addBlocks(i, PBHelper.convertLocatedBlock(blocks[i]));
- builder.addBlocks(i, PBHelperClient.convert(blocks[i]));
++ builder.addBlocks(i, PBHelperClient.convertLocatedBlock(blocks[i]));
}
ReportBadBlocksRequestProto req = builder.build();
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index 1ff80b3,94d1f0c..a1ea9a6
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@@ -259,7 -259,7 +259,7 @@@ public class DatanodeProtocolServerSide
List<LocatedBlockProto> lbps = request.getBlocksList();
LocatedBlock [] blocks = new LocatedBlock [lbps.size()];
for(int i=0; i<lbps.size(); i++) {
- blocks[i] = PBHelper.convertLocatedBlockProto(lbps.get(i));
- blocks[i] = PBHelperClient.convert(lbps.get(i));
++ blocks[i] = PBHelperClient.convertLocatedBlockProto(lbps.get(i));
}
try {
impl.reportBadBlocks(blocks);
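
Taken together, the two datanode translator hunks show the naming convention this merge settles on: PBHelperClient.convertLocatedBlock(LocatedBlock) produces the proto and PBHelperClient.convertLocatedBlockProto(LocatedBlockProto) parses it back, replacing the ambiguous PBHelper.convert(...) overloads. A round-trip sketch using only method names that appear in the hunks above:

import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;

// The conversion direction is now spelled out in the method name instead
// of being resolved by overload.
public final class LocatedBlockRoundTrip {
  static LocatedBlock roundTrip(LocatedBlock b) {
    LocatedBlockProto proto = PBHelperClient.convertLocatedBlock(b);
    return PBHelperClient.convertLocatedBlockProto(proto);
  }
  private LocatedBlockRoundTrip() {}
}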
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index ce39e15,3de4513..ece9984
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@@ -17,108 -17,23 +17,25 @@@
*/
package org.apache.hadoop.hdfs.protocolPB;
- import static com.google.common.base.Preconditions.checkNotNull;
- import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
- .EncryptionZoneProto;
- import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherSuiteProto;
- import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CryptoProtocolVersionProto;
-
- import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
- import java.util.EnumSet;
- import java.util.HashMap;
import java.util.List;
- import java.util.Map;
- import java.util.Map.Entry;
- import java.util.Set;
-
- import org.apache.hadoop.fs.CacheFlag;
- import org.apache.hadoop.fs.ContentSummary;
- import org.apache.hadoop.fs.CreateFlag;
- import org.apache.hadoop.fs.FsServerDefaults;
- import org.apache.hadoop.fs.Path;
+
+ import com.google.protobuf.ByteString;
+
import org.apache.hadoop.fs.StorageType;
- import org.apache.hadoop.fs.XAttr;
- import org.apache.hadoop.fs.XAttrSetFlag;
- import org.apache.hadoop.fs.permission.AclEntry;
- import org.apache.hadoop.fs.permission.AclEntryScope;
- import org.apache.hadoop.fs.permission.AclEntryType;
- import org.apache.hadoop.fs.permission.AclStatus;
- import org.apache.hadoop.fs.permission.FsAction;
- import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.hdfs.DFSUtilClient;
- import org.apache.hadoop.hdfs.inotify.EventBatch;
- import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
- import org.apache.hadoop.hdfs.inotify.Event;
- import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.Block;
- import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
- import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
- import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
- import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
- import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
- import org.apache.hadoop.hdfs.protocol.CachePoolStats;
- import org.apache.hadoop.crypto.CipherOption;
- import org.apache.hadoop.crypto.CipherSuite;
- import org.apache.hadoop.hdfs.protocol.ClientProtocol;
- import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
- import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
- import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
- import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
- import org.apache.hadoop.hdfs.protocol.DirectoryListing;
- import org.apache.hadoop.hdfs.protocol.EncryptionZone;
- import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
- import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
- import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
- import org.apache.hadoop.hdfs.protocol.HdfsConstants;
- import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
- import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
- import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
- import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
- import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
- import org.apache.hadoop.hdfs.protocol.LocatedBlock;
- import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
- import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
- import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
- import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
- import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
- import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
- import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
- import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
- import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto;
- import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.AclEntryScopeProto;
- import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.AclEntryTypeProto;
- import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.FsActionProto;
- import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclStatusProto;
- import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto;
- import org.apache.hadoop.hdfs.protocol.proto.*;
- import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
- import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto;
- import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
- import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveStatsProto;
- import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheFlagProto;
- import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
- import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
- import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolStatsProto;
- import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;
- import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
- import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeStorageReportProto;
- import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
- import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
- import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto;
- import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto;
- import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
- import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
+ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
@@@ -130,14 -45,8 +47,17 @@@ import org.apache.hadoop.hdfs.protocol.
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto;
++import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaOptionEntryProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
+ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
+ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
@@@ -381,73 -168,23 +182,34 @@@ public class PBHelper
si, convert(reg.getRole()));
}
- // DatanodeId
- public static DatanodeID convert(DatanodeIDProto dn) {
- return new DatanodeID(dn.getIpAddr(), dn.getHostName(), dn.getDatanodeUuid(),
- dn.getXferPort(), dn.getInfoPort(), dn.hasInfoSecurePort() ? dn
- .getInfoSecurePort() : 0, dn.getIpcPort());
- }
-
- // Arrays of DatanodeId
- public static DatanodeIDProto[] convert(DatanodeID[] did) {
- if (did == null)
- return null;
- final int len = did.length;
- DatanodeIDProto[] result = new DatanodeIDProto[len];
- for (int i = 0; i < len; ++i) {
- result[i] = PBHelperClient.convert(did[i]);
- }
- return result;
- }
-
- public static DatanodeID[] convert(DatanodeIDProto[] did) {
- if (did == null) return null;
- final int len = did.length;
- DatanodeID[] result = new DatanodeID[len];
- for (int i = 0; i < len; ++i) {
- result[i] = convert(did[i]);
- }
- return result;
- }
-
- // Block
- public static BlockProto convert(Block b) {
- return BlockProto.newBuilder().setBlockId(b.getBlockId())
- .setGenStamp(b.getGenerationStamp()).setNumBytes(b.getNumBytes())
- .build();
- }
-
- public static Block convert(BlockProto b) {
- return new Block(b.getBlockId(), b.getNumBytes(), b.getGenStamp());
- }
-
public static BlockWithLocationsProto convert(BlockWithLocations blk) {
- return BlockWithLocationsProto.newBuilder()
- .setBlock(PBHelperClient.convert(blk.getBlock()))
+ BlockWithLocationsProto.Builder builder = BlockWithLocationsProto
- .newBuilder().setBlock(convert(blk.getBlock()))
++ .newBuilder().setBlock(PBHelperClient.convert(blk.getBlock()))
.addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
.addAllStorageUuids(Arrays.asList(blk.getStorageIDs()))
- .addAllStorageTypes(PBHelperClient.convertStorageTypes(blk.getStorageTypes()))
- .build();
+ .addAllStorageTypes(PBHelperClient.convertStorageTypes(blk.getStorageTypes()));
+ if (blk instanceof StripedBlockWithLocations) {
+ StripedBlockWithLocations sblk = (StripedBlockWithLocations) blk;
- builder.setIndices(getByteString(sblk.getIndices()));
++ builder.setIndices(PBHelperClient.getByteString(sblk.getIndices()));
+ builder.setDataBlockNum(sblk.getDataBlockNum());
+ }
+ return builder.build();
}
public static BlockWithLocations convert(BlockWithLocationsProto b) {
final List<String> datanodeUuids = b.getDatanodeUuidsList();
final List<String> storageUuids = b.getStorageUuidsList();
final List<StorageTypeProto> storageTypes = b.getStorageTypesList();
- BlockWithLocations blk = new BlockWithLocations(convert(b.getBlock()),
- return new BlockWithLocations(PBHelperClient.convert(b.getBlock()),
++ BlockWithLocations blk = new BlockWithLocations(PBHelperClient.
++ convert(b.getBlock()),
datanodeUuids.toArray(new String[datanodeUuids.size()]),
storageUuids.toArray(new String[storageUuids.size()]),
- convertStorageTypes(storageTypes, storageUuids.size()));
+ PBHelperClient.convertStorageTypes(storageTypes, storageUuids.size()));
+ if (b.hasIndices()) {
+ blk = new StripedBlockWithLocations(blk, b.getIndices().toByteArray(),
+ (short) b.getDataBlockNum());
+ }
+ return blk;
}
public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {
@@@ -596,7 -333,7 +358,7 @@@
if (b == null) {
return null;
}
- LocatedBlockProto lb = PBHelper.convertLocatedBlock(b);
- LocatedBlockProto lb = PBHelperClient.convert((LocatedBlock) b);
++ LocatedBlockProto lb = PBHelperClient.convertLocatedBlock(b);
RecoveringBlockProto.Builder builder = RecoveringBlockProto.newBuilder();
builder.setBlock(lb).setNewGenStamp(b.getNewGenerationStamp());
if(b.getNewBlock() != null)
@@@ -606,206 -343,12 +368,12 @@@
public static RecoveringBlock convert(RecoveringBlockProto b) {
ExtendedBlock block = PBHelperClient.convert(b.getBlock().getB());
- DatanodeInfo[] locs = convert(b.getBlock().getLocsList());
+ DatanodeInfo[] locs = PBHelperClient.convert(b.getBlock().getLocsList());
return (b.hasTruncateBlock()) ?
- new RecoveringBlock(block, locs, PBHelper.convert(b.getTruncateBlock())) :
+ new RecoveringBlock(block, locs, PBHelperClient.convert(b.getTruncateBlock())) :
new RecoveringBlock(block, locs, b.getNewGenStamp());
}
-
- static public DatanodeInfo convert(DatanodeInfoProto di) {
- if (di == null) return null;
- return new DatanodeInfo(
- PBHelper.convert(di.getId()),
- di.hasLocation() ? di.getLocation() : null ,
- di.getCapacity(), di.getDfsUsed(), di.getRemaining(),
- di.getBlockPoolUsed(), di.getCacheCapacity(), di.getCacheUsed(),
- di.getLastUpdate(), di.getLastUpdateMonotonic(),
- di.getXceiverCount(), PBHelper.convert(di.getAdminState()));
- }
-
- static public DatanodeInfo[] convert(DatanodeInfoProto di[]) {
- if (di == null) return null;
- DatanodeInfo[] result = new DatanodeInfo[di.length];
- for (int i = 0; i < di.length; i++) {
- result[i] = convert(di[i]);
- }
- return result;
- }
-
- public static DatanodeInfo[] convert(List<DatanodeInfoProto> list) {
- DatanodeInfo[] info = new DatanodeInfo[list.size()];
- for (int i = 0; i < info.length; i++) {
- info[i] = convert(list.get(i));
- }
- return info;
- }
-
- public static DatanodeStorageReportProto convertDatanodeStorageReport(
- DatanodeStorageReport report) {
- return DatanodeStorageReportProto.newBuilder()
- .setDatanodeInfo(PBHelperClient.convert(report.getDatanodeInfo()))
- .addAllStorageReports(convertStorageReports(report.getStorageReports()))
- .build();
- }
-
- public static List<DatanodeStorageReportProto> convertDatanodeStorageReports(
- DatanodeStorageReport[] reports) {
- final List<DatanodeStorageReportProto> protos
- = new ArrayList<DatanodeStorageReportProto>(reports.length);
- for(int i = 0; i < reports.length; i++) {
- protos.add(convertDatanodeStorageReport(reports[i]));
- }
- return protos;
- }
-
- public static DatanodeStorageReport convertDatanodeStorageReport(
- DatanodeStorageReportProto proto) {
- return new DatanodeStorageReport(
- convert(proto.getDatanodeInfo()),
- convertStorageReports(proto.getStorageReportsList()));
- }
-
- public static DatanodeStorageReport[] convertDatanodeStorageReports(
- List<DatanodeStorageReportProto> protos) {
- final DatanodeStorageReport[] reports
- = new DatanodeStorageReport[protos.size()];
- for(int i = 0; i < reports.length; i++) {
- reports[i] = convertDatanodeStorageReport(protos.get(i));
- }
- return reports;
- }
-
- public static AdminStates convert(AdminState adminState) {
- switch(adminState) {
- case DECOMMISSION_INPROGRESS:
- return AdminStates.DECOMMISSION_INPROGRESS;
- case DECOMMISSIONED:
- return AdminStates.DECOMMISSIONED;
- case NORMAL:
- default:
- return AdminStates.NORMAL;
- }
- }
-
- public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) {
- if (b == null) return null;
- Builder builder = LocatedBlockProto.newBuilder();
- DatanodeInfo[] locs = b.getLocations();
- List<DatanodeInfo> cachedLocs =
- Lists.newLinkedList(Arrays.asList(b.getCachedLocations()));
- for (int i = 0; i < locs.length; i++) {
- DatanodeInfo loc = locs[i];
- builder.addLocs(i, PBHelperClient.convert(loc));
- boolean locIsCached = cachedLocs.contains(loc);
- builder.addIsCached(locIsCached);
- if (locIsCached) {
- cachedLocs.remove(loc);
- }
- }
- Preconditions.checkArgument(cachedLocs.size() == 0,
- "Found additional cached replica locations that are not in the set of"
- + " storage-backed locations!");
-
- StorageType[] storageTypes = b.getStorageTypes();
- if (storageTypes != null) {
- for (StorageType storageType : storageTypes) {
- builder.addStorageTypes(PBHelperClient.convertStorageType(storageType));
- }
- }
- final String[] storageIDs = b.getStorageIDs();
- if (storageIDs != null) {
- builder.addAllStorageIDs(Arrays.asList(storageIDs));
- }
- if (b instanceof LocatedStripedBlock) {
- LocatedStripedBlock sb = (LocatedStripedBlock) b;
- int[] indices = sb.getBlockIndices();
- Token<BlockTokenIdentifier>[] blockTokens = sb.getBlockTokens();
- for (int i = 0; i < indices.length; i++) {
- builder.addBlockIndex(indices[i]);
- builder.addBlockTokens(PBHelperClient.convert(blockTokens[i]));
- }
- }
-
- return builder.setB(PBHelperClient.convert(b.getBlock()))
- .setBlockToken(PBHelperClient.convert(b.getBlockToken()))
- .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
- }
-
- public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) {
- if (proto == null) return null;
- List<DatanodeInfoProto> locs = proto.getLocsList();
- DatanodeInfo[] targets = new DatanodeInfo[locs.size()];
- for (int i = 0; i < locs.size(); i++) {
- targets[i] = PBHelper.convert(locs.get(i));
- }
-
- final StorageType[] storageTypes = convertStorageTypes(
- proto.getStorageTypesList(), locs.size());
-
- final int storageIDsCount = proto.getStorageIDsCount();
- final String[] storageIDs;
- if (storageIDsCount == 0) {
- storageIDs = null;
- } else {
- Preconditions.checkState(storageIDsCount == locs.size());
- storageIDs = proto.getStorageIDsList().toArray(new String[storageIDsCount]);
- }
-
- int[] indices = null;
- final int indexCount = proto.getBlockIndexCount();
- if (indexCount > 0) {
- indices = new int[indexCount];
- for (int i = 0; i < indexCount; i++) {
- indices[i] = proto.getBlockIndex(i);
- }
- }
-
- // Set values from the isCached list, re-using references from loc
- List<DatanodeInfo> cachedLocs = new ArrayList<DatanodeInfo>(locs.size());
- List<Boolean> isCachedList = proto.getIsCachedList();
- for (int i=0; i<isCachedList.size(); i++) {
- if (isCachedList.get(i)) {
- cachedLocs.add(targets[i]);
- }
- }
-
- final LocatedBlock lb;
- if (indices == null) {
- lb = new LocatedBlock(PBHelperClient.convert(proto.getB()), targets,
- storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(),
- cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
- } else {
- lb = new LocatedStripedBlock(PBHelperClient.convert(proto.getB()), targets,
- storageIDs, storageTypes, indices, proto.getOffset(),
- proto.getCorrupt(),
- cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
- List<TokenProto> tokenProtos = proto.getBlockTokensList();
- Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length];
- for (int i = 0; i < indices.length; i++) {
- blockTokens[i] = PBHelper.convert(tokenProtos.get(i));
- }
- ((LocatedStripedBlock) lb).setBlockTokens(blockTokens);
- }
- lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
-
- return lb;
- }
-
- public static Token<BlockTokenIdentifier> convert(
- TokenProto blockToken) {
- return new Token<BlockTokenIdentifier>(blockToken.getIdentifier()
- .toByteArray(), blockToken.getPassword().toByteArray(), new Text(
- blockToken.getKind()), new Text(blockToken.getService()));
- }
-
--
- public static Token<DelegationTokenIdentifier> convertDelegationToken(
- TokenProto blockToken) {
- return new Token<DelegationTokenIdentifier>(blockToken.getIdentifier()
- .toByteArray(), blockToken.getPassword().toByteArray(), new Text(
- blockToken.getKind()), new Text(blockToken.getService()));
- }
+
public static ReplicaState convert(ReplicaStateProto state) {
switch (state) {
case RBW:
@@@ -1198,549 -704,8 +736,7 @@@
.setCapabilities(info.getCapabilities())
.build();
}
-
- // Located Block Arrays and Lists
- public static LocatedBlockProto[] convertLocatedBlocks(LocatedBlock[] lb) {
- if (lb == null) return null;
- return convertLocatedBlocks2(Arrays.asList(lb))
- .toArray(new LocatedBlockProto[lb.length]);
- }
-
- public static LocatedBlock[] convertLocatedBlocks(LocatedBlockProto[] lb) {
- if (lb == null) return null;
- return convertLocatedBlocks(Arrays.asList(lb))
- .toArray(new LocatedBlock[lb.length]);
- }
-
- public static List<LocatedBlock> convertLocatedBlocks(
- List<LocatedBlockProto> lb) {
- if (lb == null) return null;
- final int len = lb.size();
- List<LocatedBlock> result = new ArrayList<>(len);
- for (LocatedBlockProto aLb : lb) {
- result.add(PBHelper.convertLocatedBlockProto(aLb));
- }
- return result;
- }
-
- public static List<LocatedBlockProto> convertLocatedBlocks2(
- List<LocatedBlock> lb) {
- if (lb == null) return null;
- final int len = lb.size();
- List<LocatedBlockProto> result = new ArrayList<>(len);
- for (LocatedBlock aLb : lb) {
- result.add(PBHelper.convertLocatedBlock(aLb));
- }
- return result;
- }
-
-
- // LocatedBlocks
- public static LocatedBlocks convert(LocatedBlocksProto lb) {
- return new LocatedBlocks(
- lb.getFileLength(), lb.getUnderConstruction(),
- PBHelper.convertLocatedBlocks(lb.getBlocksList()),
- lb.hasLastBlock() ?
- PBHelper.convertLocatedBlockProto(lb.getLastBlock()) : null,
- lb.getIsLastBlockComplete(),
- lb.hasFileEncryptionInfo() ? convert(lb.getFileEncryptionInfo()) : null,
- lb.hasEcPolicy() ? convertErasureCodingPolicy(lb.getEcPolicy()) : null);
- }
-
- public static LocatedBlocksProto convert(LocatedBlocks lb) {
- if (lb == null) {
- return null;
- }
- LocatedBlocksProto.Builder builder =
- LocatedBlocksProto.newBuilder();
- if (lb.getLastLocatedBlock() != null) {
- builder.setLastBlock(
- PBHelper.convertLocatedBlock(lb.getLastLocatedBlock()));
- }
- if (lb.getFileEncryptionInfo() != null) {
- builder.setFileEncryptionInfo(convert(lb.getFileEncryptionInfo()));
- }
- if (lb.getErasureCodingPolicy() != null) {
- builder.setEcPolicy(convertErasureCodingPolicy(lb.getErasureCodingPolicy()));
- }
- return builder.setFileLength(lb.getFileLength())
- .setUnderConstruction(lb.isUnderConstruction())
- .addAllBlocks(PBHelper.convertLocatedBlocks2(lb.getLocatedBlocks()))
- .setIsLastBlockComplete(lb.isLastBlockComplete()).build();
- }
-
- // DataEncryptionKey
- public static DataEncryptionKey convert(DataEncryptionKeyProto bet) {
- String encryptionAlgorithm = bet.getEncryptionAlgorithm();
- return new DataEncryptionKey(bet.getKeyId(),
- bet.getBlockPoolId(),
- bet.getNonce().toByteArray(),
- bet.getEncryptionKey().toByteArray(),
- bet.getExpiryDate(),
- encryptionAlgorithm.isEmpty() ? null : encryptionAlgorithm);
- }
-
- public static DataEncryptionKeyProto convert(DataEncryptionKey bet) {
- DataEncryptionKeyProto.Builder b = DataEncryptionKeyProto.newBuilder()
- .setKeyId(bet.keyId)
- .setBlockPoolId(bet.blockPoolId)
- .setNonce(ByteString.copyFrom(bet.nonce))
- .setEncryptionKey(ByteString.copyFrom(bet.encryptionKey))
- .setExpiryDate(bet.expiryDate);
- if (bet.encryptionAlgorithm != null) {
- b.setEncryptionAlgorithm(bet.encryptionAlgorithm);
- }
- return b.build();
- }
-
- public static FsServerDefaults convert(FsServerDefaultsProto fs) {
- if (fs == null) return null;
- return new FsServerDefaults(
- fs.getBlockSize(), fs.getBytesPerChecksum(),
- fs.getWritePacketSize(), (short) fs.getReplication(),
- fs.getFileBufferSize(),
- fs.getEncryptDataTransfer(),
- fs.getTrashInterval(),
- PBHelperClient.convert(fs.getChecksumType()));
- }
-
- public static FsServerDefaultsProto convert(FsServerDefaults fs) {
- if (fs == null) return null;
- return FsServerDefaultsProto.newBuilder().
- setBlockSize(fs.getBlockSize()).
- setBytesPerChecksum(fs.getBytesPerChecksum()).
- setWritePacketSize(fs.getWritePacketSize())
- .setReplication(fs.getReplication())
- .setFileBufferSize(fs.getFileBufferSize())
- .setEncryptDataTransfer(fs.getEncryptDataTransfer())
- .setTrashInterval(fs.getTrashInterval())
- .setChecksumType(PBHelperClient.convert(fs.getChecksumType()))
- .build();
- }
-
- public static FsPermissionProto convert(FsPermission p) {
- return FsPermissionProto.newBuilder().setPerm(p.toExtendedShort()).build();
- }
-
- public static FsPermission convert(FsPermissionProto p) {
- return new FsPermissionExtension((short)p.getPerm());
- }
-
-
- // The creatFlag field in PB is a bitmask whose values are the same a the
- // emum values of CreateFlag
- public static int convertCreateFlag(EnumSetWritable<CreateFlag> flag) {
- int value = 0;
- if (flag.contains(CreateFlag.APPEND)) {
- value |= CreateFlagProto.APPEND.getNumber();
- }
- if (flag.contains(CreateFlag.CREATE)) {
- value |= CreateFlagProto.CREATE.getNumber();
- }
- if (flag.contains(CreateFlag.OVERWRITE)) {
- value |= CreateFlagProto.OVERWRITE.getNumber();
- }
- if (flag.contains(CreateFlag.LAZY_PERSIST)) {
- value |= CreateFlagProto.LAZY_PERSIST.getNumber();
- }
- if (flag.contains(CreateFlag.NEW_BLOCK)) {
- value |= CreateFlagProto.NEW_BLOCK.getNumber();
- }
- return value;
- }
-
- public static EnumSetWritable<CreateFlag> convertCreateFlag(int flag) {
- EnumSet<CreateFlag> result =
- EnumSet.noneOf(CreateFlag.class);
- if ((flag & CreateFlagProto.APPEND_VALUE) == CreateFlagProto.APPEND_VALUE) {
- result.add(CreateFlag.APPEND);
- }
- if ((flag & CreateFlagProto.CREATE_VALUE) == CreateFlagProto.CREATE_VALUE) {
- result.add(CreateFlag.CREATE);
- }
- if ((flag & CreateFlagProto.OVERWRITE_VALUE)
- == CreateFlagProto.OVERWRITE_VALUE) {
- result.add(CreateFlag.OVERWRITE);
- }
- if ((flag & CreateFlagProto.LAZY_PERSIST_VALUE)
- == CreateFlagProto.LAZY_PERSIST_VALUE) {
- result.add(CreateFlag.LAZY_PERSIST);
- }
- if ((flag & CreateFlagProto.NEW_BLOCK_VALUE)
- == CreateFlagProto.NEW_BLOCK_VALUE) {
- result.add(CreateFlag.NEW_BLOCK);
- }
- return new EnumSetWritable<CreateFlag>(result, CreateFlag.class);
- }
-
- public static int convertCacheFlags(EnumSet<CacheFlag> flags) {
- int value = 0;
- if (flags.contains(CacheFlag.FORCE)) {
- value |= CacheFlagProto.FORCE.getNumber();
- }
- return value;
- }
-
- public static EnumSet<CacheFlag> convertCacheFlags(int flags) {
- EnumSet<CacheFlag> result = EnumSet.noneOf(CacheFlag.class);
- if ((flags & CacheFlagProto.FORCE_VALUE) == CacheFlagProto.FORCE_VALUE) {
- result.add(CacheFlag.FORCE);
- }
- return result;
- }
-
- public static HdfsFileStatus convert(HdfsFileStatusProto fs) {
- if (fs == null)
- return null;
- return new HdfsLocatedFileStatus(
- fs.getLength(), fs.getFileType().equals(FileType.IS_DIR),
- fs.getBlockReplication(), fs.getBlocksize(),
- fs.getModificationTime(), fs.getAccessTime(),
- PBHelper.convert(fs.getPermission()), fs.getOwner(), fs.getGroup(),
- fs.getFileType().equals(FileType.IS_SYMLINK) ?
- fs.getSymlink().toByteArray() : null,
- fs.getPath().toByteArray(),
- fs.hasFileId()? fs.getFileId(): HdfsConstants.GRANDFATHER_INODE_ID,
- fs.hasLocations() ? PBHelper.convert(fs.getLocations()) : null,
- fs.hasChildrenNum() ? fs.getChildrenNum() : -1,
- fs.hasFileEncryptionInfo() ? convert(fs.getFileEncryptionInfo()) : null,
- fs.hasStoragePolicy() ? (byte) fs.getStoragePolicy()
- : HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED,
- fs.hasEcPolicy() ? PBHelper.convertErasureCodingPolicy(fs.getEcPolicy()) : null);
- }
-
- public static SnapshottableDirectoryStatus convert(
- SnapshottableDirectoryStatusProto sdirStatusProto) {
- if (sdirStatusProto == null) {
- return null;
- }
- final HdfsFileStatusProto status = sdirStatusProto.getDirStatus();
- return new SnapshottableDirectoryStatus(
- status.getModificationTime(),
- status.getAccessTime(),
- PBHelper.convert(status.getPermission()),
- status.getOwner(),
- status.getGroup(),
- status.getPath().toByteArray(),
- status.getFileId(),
- status.getChildrenNum(),
- sdirStatusProto.getSnapshotNumber(),
- sdirStatusProto.getSnapshotQuota(),
- sdirStatusProto.getParentFullpath().toByteArray());
- }
-
- public static HdfsFileStatusProto convert(HdfsFileStatus fs) {
- if (fs == null)
- return null;
- FileType fType = FileType.IS_FILE;
- if (fs.isDir()) {
- fType = FileType.IS_DIR;
- } else if (fs.isSymlink()) {
- fType = FileType.IS_SYMLINK;
- }
-
- HdfsFileStatusProto.Builder builder =
- HdfsFileStatusProto.newBuilder().
- setLength(fs.getLen()).
- setFileType(fType).
- setBlockReplication(fs.getReplication()).
- setBlocksize(fs.getBlockSize()).
- setModificationTime(fs.getModificationTime()).
- setAccessTime(fs.getAccessTime()).
- setPermission(PBHelper.convert(fs.getPermission())).
- setOwner(fs.getOwner()).
- setGroup(fs.getGroup()).
- setFileId(fs.getFileId()).
- setChildrenNum(fs.getChildrenNum()).
- setPath(ByteString.copyFrom(fs.getLocalNameInBytes())).
- setStoragePolicy(fs.getStoragePolicy());
- if (fs.isSymlink()) {
- builder.setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes()));
- }
- if (fs.getFileEncryptionInfo() != null) {
- builder.setFileEncryptionInfo(convert(fs.getFileEncryptionInfo()));
- }
- if (fs instanceof HdfsLocatedFileStatus) {
- final HdfsLocatedFileStatus lfs = (HdfsLocatedFileStatus) fs;
- LocatedBlocks locations = lfs.getBlockLocations();
- if (locations != null) {
- builder.setLocations(PBHelper.convert(locations));
- }
- }
- if(fs.getErasureCodingPolicy() != null) {
- builder.setEcPolicy(PBHelper.convertErasureCodingPolicy(fs.getErasureCodingPolicy()));
- }
- return builder.build();
- }
-
- public static SnapshottableDirectoryStatusProto convert(
- SnapshottableDirectoryStatus status) {
- if (status == null) {
- return null;
- }
- int snapshotNumber = status.getSnapshotNumber();
- int snapshotQuota = status.getSnapshotQuota();
- byte[] parentFullPath = status.getParentFullPath();
- ByteString parentFullPathBytes = ByteString.copyFrom(
- parentFullPath == null ? DFSUtilClient.EMPTY_BYTES : parentFullPath);
- HdfsFileStatusProto fs = convert(status.getDirStatus());
- SnapshottableDirectoryStatusProto.Builder builder =
- SnapshottableDirectoryStatusProto
- .newBuilder().setSnapshotNumber(snapshotNumber)
- .setSnapshotQuota(snapshotQuota).setParentFullpath(parentFullPathBytes)
- .setDirStatus(fs);
- return builder.build();
- }
-
- public static HdfsFileStatusProto[] convert(HdfsFileStatus[] fs) {
- if (fs == null) return null;
- final int len = fs.length;
- HdfsFileStatusProto[] result = new HdfsFileStatusProto[len];
- for (int i = 0; i < len; ++i) {
- result[i] = PBHelper.convert(fs[i]);
- }
- return result;
- }
-
- public static HdfsFileStatus[] convert(HdfsFileStatusProto[] fs) {
- if (fs == null) return null;
- final int len = fs.length;
- HdfsFileStatus[] result = new HdfsFileStatus[len];
- for (int i = 0; i < len; ++i) {
- result[i] = PBHelper.convert(fs[i]);
- }
- return result;
- }
-
- public static DirectoryListing convert(DirectoryListingProto dl) {
- if (dl == null)
- return null;
- List<HdfsFileStatusProto> partList = dl.getPartialListingList();
- return new DirectoryListing(
- partList.isEmpty() ? new HdfsLocatedFileStatus[0]
- : PBHelper.convert(
- partList.toArray(new HdfsFileStatusProto[partList.size()])),
- dl.getRemainingEntries());
- }
-
- public static DirectoryListingProto convert(DirectoryListing d) {
- if (d == null)
- return null;
- return DirectoryListingProto.newBuilder().
- addAllPartialListing(Arrays.asList(
- PBHelper.convert(d.getPartialListing()))).
- setRemainingEntries(d.getRemainingEntries()).
- build();
- }
-
- public static long[] convert(GetFsStatsResponseProto res) {
- long[] result = new long[7];
- result[ClientProtocol.GET_STATS_CAPACITY_IDX] = res.getCapacity();
- result[ClientProtocol.GET_STATS_USED_IDX] = res.getUsed();
- result[ClientProtocol.GET_STATS_REMAINING_IDX] = res.getRemaining();
- result[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX] = res.getUnderReplicated();
- result[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX] = res.getCorruptBlocks();
- result[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX] = res.getMissingBlocks();
- result[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX] =
- res.getMissingReplOneBlocks();
- return result;
- }
-
- public static GetFsStatsResponseProto convert(long[] fsStats) {
- GetFsStatsResponseProto.Builder result = GetFsStatsResponseProto
- .newBuilder();
- if (fsStats.length >= ClientProtocol.GET_STATS_CAPACITY_IDX + 1)
- result.setCapacity(fsStats[ClientProtocol.GET_STATS_CAPACITY_IDX]);
- if (fsStats.length >= ClientProtocol.GET_STATS_USED_IDX + 1)
- result.setUsed(fsStats[ClientProtocol.GET_STATS_USED_IDX]);
- if (fsStats.length >= ClientProtocol.GET_STATS_REMAINING_IDX + 1)
- result.setRemaining(fsStats[ClientProtocol.GET_STATS_REMAINING_IDX]);
- if (fsStats.length >= ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX + 1)
- result.setUnderReplicated(
- fsStats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX]);
- if (fsStats.length >= ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX + 1)
- result.setCorruptBlocks(
- fsStats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX]);
- if (fsStats.length >= ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX + 1)
- result.setMissingBlocks(
- fsStats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX]);
- if (fsStats.length >= ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX + 1)
- result.setMissingReplOneBlocks(
- fsStats[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX]);
- return result.build();
- }
-
- public static DatanodeReportTypeProto
- convert(DatanodeReportType t) {
- switch (t) {
- case ALL: return DatanodeReportTypeProto.ALL;
- case LIVE: return DatanodeReportTypeProto.LIVE;
- case DEAD: return DatanodeReportTypeProto.DEAD;
- case DECOMMISSIONING: return DatanodeReportTypeProto.DECOMMISSIONING;
- default:
- throw new IllegalArgumentException("Unexpected data type report:" + t);
- }
- }
-
- public static DatanodeReportType
- convert(DatanodeReportTypeProto t) {
- switch (t) {
- case ALL: return DatanodeReportType.ALL;
- case LIVE: return DatanodeReportType.LIVE;
- case DEAD: return DatanodeReportType.DEAD;
- case DECOMMISSIONING: return DatanodeReportType.DECOMMISSIONING;
- default:
- throw new IllegalArgumentException("Unexpected data type report:" + t);
- }
- }
-
- public static SafeModeActionProto convert(
- SafeModeAction a) {
- switch (a) {
- case SAFEMODE_LEAVE:
- return SafeModeActionProto.SAFEMODE_LEAVE;
- case SAFEMODE_ENTER:
- return SafeModeActionProto.SAFEMODE_ENTER;
- case SAFEMODE_GET:
- return SafeModeActionProto.SAFEMODE_GET;
- default:
- throw new IllegalArgumentException("Unexpected SafeModeAction :" + a);
- }
- }
-
- public static SafeModeAction convert(
- ClientNamenodeProtocolProtos.SafeModeActionProto a) {
- switch (a) {
- case SAFEMODE_LEAVE:
- return SafeModeAction.SAFEMODE_LEAVE;
- case SAFEMODE_ENTER:
- return SafeModeAction.SAFEMODE_ENTER;
- case SAFEMODE_GET:
- return SafeModeAction.SAFEMODE_GET;
- default:
- throw new IllegalArgumentException("Unexpected SafeModeAction :" + a);
- }
- }
-
- public static RollingUpgradeActionProto convert(RollingUpgradeAction a) {
- switch (a) {
- case QUERY:
- return RollingUpgradeActionProto.QUERY;
- case PREPARE:
- return RollingUpgradeActionProto.START;
- case FINALIZE:
- return RollingUpgradeActionProto.FINALIZE;
- default:
- throw new IllegalArgumentException("Unexpected value: " + a);
- }
- }
-
- public static RollingUpgradeAction convert(RollingUpgradeActionProto a) {
- switch (a) {
- case QUERY:
- return RollingUpgradeAction.QUERY;
- case START:
- return RollingUpgradeAction.PREPARE;
- case FINALIZE:
- return RollingUpgradeAction.FINALIZE;
- default:
- throw new IllegalArgumentException("Unexpected value: " + a);
- }
- }
-
- public static RollingUpgradeStatusProto convertRollingUpgradeStatus(
- RollingUpgradeStatus status) {
- return RollingUpgradeStatusProto.newBuilder()
- .setBlockPoolId(status.getBlockPoolId())
- .setFinalized(status.isFinalized())
- .build();
- }
-
- public static RollingUpgradeStatus convert(RollingUpgradeStatusProto proto) {
- return new RollingUpgradeStatus(proto.getBlockPoolId(),
- proto.getFinalized());
- }
-
- public static RollingUpgradeInfoProto convert(RollingUpgradeInfo info) {
- return RollingUpgradeInfoProto.newBuilder()
- .setStatus(convertRollingUpgradeStatus(info))
- .setCreatedRollbackImages(info.createdRollbackImages())
- .setStartTime(info.getStartTime())
- .setFinalizeTime(info.getFinalizeTime())
- .build();
- }
-
- public static RollingUpgradeInfo convert(RollingUpgradeInfoProto proto) {
- RollingUpgradeStatusProto status = proto.getStatus();
- return new RollingUpgradeInfo(status.getBlockPoolId(),
- proto.getCreatedRollbackImages(),
- proto.getStartTime(), proto.getFinalizeTime());
- }
-
- public static CorruptFileBlocks convert(CorruptFileBlocksProto c) {
- if (c == null)
- return null;
- List<String> fileList = c.getFilesList();
- return new CorruptFileBlocks(fileList.toArray(new String[fileList.size()]),
- c.getCookie());
- }
-
- public static CorruptFileBlocksProto convert(CorruptFileBlocks c) {
- if (c == null)
- return null;
- return CorruptFileBlocksProto.newBuilder().
- addAllFiles(Arrays.asList(c.getFiles())).
- setCookie(c.getCookie()).
- build();
- }
-
- public static ContentSummary convert(ContentSummaryProto cs) {
- if (cs == null) return null;
- ContentSummary.Builder builder = new ContentSummary.Builder();
- builder.length(cs.getLength()).
- fileCount(cs.getFileCount()).
- directoryCount(cs.getDirectoryCount()).
- quota(cs.getQuota()).
- spaceConsumed(cs.getSpaceConsumed()).
- spaceQuota(cs.getSpaceQuota());
- if (cs.hasTypeQuotaInfos()) {
- for (HdfsProtos.StorageTypeQuotaInfoProto info :
- cs.getTypeQuotaInfos().getTypeQuotaInfoList()) {
- StorageType type = PBHelperClient.convertStorageType(info.getType());
- builder.typeConsumed(type, info.getConsumed());
- builder.typeQuota(type, info.getQuota());
- }
- }
- return builder.build();
- }
-
- public static ContentSummaryProto convert(ContentSummary cs) {
- if (cs == null) return null;
- ContentSummaryProto.Builder builder = ContentSummaryProto.newBuilder();
- builder.setLength(cs.getLength()).
- setFileCount(cs.getFileCount()).
- setDirectoryCount(cs.getDirectoryCount()).
- setQuota(cs.getQuota()).
- setSpaceConsumed(cs.getSpaceConsumed()).
- setSpaceQuota(cs.getSpaceQuota());
-
- if (cs.isTypeQuotaSet() || cs.isTypeConsumedAvailable()) {
- HdfsProtos.StorageTypeQuotaInfosProto.Builder isb =
- HdfsProtos.StorageTypeQuotaInfosProto.newBuilder();
- for (StorageType t: StorageType.getTypesSupportingQuota()) {
- HdfsProtos.StorageTypeQuotaInfoProto info =
- HdfsProtos.StorageTypeQuotaInfoProto.newBuilder().
- setType(PBHelperClient.convertStorageType(t)).
- setConsumed(cs.getTypeConsumed(t)).
- setQuota(cs.getTypeQuota(t)).
- build();
- isb.addTypeQuotaInfo(info);
- }
- builder.setTypeQuotaInfos(isb);
- }
- return builder.build();
- }
-
public static NNHAStatusHeartbeat convert(NNHAStatusHeartbeatProto s) {
if (s == null) return null;
switch (s.getState()) {
@@@ -2933,179 -790,4 +821,140 @@@
setLeaseId(context.getLeaseId()).
build();
}
+
- public static ECSchema convertECSchema(ECSchemaProto schema) {
- List<ECSchemaOptionEntryProto> optionsList = schema.getOptionsList();
- Map<String, String> options = new HashMap<>(optionsList.size());
- for (ECSchemaOptionEntryProto option : optionsList) {
- options.put(option.getKey(), option.getValue());
++ private static List<Integer> convertIntArray(short[] liveBlockIndices) {
++ List<Integer> liveBlockIndicesList = new ArrayList<>();
++ for (short s : liveBlockIndices) {
++ liveBlockIndicesList.add((int) s);
+ }
- return new ECSchema(schema.getCodecName(), schema.getDataUnits(),
- schema.getParityUnits(), options);
- }
-
- public static ECSchemaProto convertECSchema(ECSchema schema) {
- ECSchemaProto.Builder builder = ECSchemaProto.newBuilder()
- .setCodecName(schema.getCodecName())
- .setDataUnits(schema.getNumDataUnits())
- .setParityUnits(schema.getNumParityUnits());
- Set<Entry<String, String>> entrySet = schema.getExtraOptions().entrySet();
- for (Entry<String, String> entry : entrySet) {
- builder.addOptions(ECSchemaOptionEntryProto.newBuilder()
- .setKey(entry.getKey()).setValue(entry.getValue()).build());
++ return liveBlockIndicesList;
++ }
++
++ private static StorageTypesProto convertStorageTypesProto(
++ StorageType[] targetStorageTypes) {
++ StorageTypesProto.Builder builder = StorageTypesProto.newBuilder();
++ for (StorageType storageType : targetStorageTypes) {
++ builder.addStorageTypes(PBHelperClient.convertStorageType(storageType));
+ }
+ return builder.build();
+ }
+
- public static ErasureCodingPolicy convertErasureCodingPolicy(
- ErasureCodingPolicyProto policy) {
- return new ErasureCodingPolicy(policy.getName(),
- convertECSchema(policy.getSchema()),
- policy.getCellSize());
++ private static HdfsProtos.StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) {
++ HdfsProtos.StorageUuidsProto.Builder builder = HdfsProtos.StorageUuidsProto.newBuilder();
++ for (String storageUuid : targetStorageIDs) {
++ builder.addStorageUuids(storageUuid);
++ }
++ return builder.build();
+ }
+
- public static ErasureCodingPolicyProto convertErasureCodingPolicy(
- ErasureCodingPolicy policy) {
- ErasureCodingPolicyProto.Builder builder = ErasureCodingPolicyProto
- .newBuilder()
- .setName(policy.getName())
- .setSchema(convertECSchema(policy.getSchema()))
- .setCellSize(policy.getCellSize());
++ private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) {
++ DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder();
++ for (DatanodeInfo datanodeInfo : dnInfos) {
++ builder.addDatanodes(PBHelperClient.convert(datanodeInfo));
++ }
+ return builder.build();
+ }
-
++
++ private static String[] convert(HdfsProtos.StorageUuidsProto targetStorageUuidsProto) {
++ List<String> storageUuidsList = targetStorageUuidsProto
++ .getStorageUuidsList();
++ String[] storageUuids = new String[storageUuidsList.size()];
++ for (int i = 0; i < storageUuidsList.size(); i++) {
++ storageUuids[i] = storageUuidsList.get(i);
++ }
++ return storageUuids;
++ }
++
+ public static BlockECRecoveryInfo convertBlockECRecoveryInfo(
+ BlockECRecoveryInfoProto blockEcRecoveryInfoProto) {
+ ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock();
+ ExtendedBlock block = PBHelperClient.convert(blockProto);
+
+ DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto
+ .getSourceDnInfos();
- DatanodeInfo[] sourceDnInfos = convert(sourceDnInfosProto);
++ DatanodeInfo[] sourceDnInfos = PBHelperClient.convert(sourceDnInfosProto);
+
+ DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto
+ .getTargetDnInfos();
- DatanodeInfo[] targetDnInfos = convert(targetDnInfosProto);
++ DatanodeInfo[] targetDnInfos = PBHelperClient.convert(targetDnInfosProto);
+
- StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto
++ HdfsProtos.StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto
+ .getTargetStorageUuids();
+ String[] targetStorageUuids = convert(targetStorageUuidsProto);
+
+ StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto
+ .getTargetStorageTypes();
- StorageType[] convertStorageTypes = convertStorageTypes(
++ StorageType[] convertStorageTypes = PBHelperClient.convertStorageTypes(
+ targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto
+ .getStorageTypesList().size());
+
+ List<Integer> liveBlockIndicesList = blockEcRecoveryInfoProto
+ .getLiveBlockIndicesList();
+ short[] liveBlkIndices = new short[liveBlockIndicesList.size()];
+ for (int i = 0; i < liveBlockIndicesList.size(); i++) {
+ liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue();
+ }
+
+ ErasureCodingPolicy ecPolicy =
- convertErasureCodingPolicy(blockEcRecoveryInfoProto.getEcPolicy());
++ PBHelperClient.convertErasureCodingPolicy(
++ blockEcRecoveryInfoProto.getEcPolicy());
+
+ return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
+ targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy);
+ }
+
+ public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
+ BlockECRecoveryInfo blockEcRecoveryInfo) {
+ BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto
+ .newBuilder();
+ builder.setBlock(PBHelperClient.convert(
+ blockEcRecoveryInfo.getExtendedBlock()));
+
+ DatanodeInfo[] sourceDnInfos = blockEcRecoveryInfo.getSourceDnInfos();
+ builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
+
+ DatanodeInfo[] targetDnInfos = blockEcRecoveryInfo.getTargetDnInfos();
+ builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
+
+ String[] targetStorageIDs = blockEcRecoveryInfo.getTargetStorageIDs();
+ builder.setTargetStorageUuids(convertStorageIDs(targetStorageIDs));
+
+ StorageType[] targetStorageTypes = blockEcRecoveryInfo
+ .getTargetStorageTypes();
+ builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
+
+ short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
+ builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
+
- builder.setEcPolicy(convertErasureCodingPolicy(blockEcRecoveryInfo
- .getErasureCodingPolicy()));
-
- return builder.build();
- }
-
- private static List<Integer> convertIntArray(short[] liveBlockIndices) {
- List<Integer> liveBlockIndicesList = new ArrayList<Integer>();
- for (short s : liveBlockIndices) {
- liveBlockIndicesList.add((int) s);
- }
- return liveBlockIndicesList;
- }
++ builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
++ blockEcRecoveryInfo.getErasureCodingPolicy()));
+
- private static StorageTypesProto convertStorageTypesProto(
- StorageType[] targetStorageTypes) {
- StorageTypesProto.Builder builder = StorageTypesProto.newBuilder();
- for (StorageType storageType : targetStorageTypes) {
- builder.addStorageTypes(PBHelperClient.convertStorageType(storageType));
- }
+ return builder.build();
+ }
+
- private static StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) {
- StorageUuidsProto.Builder builder = StorageUuidsProto.newBuilder();
- for (String storageUuid : targetStorageIDs) {
- builder.addStorageUuids(storageUuid);
- }
- return builder.build();
- }
-
- private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) {
- DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder();
- for (DatanodeInfo datanodeInfo : dnInfos) {
- builder.addDatanodes(PBHelperClient.convert(datanodeInfo));
- }
- return builder.build();
- }
-
- private static String[] convert(StorageUuidsProto targetStorageUuidsProto) {
- List<String> storageUuidsList = targetStorageUuidsProto
- .getStorageUuidsList();
- String[] storageUuids = new String[storageUuidsList.size()];
- for (int i = 0; i < storageUuidsList.size(); i++) {
- storageUuids[i] = storageUuidsList.get(i);
- }
- return storageUuids;
- }
-
+ public static BlockECRecoveryCommandProto convert(
+ BlockECRecoveryCommand blkECRecoveryCmd) {
+ BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto
+ .newBuilder();
+ Collection<BlockECRecoveryInfo> blockECRecoveryInfos = blkECRecoveryCmd
+ .getECTasks();
+ for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) {
+ builder
+ .addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo));
+ }
+ return builder.build();
+ }
-
++
+ public static BlockECRecoveryCommand convert(
+ BlockECRecoveryCommandProto blkECRecoveryCmdProto) {
- Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<BlockECRecoveryInfo>();
++ Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<>();
+ List<BlockECRecoveryInfoProto> blockECRecoveryinfoList = blkECRecoveryCmdProto
+ .getBlockECRecoveryinfoList();
+ for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) {
+ blkECRecoveryInfos
+ .add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto));
+ }
+ return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
+ blkECRecoveryInfos);
+ }
}
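
The PBHelper changes above round-trip the live block indices between the wire representation (a repeated integer proto field) and the in-memory short[]. A minimal, self-contained sketch of that round trip — illustrative plain Java, not code from the patch:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class IndexRoundTrip {
      // Widen short[] to List<Integer>, as done for the repeated proto field.
      static List<Integer> toIntList(short[] indices) {
        List<Integer> out = new ArrayList<>(indices.length);
        for (short s : indices) {
          out.add((int) s);
        }
        return out;
      }

      // Narrow the list back to short[] on the receiving side.
      static short[] toShortArray(List<Integer> indices) {
        short[] out = new short[indices.size()];
        for (int i = 0; i < indices.size(); i++) {
          out[i] = indices.get(i).shortValue();
        }
        return out;
      }

      public static void main(String[] args) {
        short[] live = {0, 2, 3, 5};
        System.out.println(Arrays.equals(live, toShortArray(toIntList(live)))); // true
      }
    }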
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1211169,b0a11fe..e7f9262
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@@ -1160,8 -1107,8 +1176,8 @@@ public class BlockManager implements Bl
* Adds block to list of blocks which will be invalidated on all its
* datanodes.
*/
- private void addToInvalidates(Block b) {
+ private void addToInvalidates(BlockInfo storedBlock) {
- if (!namesystem.isPopulatingReplQueues()) {
+ if (!isPopulatingReplQueues()) {
return;
}
StringBuilder datanodes = new StringBuilder();
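
The hunk above narrows addToInvalidates to take the stored BlockInfo and guards on the BlockManager's own isPopulatingReplQueues() rather than the namesystem's. A rough sketch of that guard-and-collect shape, using hypothetical types rather than HDFS internals:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical stand-in for the per-datanode invalidate bookkeeping.
    class InvalidateQueue {
      private final Map<String, Set<Long>> perNode = new HashMap<>();
      private volatile boolean populatingReplQueues;

      void startPopulatingReplQueues() {
        populatingReplQueues = true;
      }

      // Queue a block for deletion on every node holding a replica, but only
      // once replication queues are being populated (i.e. past startup safe mode).
      void addToInvalidates(long blockId, List<String> nodesHoldingBlock) {
        if (!populatingReplQueues) {
          return;
        }
        for (String node : nodesHoldingBlock) {
          perNode.computeIfAbsent(node, k -> new HashSet<>()).add(blockId);
        }
      }
    }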
@@@ -1287,8 -1215,8 +1303,8 @@@
if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
|| corruptedDuringWrite) {
// the block is over-replicated so invalidate the replicas immediately
- invalidateBlock(b, node);
+ invalidateBlock(b, node, numberOfReplicas);
- } else if (namesystem.isPopulatingReplQueues()) {
+ } else if (isPopulatingReplQueues()) {
// add the block to neededReplication
updateNeededReplications(b.getStored(), -1, 0);
}
@@@ -2654,9 -2488,9 +2670,9 @@@
DatanodeStorageInfo storageInfo)
throws IOException {
assert (storedBlock != null && namesystem.hasWriteLock());
-- if (!namesystem.isInStartupSafeMode()
- || namesystem.isPopulatingReplQueues()) {
++ if (!namesystem.isInStartupSafeMode()
+ || isPopulatingReplQueues()) {
- addStoredBlock(storedBlock, storageInfo, null, false);
+ addStoredBlock(storedBlock, reported, storageInfo, null, false);
return;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index a80bfd6,6d199d7..fb86ff3
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@@ -542,12 -546,12 +542,12 @@@ public class DecommissionManager
if (blockManager.isNeededReplication(block, liveReplicas)) {
if (!blockManager.neededReplications.contains(block) &&
blockManager.pendingReplications.getNumReplicas(block) == 0 &&
- namesystem.isPopulatingReplQueues()) {
+ blockManager.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode.
blockManager.neededReplications.add(block,
- curReplicas,
+ liveReplicas,
num.decommissionedAndDecommissioning(),
- block.getReplication());
+ blockManager.getExpectedReplicaNum(block));
}
}
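
The replacement of block.getReplication() with blockManager.getExpectedReplicaNum(block) matters for striped blocks: an erasure-coded block group expects one internal block per data and parity unit, not a replication factor, which is presumably why the expected count now comes from the BlockManager rather than the block itself. An illustrative sketch, assuming the RS-6-3 schema used by HDFS-7285:

    // Illustrative only: how an "expected replica" lookup can differ for
    // striped erasure-coded blocks versus plainly replicated ones.
    final class ExpectedReplicas {
      static int expectedReplicaNum(boolean striped, short replication,
                                    int dataUnits, int parityUnits) {
        // A striped block group expects dataUnits + parityUnits internal
        // blocks (e.g. 6 + 3 = 9 for RS-6-3); a contiguous block expects
        // its replication factor.
        return striped ? dataUnits + parityUnits : replication;
      }

      public static void main(String[] args) {
        System.out.println(expectedReplicaNum(false, (short) 3, 0, 0)); // 3
        System.out.println(expectedReplicaNum(true, (short) 0, 6, 3));  // 9
      }
    }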
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 82a0f62,2aad83d..9228bec
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@@ -1156,8 -1175,7 +1179,9 @@@ public class DataNode extends Reconfigu
saslClient = new SaslDataTransferClient(dnConf.conf,
dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
+ // Initialize the erasure coding worker.
+ ecWorker = new ErasureCodingWorker(conf, this);
+ startMetricsLogger(conf);
}
/**
@@@ -3264,9 -3256,72 +3291,76 @@@
checkSuperuserPrivilege();
spanReceiverHost.removeSpanReceiver(id);
}
+
+ public ErasureCodingWorker getErasureCodingWorker() {
+ return ecWorker;
-
++ }
+
+ /**
+ * Get the timeout value of each OOB type from the configuration.
+ */
+ private void initOOBTimeout() {
+ final int oobStart = Status.OOB_RESTART_VALUE; // the first OOB type
+ final int oobEnd = Status.OOB_RESERVED3_VALUE; // the last OOB type
+ final int numOobTypes = oobEnd - oobStart + 1;
+ oobTimeouts = new long[numOobTypes];
+
+ final String[] ele = conf.get(DFS_DATANODE_OOB_TIMEOUT_KEY,
+ DFS_DATANODE_OOB_TIMEOUT_DEFAULT).split(",");
+ for (int i = 0; i < numOobTypes; i++) {
+ oobTimeouts[i] = (i < ele.length) ? Long.parseLong(ele[i]) : 0;
+ }
+ }
+
+ /**
+ * Get the timeout to be used for transmitting the given OOB type.
+ *
+ * @param status the OOB status whose timeout is requested
+ * @return the timeout in milliseconds
+ */
+ public long getOOBTimeout(Status status)
+ throws IOException {
+ if (status.getNumber() < Status.OOB_RESTART_VALUE ||
+ status.getNumber() > Status.OOB_RESERVED3_VALUE) {
+ // Not an OOB.
+ throw new IOException("Not an OOB status: " + status);
+ }
+
+ return oobTimeouts[status.getNumber() - Status.OOB_RESTART_VALUE];
+ }
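
initOOBTimeout above splits the comma-separated value of DFS_DATANODE_OOB_TIMEOUT_KEY into one entry per OOB type, defaulting missing entries to 0, and getOOBTimeout indexes by offset from the first OOB code. A self-contained sketch of that parse-and-index pattern, with plain int codes standing in for the protobuf Status enum (assumed values, not the real enum):

    // Minimal sketch of the parse-and-index pattern used above.
    final class OobTimeouts {
      static final int OOB_START = 8;  // hypothetical first OOB status code
      static final int OOB_END = 11;   // hypothetical last OOB status code
      private final long[] timeouts = new long[OOB_END - OOB_START + 1];

      OobTimeouts(String configured) {
        // e.g. "1500,0,0,0": one entry per OOB type; missing entries -> 0.
        String[] ele = configured.split(",");
        for (int i = 0; i < timeouts.length; i++) {
          timeouts[i] = (i < ele.length) ? Long.parseLong(ele[i]) : 0;
        }
      }

      long get(int statusCode) {
        if (statusCode < OOB_START || statusCode > OOB_END) {
          throw new IllegalArgumentException("Not an OOB status: " + statusCode);
        }
        return timeouts[statusCode - OOB_START];
      }
    }

For instance, new OobTimeouts("1500").get(8) returns 1500, while codes 9 through 11 fall back to 0.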
+
+ /**
+ * Start a timer to periodically write DataNode metrics to the log file. This
+ * behavior can be disabled by configuration.
+ *
+ * @param metricConf the configuration to read the logging period from
+ */
+ protected void startMetricsLogger(Configuration metricConf) {
+ long metricsLoggerPeriodSec = metricConf.getInt(
+ DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY,
+ DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT);
+
+ if (metricsLoggerPeriodSec <= 0) {
+ return;
+ }
+
+ MetricsLoggerTask.makeMetricsLoggerAsync(METRICS_LOG);
+
+ // Schedule the periodic logging.
+ metricsLoggerTimer = new ScheduledThreadPoolExecutor(1);
+ metricsLoggerTimer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(METRICS_LOG,
+ "DataNode", (short) 0), metricsLoggerPeriodSec, metricsLoggerPeriodSec,
+ TimeUnit.SECONDS);
+ }
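
startMetricsLogger wires a single-threaded ScheduledThreadPoolExecutor with scheduleWithFixedDelay and drops pending runs once shutdown begins. A stripped-down, runnable sketch of the same scheduling shape (names and period are placeholders, not the patch's MetricsLoggerTask):

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class PeriodicLogger {
      public static void main(String[] args) throws InterruptedException {
        long periodSec = 2; // stand-in for the configured logging period
        ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
        // Match the patch: delayed tasks are not run after shutdown.
        timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        timer.scheduleWithFixedDelay(
            () -> System.out.println("metrics snapshot at " + System.nanoTime()),
            periodSec, periodSec, TimeUnit.SECONDS);
        Thread.sleep(5000); // let the task fire a couple of times
        timer.shutdown();
      }
    }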
+
+ protected void stopMetricsLogger() {
+ if (metricsLoggerTimer != null) {
+ metricsLoggerTimer.shutdown();
+ metricsLoggerTimer = null;
+ }
+ }
+
+ @VisibleForTesting
+ ScheduledThreadPoolExecutor getMetricsLoggerTimer() {
+ return metricsLoggerTimer;
}
}