You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/05/18 06:14:09 UTC
[hbase] branch branch-2.2 updated: HBASE-22400 Remove the adapter
code in async fs implementation for hadoop-2.7.x
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.2 by this push:
new a612537 HBASE-22400 Remove the adapter code in async fs implementation for hadoop-2.7.x
a612537 is described below
commit a6125374825bd513f5f979abfe4ed48828c4c8ea
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed May 15 09:59:41 2019 +0800
HBASE-22400 Remove the adapter code in async fs implementation for hadoop-2.7.x
---
.../FanOutOneBlockAsyncDFSOutputHelper.java | 328 ++-------------------
.../FanOutOneBlockAsyncDFSOutputSaslHelper.java | 66 +----
2 files changed, 38 insertions(+), 356 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 34d9f29..0c8a49b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -21,9 +21,9 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
@@ -71,23 +73,19 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationH
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
-import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
@@ -136,35 +134,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];
- // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
- // getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may
- // get from proto directly, or combined by the reply field of the proto and a ECN object. See
- // createPipelineAckStatusGetter for more details.
- private interface PipelineAckStatusGetter {
- Status get(PipelineAckProto ack);
- }
-
- private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
-
- // StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and o.a.h.fs in hadoop 2.7. So here
- // we need to use reflection to set it.See createStorageTypeSetter for more details.
- private interface StorageTypeSetter {
- OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
- }
-
- private static final StorageTypeSetter STORAGE_TYPE_SETTER;
-
- // helper class for calling add block method on namenode. There is a addBlockFlags parameter for
- // hadoop 2.8 or later. See createBlockAdder for more details.
- private interface BlockAdder {
-
- LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
- ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
- throws IOException;
- }
-
- private static final BlockAdder BLOCK_ADDER;
-
private interface LeaseManager {
void begin(DFSClient client, long inodeId);
@@ -183,23 +152,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
- // helper class for convert protos.
- private interface PBHelper {
-
- ExtendedBlockProto convert(ExtendedBlock b);
-
- TokenProto convert(Token<?> tok);
- }
-
- private static final PBHelper PB_HELPER;
-
- // helper class for creating data checksum.
- private interface ChecksumCreater {
- DataChecksum createChecksum(DFSClient client);
- }
-
- private static final ChecksumCreater CHECKSUM_CREATER;
-
// helper class for creating files.
private interface FileCreator {
default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
@@ -269,234 +221,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
};
}
- private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
- throws NoSuchMethodException {
- Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
- @SuppressWarnings("rawtypes")
- Class<? extends Enum> ecnClass;
- try {
- ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
- .asSubclass(Enum.class);
- } catch (ClassNotFoundException e) {
- String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please " +
- "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
- "HBASE-16110 for more information.";
- LOG.error(msg, e);
- throw new Error(msg, e);
- }
- @SuppressWarnings("unchecked")
- Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
- Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
- Method combineHeaderMethod =
- PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
- Method getStatusFromHeaderMethod =
- PipelineAck.class.getMethod("getStatusFromHeader", int.class);
- return new PipelineAckStatusGetter() {
-
- @Override
- public Status get(PipelineAckProto ack) {
- try {
- @SuppressWarnings("unchecked")
- List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
- Integer headerFlag;
- if (flagList.isEmpty()) {
- Status reply = (Status) getReplyMethod.invoke(ack, 0);
- headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
- } else {
- headerFlag = flagList.get(0);
- }
- return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
- throws NoSuchMethodException {
- Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
- return new PipelineAckStatusGetter() {
-
- @Override
- public Status get(PipelineAckProto ack) {
- try {
- return (Status) getStatusMethod.invoke(ack, 0);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- private static PipelineAckStatusGetter createPipelineAckStatusGetter()
- throws NoSuchMethodException {
- try {
- return createPipelineAckStatusGetter27();
- } catch (NoSuchMethodException e) {
- LOG.debug("Can not get expected method " + e.getMessage() +
- ", this usually because your Hadoop is pre 2.7.0, " +
- "try the methods in Hadoop 2.6.x instead.");
- }
- return createPipelineAckStatusGetter26();
- }
-
- private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
- Method setStorageTypeMethod =
- OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
- ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
- for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
- builder.put(storageTypeProto.name(), storageTypeProto);
- }
- ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
- return new StorageTypeSetter() {
-
- @Override
- public OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType) {
- Object protoEnum = name2ProtoEnum.get(storageType.name());
- try {
- setStorageTypeMethod.invoke(builder, protoEnum);
- } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- return builder;
- }
- };
- }
-
- private static BlockAdder createBlockAdder() throws NoSuchMethodException {
- for (Method method : ClientProtocol.class.getMethods()) {
- if (method.getName().equals("addBlock")) {
- Method addBlockMethod = method;
- Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
- if (paramTypes[paramTypes.length - 1] == String[].class) {
- return new BlockAdder() {
-
- @Override
- public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
- ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
- String[] favoredNodes) throws IOException {
- try {
- return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
- excludeNodes, fileId, favoredNodes);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
- throw new RuntimeException(e);
- }
- }
- };
- } else {
- return new BlockAdder() {
-
- @Override
- public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
- ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
- String[] favoredNodes) throws IOException {
- try {
- return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
- excludeNodes, fileId, favoredNodes, null);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
- throw new RuntimeException(e);
- }
- }
- };
- }
- }
- }
- throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol");
- }
-
- private static PBHelper createPBHelper() throws NoSuchMethodException {
- Class<?> helperClass;
- String clazzName = "org.apache.hadoop.hdfs.protocolPB.PBHelperClient";
- try {
- helperClass = Class.forName(clazzName);
- } catch (ClassNotFoundException e) {
- helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
- LOG.debug("" + clazzName + " not found (Hadoop is pre-2.8.0?); using " +
- helperClass.toString() + " instead.");
- }
- Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
- Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
- return new PBHelper() {
-
- @Override
- public ExtendedBlockProto convert(ExtendedBlock b) {
- try {
- return (ExtendedBlockProto) convertEBMethod.invoke(null, b);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public TokenProto convert(Token<?> tok) {
- try {
- return (TokenProto) convertTokenMethod.invoke(null, tok);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- private static ChecksumCreater createChecksumCreater28(Method getConfMethod, Class<?> confClass)
- throws NoSuchMethodException {
- for (Method method : confClass.getMethods()) {
- if (method.getName().equals("createChecksum")) {
- Method createChecksumMethod = method;
- return new ChecksumCreater() {
-
- @Override
- public DataChecksum createChecksum(DFSClient client) {
- try {
- return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client),
- (Object) null);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
- }
- throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
- }
-
- private static ChecksumCreater createChecksumCreater27(Method getConfMethod, Class<?> confClass)
- throws NoSuchMethodException {
- Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
- createChecksumMethod.setAccessible(true);
- return new ChecksumCreater() {
-
- @Override
- public DataChecksum createChecksum(DFSClient client) {
- try {
- return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client));
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- private static ChecksumCreater createChecksumCreater()
- throws NoSuchMethodException, ClassNotFoundException {
- Method getConfMethod = DFSClient.class.getMethod("getConf");
- try {
- return createChecksumCreater28(getConfMethod,
- Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf"));
- } catch (ClassNotFoundException e) {
- LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
- }
- return createChecksumCreater27(getConfMethod,
- Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
- }
-
private static FileCreator createFileCreator3() throws NoSuchMethodException {
Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
@@ -547,13 +271,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
static {
try {
- PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
- STORAGE_TYPE_SETTER = createStorageTypeSetter();
- BLOCK_ADDER = createBlockAdder();
LEASE_MANAGER = createLeaseManager();
DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
- PB_HELPER = createPBHelper();
- CHECKSUM_CREATER = createChecksumCreater();
FILE_CREATOR = createFileCreator();
} catch (Exception e) {
String msg = "Couldn't properly initialize access to HDFS internals. Please " +
@@ -573,11 +292,19 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
}
static DataChecksum createChecksum(DFSClient client) {
- return CHECKSUM_CREATER.createChecksum(client);
+ return client.getConf().createChecksum(null);
}
static Status getStatus(PipelineAckProto ack) {
- return PIPELINE_ACK_STATUS_GETTER.get(ack);
+ List<Integer> flagList = ack.getFlagList();
+ Integer headerFlag;
+ if (flagList.isEmpty()) {
+ Status reply = ack.getReply(0);
+ headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply);
+ } else {
+ headerFlag = flagList.get(0);
+ }
+ return PipelineAck.getStatusFromHeader(headerFlag);
}
private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
@@ -641,12 +368,13 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
});
}
- private static void requestWriteBlock(Channel channel, Enum<?> storageType,
+ private static void requestWriteBlock(Channel channel, StorageType storageType,
OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
- OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
+ OpWriteBlockProto proto =
+ writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
int protoLen = proto.getSerializedSize();
ByteBuf buffer =
- channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
+ channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
buffer.writeByte(Op.WRITE_BLOCK.code);
proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
@@ -654,7 +382,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
}
private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
- Enum<?> storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
+ StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
throws IOException {
Promise<Void> saslPromise = channel.eventLoop().newPromise();
@@ -678,7 +406,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) {
- Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
+ StorageType[] storageTypes = locatedBlock.getStorageTypes();
DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
boolean connectToDnViaHostname =
conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
@@ -686,9 +414,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
blockCopy.setNumBytes(locatedBlock.getBlockSize());
ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
- .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PB_HELPER.convert(blockCopy))
- .setToken(PB_HELPER.convert(locatedBlock.getBlockToken())))
- .setClientName(clientName).build();
+ .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy))
+ .setToken(PBHelperClient.convert(locatedBlock.getBlockToken())))
+ .setClientName(clientName).build();
ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
.setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
@@ -699,7 +427,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
for (int i = 0; i < datanodeInfos.length; i++) {
DatanodeInfo dnInfo = datanodeInfos[i];
- Enum<?> storageType = storageTypes[i];
+ StorageType storageType = storageTypes[i];
Promise<Channel> promise = eventLoopGroup.next().newPromise();
futureList.add(promise);
String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
@@ -771,8 +499,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
List<Future<Channel>> futureList = null;
try {
DataChecksum summer = createChecksum(client);
- locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null,
- excludesNodes, stat.getFileId(), null);
+ locatedBlock = namenode.addBlock(src, client.getClientName(), null, excludesNodes,
+ stat.getFileId(), null, null);
List<Channel> datanodeList = new ArrayList<>();
futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
index ef6c1ca..4181dd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.io.asyncfs;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import com.google.protobuf.ByteString;
@@ -66,7 +66,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.security.SaslPropertiesResolver;
@@ -128,16 +128,6 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private static final SaslAdaptor SASL_ADAPTOR;
- // helper class for convert protos.
- private interface PBHelper {
-
- List<CipherOptionProto> convertCipherOptions(List<CipherOption> options);
-
- List<CipherOption> convertCipherOptionProtos(List<CipherOptionProto> options);
- }
-
- private static final PBHelper PB_HELPER;
-
private interface TransparentCryptoHelper {
Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client)
@@ -188,42 +178,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
};
}
- private static PBHelper createPBHelper() throws NoSuchMethodException {
- Class<?> helperClass;
- try {
- helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
- } catch (ClassNotFoundException e) {
- LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
- helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
- }
- Method convertCipherOptionsMethod = helperClass.getMethod("convertCipherOptions", List.class);
- Method convertCipherOptionProtosMethod =
- helperClass.getMethod("convertCipherOptionProtos", List.class);
- return new PBHelper() {
-
- @SuppressWarnings("unchecked")
- @Override
- public List<CipherOptionProto> convertCipherOptions(List<CipherOption> options) {
- try {
- return (List<CipherOptionProto>) convertCipherOptionsMethod.invoke(null, options);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public List<CipherOption> convertCipherOptionProtos(List<CipherOptionProto> options) {
- try {
- return (List<CipherOption>) convertCipherOptionProtosMethod.invoke(null, options);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- private static TransparentCryptoHelper createTransparentCryptoHelper27()
+ private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396()
throws NoSuchMethodException {
Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
.getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class);
@@ -252,7 +207,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
};
}
- private static TransparentCryptoHelper createTransparentCryptoHelper28()
+ private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396()
throws ClassNotFoundException, NoSuchMethodException {
Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil");
Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod(
@@ -285,18 +240,17 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private static TransparentCryptoHelper createTransparentCryptoHelper()
throws NoSuchMethodException, ClassNotFoundException {
try {
- return createTransparentCryptoHelper27();
+ return createTransparentCryptoHelperWithoutHDFS12396();
} catch (NoSuchMethodException e) {
- LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient, should be hadoop 2.8+",
- e);
+ LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," +
+ " should be hadoop version with HDFS-12396", e);
}
- return createTransparentCryptoHelper28();
+ return createTransparentCryptoHelperWithHDFS12396();
}
static {
try {
SASL_ADAPTOR = createSaslAdaptor();
- PB_HELPER = createPBHelper();
TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
} catch (Exception e) {
String msg = "Couldn't properly initialize access to HDFS internals. Please "
@@ -409,7 +363,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
builder.setPayload(ByteString.copyFrom(payload));
}
if (options != null) {
- builder.addAllCipherOption(PB_HELPER.convertCipherOptions(options));
+ builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
}
DataTransferEncryptorMessageProto proto = builder.build();
int size = proto.getSerializedSize();
@@ -493,7 +447,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto,
boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
List<CipherOption> cipherOptions =
- PB_HELPER.convertCipherOptionProtos(proto.getCipherOptionList());
+ PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList());
if (cipherOptions == null || cipherOptions.isEmpty()) {
return null;
}