You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/05/18 06:14:09 UTC

[hbase] branch branch-2.2 updated: HBASE-22400 Remove the adapter code in async fs implementation for hadoop-2.7.x

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new a612537  HBASE-22400 Remove the adapter code in async fs implementation for hadoop-2.7.x
a612537 is described below

commit a6125374825bd513f5f979abfe4ed48828c4c8ea
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed May 15 09:59:41 2019 +0800

    HBASE-22400 Remove the adapter code in async fs implementation for hadoop-2.7.x
---
 .../FanOutOneBlockAsyncDFSOutputHelper.java        | 328 ++-------------------
 .../FanOutOneBlockAsyncDFSOutputSaslHelper.java    |  66 +----
 2 files changed, 38 insertions(+), 356 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 34d9f29..0c8a49b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -21,9 +21,9 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
 import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
 import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemLinkResolver;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.ECN;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
@@ -71,23 +73,19 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationH
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
-import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
@@ -136,35 +134,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];
 
-  // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
-  // getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may
-  // get from proto directly, or combined by the reply field of the proto and a ECN object. See
-  // createPipelineAckStatusGetter for more details.
-  private interface PipelineAckStatusGetter {
-    Status get(PipelineAckProto ack);
-  }
-
-  private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
-
-  // StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and o.a.h.fs in hadoop 2.7. So here
-  // we need to use reflection to set it.See createStorageTypeSetter for more details.
-  private interface StorageTypeSetter {
-    OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
-  }
-
-  private static final StorageTypeSetter STORAGE_TYPE_SETTER;
-
-  // helper class for calling add block method on namenode. There is a addBlockFlags parameter for
-  // hadoop 2.8 or later. See createBlockAdder for more details.
-  private interface BlockAdder {
-
-    LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
-        ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
-        throws IOException;
-  }
-
-  private static final BlockAdder BLOCK_ADDER;
-
   private interface LeaseManager {
 
     void begin(DFSClient client, long inodeId);
@@ -183,23 +152,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
 
-  // helper class for convert protos.
-  private interface PBHelper {
-
-    ExtendedBlockProto convert(ExtendedBlock b);
-
-    TokenProto convert(Token<?> tok);
-  }
-
-  private static final PBHelper PB_HELPER;
-
-  // helper class for creating data checksum.
-  private interface ChecksumCreater {
-    DataChecksum createChecksum(DFSClient client);
-  }
-
-  private static final ChecksumCreater CHECKSUM_CREATER;
-
   // helper class for creating files.
   private interface FileCreator {
     default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
@@ -269,234 +221,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     };
   }
 
-  private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
-      throws NoSuchMethodException {
-    Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
-    @SuppressWarnings("rawtypes")
-    Class<? extends Enum> ecnClass;
-    try {
-      ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
-          .asSubclass(Enum.class);
-    } catch (ClassNotFoundException e) {
-      String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please " +
-          "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
-          "HBASE-16110 for more information.";
-      LOG.error(msg, e);
-      throw new Error(msg, e);
-    }
-    @SuppressWarnings("unchecked")
-    Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
-    Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
-    Method combineHeaderMethod =
-        PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
-    Method getStatusFromHeaderMethod =
-        PipelineAck.class.getMethod("getStatusFromHeader", int.class);
-    return new PipelineAckStatusGetter() {
-
-      @Override
-      public Status get(PipelineAckProto ack) {
-        try {
-          @SuppressWarnings("unchecked")
-          List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
-          Integer headerFlag;
-          if (flagList.isEmpty()) {
-            Status reply = (Status) getReplyMethod.invoke(ack, 0);
-            headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
-          } else {
-            headerFlag = flagList.get(0);
-          }
-          return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
-      throws NoSuchMethodException {
-    Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
-    return new PipelineAckStatusGetter() {
-
-      @Override
-      public Status get(PipelineAckProto ack) {
-        try {
-          return (Status) getStatusMethod.invoke(ack, 0);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private static PipelineAckStatusGetter createPipelineAckStatusGetter()
-      throws NoSuchMethodException {
-    try {
-      return createPipelineAckStatusGetter27();
-    } catch (NoSuchMethodException e) {
-      LOG.debug("Can not get expected method " + e.getMessage() +
-          ", this usually because your Hadoop is pre 2.7.0, " +
-          "try the methods in Hadoop 2.6.x instead.");
-    }
-    return createPipelineAckStatusGetter26();
-  }
-
-  private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
-    Method setStorageTypeMethod =
-        OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
-    ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
-    for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
-      builder.put(storageTypeProto.name(), storageTypeProto);
-    }
-    ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
-    return new StorageTypeSetter() {
-
-      @Override
-      public OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType) {
-        Object protoEnum = name2ProtoEnum.get(storageType.name());
-        try {
-          setStorageTypeMethod.invoke(builder, protoEnum);
-        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-        return builder;
-      }
-    };
-  }
-
-  private static BlockAdder createBlockAdder() throws NoSuchMethodException {
-    for (Method method : ClientProtocol.class.getMethods()) {
-      if (method.getName().equals("addBlock")) {
-        Method addBlockMethod = method;
-        Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
-        if (paramTypes[paramTypes.length - 1] == String[].class) {
-          return new BlockAdder() {
-
-            @Override
-            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
-                ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
-                String[] favoredNodes) throws IOException {
-              try {
-                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
-                  excludeNodes, fileId, favoredNodes);
-              } catch (IllegalAccessException e) {
-                throw new RuntimeException(e);
-              } catch (InvocationTargetException e) {
-                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
-                throw new RuntimeException(e);
-              }
-            }
-          };
-        } else {
-          return new BlockAdder() {
-
-            @Override
-            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
-                ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
-                String[] favoredNodes) throws IOException {
-              try {
-                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
-                  excludeNodes, fileId, favoredNodes, null);
-              } catch (IllegalAccessException e) {
-                throw new RuntimeException(e);
-              } catch (InvocationTargetException e) {
-                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
-                throw new RuntimeException(e);
-              }
-            }
-          };
-        }
-      }
-    }
-    throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol");
-  }
-
-  private static PBHelper createPBHelper() throws NoSuchMethodException {
-    Class<?> helperClass;
-    String clazzName = "org.apache.hadoop.hdfs.protocolPB.PBHelperClient";
-    try {
-      helperClass = Class.forName(clazzName);
-    } catch (ClassNotFoundException e) {
-      helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
-      LOG.debug("" + clazzName + " not found (Hadoop is pre-2.8.0?); using " +
-          helperClass.toString() + " instead.");
-    }
-    Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
-    Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
-    return new PBHelper() {
-
-      @Override
-      public ExtendedBlockProto convert(ExtendedBlock b) {
-        try {
-          return (ExtendedBlockProto) convertEBMethod.invoke(null, b);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public TokenProto convert(Token<?> tok) {
-        try {
-          return (TokenProto) convertTokenMethod.invoke(null, tok);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private static ChecksumCreater createChecksumCreater28(Method getConfMethod, Class<?> confClass)
-      throws NoSuchMethodException {
-    for (Method method : confClass.getMethods()) {
-      if (method.getName().equals("createChecksum")) {
-        Method createChecksumMethod = method;
-        return new ChecksumCreater() {
-
-          @Override
-          public DataChecksum createChecksum(DFSClient client) {
-            try {
-              return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client),
-                (Object) null);
-            } catch (IllegalAccessException | InvocationTargetException e) {
-              throw new RuntimeException(e);
-            }
-          }
-        };
-      }
-    }
-    throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
-  }
-
-  private static ChecksumCreater createChecksumCreater27(Method getConfMethod, Class<?> confClass)
-      throws NoSuchMethodException {
-    Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
-    createChecksumMethod.setAccessible(true);
-    return new ChecksumCreater() {
-
-      @Override
-      public DataChecksum createChecksum(DFSClient client) {
-        try {
-          return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client));
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private static ChecksumCreater createChecksumCreater()
-      throws NoSuchMethodException, ClassNotFoundException {
-    Method getConfMethod = DFSClient.class.getMethod("getConf");
-    try {
-      return createChecksumCreater28(getConfMethod,
-        Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf"));
-    } catch (ClassNotFoundException e) {
-      LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
-    }
-    return createChecksumCreater27(getConfMethod,
-      Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
-  }
-
   private static FileCreator createFileCreator3() throws NoSuchMethodException {
     Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
       String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
@@ -547,13 +271,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   static {
     try {
-      PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
-      STORAGE_TYPE_SETTER = createStorageTypeSetter();
-      BLOCK_ADDER = createBlockAdder();
       LEASE_MANAGER = createLeaseManager();
       DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
-      PB_HELPER = createPBHelper();
-      CHECKSUM_CREATER = createChecksumCreater();
       FILE_CREATOR = createFileCreator();
     } catch (Exception e) {
       String msg = "Couldn't properly initialize access to HDFS internals. Please " +
@@ -573,11 +292,19 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   }
 
   static DataChecksum createChecksum(DFSClient client) {
-    return CHECKSUM_CREATER.createChecksum(client);
+    return client.getConf().createChecksum(null);
   }
 
   static Status getStatus(PipelineAckProto ack) {
-    return PIPELINE_ACK_STATUS_GETTER.get(ack);
+    List<Integer> flagList = ack.getFlagList();
+    Integer headerFlag;
+    if (flagList.isEmpty()) {
+      Status reply = ack.getReply(0);
+      headerFlag = PipelineAck.combineHeader(ECN.DISABLED, reply);
+    } else {
+      headerFlag = flagList.get(0);
+    }
+    return PipelineAck.getStatusFromHeader(headerFlag);
   }
 
   private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
@@ -641,12 +368,13 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       });
   }
 
-  private static void requestWriteBlock(Channel channel, Enum<?> storageType,
+  private static void requestWriteBlock(Channel channel, StorageType storageType,
       OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
-    OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
+    OpWriteBlockProto proto =
+      writeBlockProtoBuilder.setStorageType(PBHelperClient.convertStorageType(storageType)).build();
     int protoLen = proto.getSerializedSize();
     ByteBuf buffer =
-        channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
+      channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
     buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
     buffer.writeByte(Op.WRITE_BLOCK.code);
     proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
@@ -654,7 +382,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   }
 
   private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
-      Enum<?> storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
+      StorageType storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
       DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
       throws IOException {
     Promise<Void> saslPromise = channel.eventLoop().newPromise();
@@ -678,7 +406,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
       BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
       Class<? extends Channel> channelClass) {
-    Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
+    StorageType[] storageTypes = locatedBlock.getStorageTypes();
     DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
     boolean connectToDnViaHostname =
         conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
@@ -686,9 +414,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
     blockCopy.setNumBytes(locatedBlock.getBlockSize());
     ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
-        .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PB_HELPER.convert(blockCopy))
-            .setToken(PB_HELPER.convert(locatedBlock.getBlockToken())))
-        .setClientName(clientName).build();
+      .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelperClient.convert(blockCopy))
+        .setToken(PBHelperClient.convert(locatedBlock.getBlockToken())))
+      .setClientName(clientName).build();
     ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
     OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
         .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
@@ -699,7 +427,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
     for (int i = 0; i < datanodeInfos.length; i++) {
       DatanodeInfo dnInfo = datanodeInfos[i];
-      Enum<?> storageType = storageTypes[i];
+      StorageType storageType = storageTypes[i];
       Promise<Channel> promise = eventLoopGroup.next().newPromise();
       futureList.add(promise);
       String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
@@ -771,8 +499,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       List<Future<Channel>> futureList = null;
       try {
         DataChecksum summer = createChecksum(client);
-        locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null,
-          excludesNodes, stat.getFileId(), null);
+        locatedBlock = namenode.addBlock(src, client.getClientName(), null, excludesNodes,
+          stat.getFileId(), null, null);
         List<Channel> datanodeList = new ArrayList<>();
         futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
           PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
index ef6c1ca..4181dd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
 import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
 
 import com.google.protobuf.ByteString;
@@ -66,7 +66,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.security.SaslPropertiesResolver;
@@ -128,16 +128,6 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
 
   private static final SaslAdaptor SASL_ADAPTOR;
 
-  // helper class for convert protos.
-  private interface PBHelper {
-
-    List<CipherOptionProto> convertCipherOptions(List<CipherOption> options);
-
-    List<CipherOption> convertCipherOptionProtos(List<CipherOptionProto> options);
-  }
-
-  private static final PBHelper PB_HELPER;
-
   private interface TransparentCryptoHelper {
 
     Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client)
@@ -188,42 +178,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     };
   }
 
-  private static PBHelper createPBHelper() throws NoSuchMethodException {
-    Class<?> helperClass;
-    try {
-      helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
-    } catch (ClassNotFoundException e) {
-      LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
-      helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
-    }
-    Method convertCipherOptionsMethod = helperClass.getMethod("convertCipherOptions", List.class);
-    Method convertCipherOptionProtosMethod =
-        helperClass.getMethod("convertCipherOptionProtos", List.class);
-    return new PBHelper() {
-
-      @SuppressWarnings("unchecked")
-      @Override
-      public List<CipherOptionProto> convertCipherOptions(List<CipherOption> options) {
-        try {
-          return (List<CipherOptionProto>) convertCipherOptionsMethod.invoke(null, options);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @SuppressWarnings("unchecked")
-      @Override
-      public List<CipherOption> convertCipherOptionProtos(List<CipherOptionProto> options) {
-        try {
-          return (List<CipherOption>) convertCipherOptionProtosMethod.invoke(null, options);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private static TransparentCryptoHelper createTransparentCryptoHelper27()
+  private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396()
       throws NoSuchMethodException {
     Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
       .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class);
@@ -252,7 +207,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     };
   }
 
-  private static TransparentCryptoHelper createTransparentCryptoHelper28()
+  private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396()
       throws ClassNotFoundException, NoSuchMethodException {
     Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil");
     Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod(
@@ -285,18 +240,17 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
   private static TransparentCryptoHelper createTransparentCryptoHelper()
       throws NoSuchMethodException, ClassNotFoundException {
     try {
-      return createTransparentCryptoHelper27();
+      return createTransparentCryptoHelperWithoutHDFS12396();
     } catch (NoSuchMethodException e) {
-      LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient, should be hadoop 2.8+",
-        e);
+      LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," +
+        " should be hadoop version with HDFS-12396", e);
     }
-    return createTransparentCryptoHelper28();
+    return createTransparentCryptoHelperWithHDFS12396();
   }
 
   static {
     try {
       SASL_ADAPTOR = createSaslAdaptor();
-      PB_HELPER = createPBHelper();
       TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
     } catch (Exception e) {
       String msg = "Couldn't properly initialize access to HDFS internals. Please "
@@ -409,7 +363,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
         builder.setPayload(ByteString.copyFrom(payload));
       }
       if (options != null) {
-        builder.addAllCipherOption(PB_HELPER.convertCipherOptions(options));
+        builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
       }
       DataTransferEncryptorMessageProto proto = builder.build();
       int size = proto.getSerializedSize();
@@ -493,7 +447,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto,
         boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
       List<CipherOption> cipherOptions =
-          PB_HELPER.convertCipherOptionProtos(proto.getCipherOptionList());
+          PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList());
       if (cipherOptions == null || cipherOptions.isEmpty()) {
         return null;
       }