You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/11/01 01:23:26 UTC

[1/2] hbase git commit: HBASE-16968 Refactor FanOutOneBlockAsyncDFSOutput

Repository: hbase
Updated Branches:
  refs/heads/master d0be36deb -> 45a259424


http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
index a222e1b..c0121d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -18,12 +18,11 @@
 package org.apache.hadoop.hbase.io.asyncfs;
 
 import static io.netty.handler.timeout.IdleState.READER_IDLE;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.protobuf.CodedOutputStream;
 
@@ -47,13 +46,13 @@ import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.Promise;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -73,9 +72,17 @@ import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherOption;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.Decryptor;
+import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -83,9 +90,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.Builder;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.security.SaslPropertiesResolver;
@@ -110,469 +118,105 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
   private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
   private static final String NAME_DELIMITER = " ";
 
-  @VisibleForTesting
-  static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = "dfs.encrypt.data.transfer.cipher.suites";
-
-  @VisibleForTesting
-  static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding";
-
   private interface SaslAdaptor {
 
-    SaslPropertiesResolver getSaslPropsResolver(DFSClient client);
+    TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient);
 
-    TrustedChannelResolver getTrustedChannelResolver(DFSClient client);
+    SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient);
 
-    AtomicBoolean getFallbackToSimpleAuth(DFSClient client);
-
-    DataEncryptionKey createDataEncryptionKey(DFSClient client);
+    AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient);
   }
 
   private static final SaslAdaptor SASL_ADAPTOR;
 
-  private interface CipherOptionHelper {
-
-    List<Object> getCipherOptions(Configuration conf) throws IOException;
-
-    void addCipherOptions(DataTransferEncryptorMessageProto.Builder builder,
-        List<Object> cipherOptions);
-
-    Object getCipherOption(DataTransferEncryptorMessageProto proto, boolean isNegotiatedQopPrivacy,
-        SaslClient saslClient) throws IOException;
-
-    Object getCipherSuite(Object cipherOption);
-
-    byte[] getInKey(Object cipherOption);
+  // Helper for converting between CipherOption objects and their protobuf messages.
+  private interface PBHelper {
 
-    byte[] getInIv(Object cipherOption);
+    List<CipherOptionProto> convertCipherOptions(List<CipherOption> options);
 
-    byte[] getOutKey(Object cipherOption);
-
-    byte[] getOutIv(Object cipherOption);
+    List<CipherOption> convertCipherOptionProtos(List<CipherOptionProto> options);
   }
 
-  private static final CipherOptionHelper CIPHER_OPTION_HELPER;
+  private static final PBHelper PB_HELPER;
 
   private interface TransparentCryptoHelper {
 
-    Object getFileEncryptionInfo(HdfsFileStatus stat);
-
-    CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client)
+    Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client)
         throws IOException;
   }
 
   private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER;
 
-  static final class CryptoCodec {
-
-    private static final Method CREATE_CODEC;
-
-    private static final Method CREATE_ENCRYPTOR;
-
-    private static final Method CREATE_DECRYPTOR;
-
-    private static final Method INIT_ENCRYPTOR;
-
-    private static final Method INIT_DECRYPTOR;
-
-    private static final Method ENCRYPT;
-
-    private static final Method DECRYPT;
-
-    static {
-      Class<?> cryptoCodecClass = null;
-      try {
-        cryptoCodecClass = Class.forName("org.apache.hadoop.crypto.CryptoCodec");
-      } catch (ClassNotFoundException e) {
-        LOG.debug("No CryptoCodec class found, should be hadoop 2.5-", e);
-      }
-      if (cryptoCodecClass != null) {
-        Method getInstanceMethod = null;
-        for (Method method : cryptoCodecClass.getMethods()) {
-          if (method.getName().equals("getInstance") && method.getParameterTypes().length == 2) {
-            getInstanceMethod = method;
-            break;
-          }
-        }
-        try {
-          if (getInstanceMethod == null) {
-            throw new NoSuchMethodException(
-                "Can not find suitable getInstance method in CryptoCodec");
-          }
-          CREATE_CODEC = getInstanceMethod;
-          CREATE_ENCRYPTOR = cryptoCodecClass.getMethod("createEncryptor");
-          CREATE_DECRYPTOR = cryptoCodecClass.getMethod("createDecryptor");
-
-          Class<?> encryptorClass = Class.forName("org.apache.hadoop.crypto.Encryptor");
-          INIT_ENCRYPTOR = encryptorClass.getMethod("init", byte[].class, byte[].class);
-          ENCRYPT = encryptorClass.getMethod("encrypt", ByteBuffer.class, ByteBuffer.class);
-
-          Class<?> decryptorClass = Class.forName("org.apache.hadoop.crypto.Decryptor");
-          INIT_DECRYPTOR = decryptorClass.getMethod("init", byte[].class, byte[].class);
-          DECRYPT = decryptorClass.getMethod("decrypt", ByteBuffer.class, ByteBuffer.class);
-        } catch (Exception e) {
-          final String msg = "Couldn't properly initialize access to HDFS internals. Please "
-              + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
-              + "HBASE-16110 for more information.";
-          LOG.error(msg, e);
-          throw new Error(msg, e);
-        }
-      } else {
-        CREATE_CODEC = null;
-        CREATE_ENCRYPTOR = null;
-        CREATE_DECRYPTOR = null;
-        INIT_ENCRYPTOR = null;
-        INIT_DECRYPTOR = null;
-        ENCRYPT = null;
-        DECRYPT = null;
-      }
-    }
-
-    private final Object encryptor;
-
-    private final Object decryptor;
-
-    public CryptoCodec(Configuration conf, Object cipherOption) {
-      try {
-        Object codec = CREATE_CODEC.invoke(null, conf,
-          CIPHER_OPTION_HELPER.getCipherSuite(cipherOption));
-        encryptor = CREATE_ENCRYPTOR.invoke(codec);
-        byte[] encKey = CIPHER_OPTION_HELPER.getInKey(cipherOption);
-        byte[] encIv = CIPHER_OPTION_HELPER.getInIv(cipherOption);
-        INIT_ENCRYPTOR.invoke(encryptor, encKey, Arrays.copyOf(encIv, encIv.length));
-
-        decryptor = CREATE_DECRYPTOR.invoke(codec);
-        byte[] decKey = CIPHER_OPTION_HELPER.getOutKey(cipherOption);
-        byte[] decIv = CIPHER_OPTION_HELPER.getOutIv(cipherOption);
-        INIT_DECRYPTOR.invoke(decryptor, decKey, Arrays.copyOf(decIv, decIv.length));
-      } catch (IllegalAccessException | InvocationTargetException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    public CryptoCodec(Configuration conf, Object cipherSuite, byte[] encKey, byte[] encIv) {
-      try {
-        Object codec = CREATE_CODEC.invoke(null, conf, cipherSuite);
-        encryptor = CREATE_ENCRYPTOR.invoke(codec);
-        INIT_ENCRYPTOR.invoke(encryptor, encKey, encIv);
-        decryptor = null;
-      } catch (IllegalAccessException | InvocationTargetException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) {
-      try {
-        ENCRYPT.invoke(encryptor, inBuffer, outBuffer);
-      } catch (IllegalAccessException | InvocationTargetException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) {
-      try {
-        DECRYPT.invoke(decryptor, inBuffer, outBuffer);
-      } catch (IllegalAccessException | InvocationTargetException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  private static SaslAdaptor createSaslAdaptor27(Class<?> saslDataTransferClientClass)
+  private static SaslAdaptor createSaslAdaptor()
       throws NoSuchFieldException, NoSuchMethodException {
-    final Field saslPropsResolverField = saslDataTransferClientClass
-        .getDeclaredField("saslPropsResolver");
+    Field saslPropsResolverField =
+        SaslDataTransferClient.class.getDeclaredField("saslPropsResolver");
     saslPropsResolverField.setAccessible(true);
-    final Field trustedChannelResolverField = saslDataTransferClientClass
-        .getDeclaredField("trustedChannelResolver");
+    Field trustedChannelResolverField =
+        SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver");
     trustedChannelResolverField.setAccessible(true);
-    final Field fallbackToSimpleAuthField = saslDataTransferClientClass
-        .getDeclaredField("fallbackToSimpleAuth");
+    Field fallbackToSimpleAuthField =
+        SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth");
     fallbackToSimpleAuthField.setAccessible(true);
-    final Method getSaslDataTransferClientMethod = DFSClient.class
-        .getMethod("getSaslDataTransferClient");
-    final Method newDataEncryptionKeyMethod = DFSClient.class.getMethod("newDataEncryptionKey");
     return new SaslAdaptor() {
 
       @Override
-      public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
-        try {
-          return (TrustedChannelResolver) trustedChannelResolverField
-              .get(getSaslDataTransferClientMethod.invoke(client));
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
-        try {
-          return (SaslPropertiesResolver) saslPropsResolverField
-              .get(getSaslDataTransferClientMethod.invoke(client));
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
+      public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) {
         try {
-          return (AtomicBoolean) fallbackToSimpleAuthField
-              .get(getSaslDataTransferClientMethod.invoke(client));
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
-        try {
-          return (DataEncryptionKey) newDataEncryptionKeyMethod.invoke(client);
-        } catch (IllegalAccessException | InvocationTargetException e) {
+          return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient);
+        } catch (IllegalAccessException e) {
           throw new RuntimeException(e);
         }
       }
-    };
-  }
-
-  private static SaslAdaptor createSaslAdaptor25()
-      throws NoSuchFieldException, NoSuchMethodException {
-    final Field trustedChannelResolverField = DFSClient.class
-        .getDeclaredField("trustedChannelResolver");
-    trustedChannelResolverField.setAccessible(true);
-    final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey");
-    return new SaslAdaptor() {
 
       @Override
-      public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
+      public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) {
         try {
-          return (TrustedChannelResolver) trustedChannelResolverField.get(client);
+          return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient);
         } catch (IllegalAccessException e) {
           throw new RuntimeException(e);
         }
       }
 
       @Override
-      public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
-        return null;
-      }
-
-      @Override
-      public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
-        return null;
-      }
-
-      @Override
-      public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
+      public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) {
         try {
-          return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client);
-        } catch (IllegalAccessException | InvocationTargetException e) {
+          return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient);
+        } catch (IllegalAccessException e) {
           throw new RuntimeException(e);
         }
       }
     };
   }
 
-  private static SaslAdaptor createSaslAdaptor()
-      throws NoSuchFieldException, NoSuchMethodException {
+  private static PBHelper createPBHelper() throws NoSuchMethodException {
+    Class<?> helperClass;
     try {
-      return createSaslAdaptor27(
-        Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient"));
-    } catch (ClassNotFoundException e) {
-      LOG.debug("No SaslDataTransferClient class found, should be hadoop 2.5-", e);
-    }
-    return createSaslAdaptor25();
-  }
-
-  private static CipherOptionHelper createCipherHelper25() {
-    return new CipherOptionHelper() {
-
-      @Override
-      public byte[] getOutKey(Object cipherOption) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public byte[] getOutIv(Object cipherOption) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public byte[] getInKey(Object cipherOption) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public byte[] getInIv(Object cipherOption) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public Object getCipherSuite(Object cipherOption) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public List<Object> getCipherOptions(Configuration conf) {
-        return null;
-      }
-
-      @Override
-      public Object getCipherOption(DataTransferEncryptorMessageProto proto,
-          boolean isNegotiatedQopPrivacy, SaslClient saslClient) {
-        return null;
-      }
-
-      @Override
-      public void addCipherOptions(Builder builder, List<Object> cipherOptions) {
-        throw new UnsupportedOperationException();
-      }
-    };
-  }
-
-  private static CipherOptionHelper createCipherHelper27(Class<?> cipherOptionClass)
-      throws ClassNotFoundException, NoSuchMethodException {
-    @SuppressWarnings("rawtypes")
-    Class<? extends Enum> cipherSuiteClass = Class.forName("org.apache.hadoop.crypto.CipherSuite")
-        .asSubclass(Enum.class);
-    @SuppressWarnings("unchecked")
-    final Enum<?> aesCipherSuite = Enum.valueOf(cipherSuiteClass, "AES_CTR_NOPADDING");
-    final Constructor<?> cipherOptionConstructor = cipherOptionClass
-        .getConstructor(cipherSuiteClass);
-    final Constructor<?> cipherOptionWithKeyAndIvConstructor = cipherOptionClass
-        .getConstructor(cipherSuiteClass, byte[].class, byte[].class, byte[].class, byte[].class);
-
-    final Method getCipherSuiteMethod = cipherOptionClass.getMethod("getCipherSuite");
-    final Method getInKeyMethod = cipherOptionClass.getMethod("getInKey");
-    final Method getInIvMethod = cipherOptionClass.getMethod("getInIv");
-    final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey");
-    final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv");
-
-    Class<?> pbHelperClass;
-    try {
-      pbHelperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
+      helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
     } catch (ClassNotFoundException e) {
       LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
-      pbHelperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
+      helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
     }
-    final Method convertCipherOptionsMethod = pbHelperClass.getMethod("convertCipherOptions",
-      List.class);
-    final Method convertCipherOptionProtosMethod = pbHelperClass
-        .getMethod("convertCipherOptionProtos", List.class);
-    final Method addAllCipherOptionMethod = DataTransferEncryptorMessageProto.Builder.class
-        .getMethod("addAllCipherOption", Iterable.class);
-    final Method getCipherOptionListMethod = DataTransferEncryptorMessageProto.class
-        .getMethod("getCipherOptionList");
-    return new CipherOptionHelper() {
-
-      @Override
-      public byte[] getOutKey(Object cipherOption) {
-        try {
-          return (byte[]) getOutKeyMethod.invoke(cipherOption);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public byte[] getOutIv(Object cipherOption) {
-        try {
-          return (byte[]) getOutIvMethod.invoke(cipherOption);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public byte[] getInKey(Object cipherOption) {
-        try {
-          return (byte[]) getInKeyMethod.invoke(cipherOption);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public byte[] getInIv(Object cipherOption) {
-        try {
-          return (byte[]) getInIvMethod.invoke(cipherOption);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public Object getCipherSuite(Object cipherOption) {
-        try {
-          return getCipherSuiteMethod.invoke(cipherOption);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public List<Object> getCipherOptions(Configuration conf) throws IOException {
-        // Negotiate cipher suites if configured. Currently, the only supported
-        // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
-        // values for future expansion.
-        String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
-        if (cipherSuites == null || cipherSuites.isEmpty()) {
-          return null;
-        }
-        if (!cipherSuites.equals(AES_CTR_NOPADDING)) {
-          throw new IOException(String.format("Invalid cipher suite, %s=%s",
-            DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
-        }
-        Object option;
-        try {
-          option = cipherOptionConstructor.newInstance(aesCipherSuite);
-        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-        List<Object> cipherOptions = Lists.newArrayListWithCapacity(1);
-        cipherOptions.add(option);
-        return cipherOptions;
-      }
-
-      private Object unwrap(Object option, SaslClient saslClient) throws IOException {
-        byte[] inKey = getInKey(option);
-        if (inKey != null) {
-          inKey = saslClient.unwrap(inKey, 0, inKey.length);
-        }
-        byte[] outKey = getOutKey(option);
-        if (outKey != null) {
-          outKey = saslClient.unwrap(outKey, 0, outKey.length);
-        }
-        try {
-          return cipherOptionWithKeyAndIvConstructor.newInstance(getCipherSuite(option), inKey,
-            getInIv(option), outKey, getOutIv(option));
-        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
+    Method convertCipherOptionsMethod = helperClass.getMethod("convertCipherOptions", List.class);
+    Method convertCipherOptionProtosMethod =
+        helperClass.getMethod("convertCipherOptionProtos", List.class);
+    return new PBHelper() {
 
       @SuppressWarnings("unchecked")
       @Override
-      public Object getCipherOption(DataTransferEncryptorMessageProto proto,
-          boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
-        List<Object> cipherOptions;
+      public List<CipherOptionProto> convertCipherOptions(List<CipherOption> options) {
         try {
-          cipherOptions = (List<Object>) convertCipherOptionProtosMethod.invoke(null,
-            getCipherOptionListMethod.invoke(proto));
+          return (List<CipherOptionProto>) convertCipherOptionsMethod.invoke(null, options);
         } catch (IllegalAccessException | InvocationTargetException e) {
           throw new RuntimeException(e);
         }
-        if (cipherOptions == null || cipherOptions.isEmpty()) {
-          return null;
-        }
-        Object cipherOption = cipherOptions.get(0);
-        return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
       }
 
+      @SuppressWarnings("unchecked")
       @Override
-      public void addCipherOptions(Builder builder, List<Object> cipherOptions) {
+      public List<CipherOption> convertCipherOptionProtos(List<CipherOptionProto> options) {
         try {
-          addAllCipherOptionMethod.invoke(builder,
-            convertCipherOptionsMethod.invoke(null, cipherOptions));
+          return (List<CipherOption>) convertCipherOptionProtosMethod.invoke(null, options);
         } catch (IllegalAccessException | InvocationTargetException e) {
           throw new RuntimeException(e);
         }
@@ -580,65 +224,28 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     };
   }
 
-  private static CipherOptionHelper createCipherHelper()
-      throws ClassNotFoundException, NoSuchMethodException {
-    Class<?> cipherOptionClass;
-    try {
-      cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption");
-    } catch (ClassNotFoundException e) {
-      LOG.debug("No CipherOption class found, should be hadoop 2.5-", e);
-      return createCipherHelper25();
-    }
-    return createCipherHelper27(cipherOptionClass);
-  }
-
-  private static TransparentCryptoHelper createTransparentCryptoHelper25() {
-    return new TransparentCryptoHelper() {
-
-      @Override
-      public Object getFileEncryptionInfo(HdfsFileStatus stat) {
-        return null;
-      }
-
-      @Override
-      public CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client) {
-        throw new UnsupportedOperationException();
-      }
-    };
-  }
-
-  private static TransparentCryptoHelper createTransparentCryptoHelper27(Class<?> feInfoClass)
-      throws NoSuchMethodException, ClassNotFoundException {
-    final Method getFileEncryptionInfoMethod = HdfsFileStatus.class
-        .getMethod("getFileEncryptionInfo");
-    final Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
-        .getDeclaredMethod("decryptEncryptedDataEncryptionKey", feInfoClass);
+  private static TransparentCryptoHelper createTransparentCryptoHelper()
+      throws NoSuchMethodException {
+    Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
+        .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class);
     decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
-    final Method getCipherSuiteMethod = feInfoClass.getMethod("getCipherSuite");
-    Class<?> keyVersionClass = Class.forName("org.apache.hadoop.crypto.key.KeyProvider$KeyVersion");
-    final Method getMaterialMethod = keyVersionClass.getMethod("getMaterial");
-    final Method getIVMethod = feInfoClass.getMethod("getIV");
     return new TransparentCryptoHelper() {
 
       @Override
-      public Object getFileEncryptionInfo(HdfsFileStatus stat) {
-        try {
-          return getFileEncryptionInfoMethod.invoke(stat);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client)
-          throws IOException {
+      public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
+          DFSClient client) throws IOException {
         try {
-          Object decrypted = decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo);
-          return new CryptoCodec(conf, getCipherSuiteMethod.invoke(feInfo),
-              (byte[]) getMaterialMethod.invoke(decrypted), (byte[]) getIVMethod.invoke(feInfo));
+          KeyVersion decryptedKey =
+              (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo);
+          CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
+          Encryptor encryptor = cryptoCodec.createEncryptor();
+          encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
+          return encryptor;
         } catch (InvocationTargetException e) {
           Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
           throw new RuntimeException(e.getTargetException());
+        } catch (GeneralSecurityException e) {
+          throw new IOException(e);
         } catch (IllegalAccessException e) {
           throw new RuntimeException(e);
         }
@@ -646,25 +253,13 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     };
   }
 
-  private static TransparentCryptoHelper createTransparentCryptoHelper()
-      throws NoSuchMethodException, ClassNotFoundException {
-    Class<?> feInfoClass;
-    try {
-      feInfoClass = Class.forName("org.apache.hadoop.fs.FileEncryptionInfo");
-    } catch (ClassNotFoundException e) {
-      LOG.debug("No FileEncryptionInfo class found, should be hadoop 2.5-", e);
-      return createTransparentCryptoHelper25();
-    }
-    return createTransparentCryptoHelper27(feInfoClass);
-  }
-
   static {
     try {
       SASL_ADAPTOR = createSaslAdaptor();
-      CIPHER_OPTION_HELPER = createCipherHelper();
+      PB_HELPER = createPBHelper();
       TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
     } catch (Exception e) {
-      final String msg = "Couldn't properly initialize access to HDFS internals. Please "
+      String msg = "Couldn't properly initialize access to HDFS internals. Please "
           + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
           + "HBASE-16110 for more information.";
       LOG.error(msg, e);
@@ -748,16 +343,31 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
       sendSaslMessage(ctx, payload, null);
     }
 
-    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, List<Object> options)
-        throws IOException {
-      DataTransferEncryptorMessageProto.Builder builder = DataTransferEncryptorMessageProto
-          .newBuilder();
+    private List<CipherOption> getCipherOptions() throws IOException {
+      // Negotiate cipher suites if configured. Currently, the only supported
+      // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
+      // values for future expansion.
+      String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
+      if (StringUtils.isBlank(cipherSuites)) {
+        return null;
+      }
+      if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
+        throw new IOException(String.format("Invalid cipher suite, %s=%s",
+          DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
+      }
+      return Arrays.asList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
+    }
+
+    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
+        List<CipherOption> options) throws IOException {
+      DataTransferEncryptorMessageProto.Builder builder =
+          DataTransferEncryptorMessageProto.newBuilder();
       builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
       if (payload != null) {
         builder.setPayload(ByteStringer.wrap(payload));
       }
       if (options != null) {
-        CIPHER_OPTION_HELPER.addCipherOptions(builder, options);
+        builder.addAllCipherOption(PB_HELPER.convertCipherOptions(options));
       }
       DataTransferEncryptorMessageProto proto = builder.build();
       int size = proto.getSerializedSize();
@@ -798,8 +408,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     }
 
     private boolean requestedQopContainsPrivacy() {
-      Set<String> requestedQop = ImmutableSet
-          .copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+      Set<String> requestedQop =
+          ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
       return requestedQop.contains("auth-conf");
     }
 
@@ -807,8 +417,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
       if (!saslClient.isComplete()) {
         throw new IOException("Failed to complete SASL handshake");
       }
-      Set<String> requestedQop = ImmutableSet
-          .copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+      Set<String> requestedQop =
+          ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
       String negotiatedQop = getNegotiatedQop();
       LOG.debug(
         "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop);
@@ -825,48 +435,73 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
       return qop != null && !"auth".equalsIgnoreCase(qop);
     }
 
+    private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException {
+      byte[] inKey = option.getInKey();
+      if (inKey != null) {
+        inKey = saslClient.unwrap(inKey, 0, inKey.length);
+      }
+      byte[] outKey = option.getOutKey();
+      if (outKey != null) {
+        outKey = saslClient.unwrap(outKey, 0, outKey.length);
+      }
+      return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey,
+          option.getOutIv());
+    }
+
+    private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto,
+        boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
+      List<CipherOption> cipherOptions =
+          PB_HELPER.convertCipherOptionProtos(proto.getCipherOptionList());
+      if (cipherOptions == null || cipherOptions.isEmpty()) {
+        return null;
+      }
+      CipherOption cipherOption = cipherOptions.get(0);
+      return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
+    }
+
     @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       if (msg instanceof DataTransferEncryptorMessageProto) {
         DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg;
         check(proto);
         byte[] challenge = proto.getPayload().toByteArray();
         byte[] response = saslClient.evaluateChallenge(challenge);
         switch (step) {
-        case 1: {
-          List<Object> cipherOptions = null;
-          if (requestedQopContainsPrivacy()) {
-            cipherOptions = CIPHER_OPTION_HELPER.getCipherOptions(conf);
-          }
-          sendSaslMessage(ctx, response, cipherOptions);
-          ctx.flush();
-          step++;
-          break;
-        }
-        case 2: {
-          assert response == null;
-          checkSaslComplete();
-          Object cipherOption = CIPHER_OPTION_HELPER.getCipherOption(proto,
-            isNegotiatedQopPrivacy(), saslClient);
-          ChannelPipeline p = ctx.pipeline();
-          while (p.first() != null) {
-            p.removeFirst();
+          case 1: {
+            List<CipherOption> cipherOptions = null;
+            if (requestedQopContainsPrivacy()) {
+              cipherOptions = getCipherOptions();
+            }
+            sendSaslMessage(ctx, response, cipherOptions);
+            ctx.flush();
+            step++;
+            break;
           }
-          if (cipherOption != null) {
-            CryptoCodec codec = new CryptoCodec(conf, cipherOption);
-            p.addLast(new EncryptHandler(codec), new DecryptHandler(codec));
-          } else {
-            if (useWrap()) {
-              p.addLast(new SaslWrapHandler(saslClient),
-                new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
-                new SaslUnwrapHandler(saslClient));
+          case 2: {
+            assert response == null;
+            checkSaslComplete();
+            CipherOption cipherOption =
+                getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
+            ChannelPipeline p = ctx.pipeline();
+            while (p.first() != null) {
+              p.removeFirst();
+            }
+            if (cipherOption != null) {
+              CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite());
+              p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()),
+                new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv()));
+            } else {
+              if (useWrap()) {
+                p.addLast(new SaslWrapHandler(saslClient),
+                  new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
+                  new SaslUnwrapHandler(saslClient));
+              }
             }
+            promise.trySuccess(null);
+            break;
           }
-          promise.trySuccess(null);
-          break;
-        }
-        default:
-          throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
+          default:
+            throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
         }
       } else {
         ctx.fireChannelRead(msg);
@@ -961,10 +596,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
 
   private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> {
 
-    private final CryptoCodec codec;
+    private final Decryptor decryptor;
 
-    public DecryptHandler(CryptoCodec codec) {
-      this.codec = codec;
+    public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
+        throws GeneralSecurityException, IOException {
+      this.decryptor = codec.createDecryptor();
+      this.decryptor.init(key, Arrays.copyOf(iv, iv.length));
     }
 
     @Override
@@ -981,7 +618,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
       ByteBuffer inBuffer = inBuf.nioBuffer();
       ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes());
       ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes());
-      codec.decrypt(inBuffer, outBuffer);
+      decryptor.decrypt(inBuffer, outBuffer);
       outBuf.writerIndex(inBuf.readableBytes());
       if (release) {
         inBuf.release();
@@ -992,11 +629,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
 
   private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> {
 
-    private final CryptoCodec codec;
+    private final Encryptor encryptor;
 
-    public EncryptHandler(CryptoCodec codec) {
-      super(false);
-      this.codec = codec;
+    public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
+        throws GeneralSecurityException, IOException {
+      this.encryptor = codec.createEncryptor();
+      this.encryptor.init(key, Arrays.copyOf(iv, iv.length));
     }
 
     @Override
@@ -1022,7 +660,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
       }
       ByteBuffer inBuffer = inBuf.nioBuffer();
       ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes());
-      codec.encrypt(inBuffer, outBuffer);
+      encryptor.encrypt(inBuffer, outBuffer);
       out.writerIndex(inBuf.readableBytes());
       if (release) {
         inBuf.release();
@@ -1070,22 +708,18 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
 
   static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
       int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
-      Promise<Void> saslPromise) {
-    SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(client);
-    TrustedChannelResolver trustedChannelResolver = SASL_ADAPTOR.getTrustedChannelResolver(client);
-    AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(client);
+      Promise<Void> saslPromise) throws IOException {
+    SaslDataTransferClient saslClient = client.getSaslDataTransferClient();
+    SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient);
+    TrustedChannelResolver trustedChannelResolver =
+        SASL_ADAPTOR.getTrustedChannelResolver(saslClient);
+    AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient);
     InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
     if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
       saslPromise.trySuccess(null);
       return;
     }
-    DataEncryptionKey encryptionKey;
-    try {
-      encryptionKey = SASL_ADAPTOR.createDataEncryptionKey(client);
-    } catch (Exception e) {
-      saslPromise.tryFailure(e);
-      return;
-    }
+    DataEncryptionKey encryptionKey = client.newDataEncryptionKey();
     if (encryptionKey != null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug(
@@ -1131,12 +765,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     }
   }
 
-  static CryptoCodec createCryptoCodec(Configuration conf, HdfsFileStatus stat, DFSClient client)
+  static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client)
       throws IOException {
-    Object feInfo = TRANSPARENT_CRYPTO_HELPER.getFileEncryptionInfo(stat);
+    FileEncryptionInfo feInfo = stat.getFileEncryptionInfo();
     if (feInfo == null) {
       return null;
     }
-    return TRANSPARENT_CRYPTO_HELPER.createCryptoCodec(conf, feInfo, client);
+    return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index d5bccf0..279a6ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -29,7 +29,6 @@ import io.netty.util.concurrent.ScheduledFuture;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.nio.channels.CompletionHandler;
 import java.util.ArrayDeque;
 import java.util.Comparator;
 import java.util.Deque;
@@ -206,9 +205,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final long logRollerExitedCheckIntervalMs;
 
-  private final ExecutorService closeExecutor = Executors
-      .newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true)
-          .setNameFormat("Close-WAL-Writer-%d").build());
+  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
 
   private volatile AsyncFSOutput fsOut;
 
@@ -216,8 +214,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final Deque<FSWALEntry> unackedEntries = new ArrayDeque<FSWALEntry>();
 
-  private final PriorityQueue<SyncFuture> syncFutures = new PriorityQueue<SyncFuture>(11,
-      SEQ_COMPARATOR);
+  private final PriorityQueue<SyncFuture> syncFutures =
+      new PriorityQueue<SyncFuture>(11, SEQ_COMPARATOR);
 
   private Promise<Void> rollPromise;
 
@@ -285,8 +283,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
       Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
-      String prefix, String suffix, EventLoop eventLoop) throws FailedLogCloseException,
-      IOException {
+      String prefix, String suffix, EventLoop eventLoop)
+      throws FailedLogCloseException, IOException {
     super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
     this.eventLoop = eventLoop;
     int maxHandlersCount = conf.getInt(REGION_SERVER_HANDLER_COUNT, 200);
@@ -294,9 +292,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
     createMaxRetries =
         conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
-    logRollerExitedCheckIntervalMs =
-        conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS,
-          DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS);
+    logRollerExitedCheckIntervalMs = conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS,
+      DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS);
     rollWriter();
   }
 
@@ -310,82 +307,85 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     }
   }
 
+  private void syncFailed(Throwable error) {
+    LOG.warn("sync failed", error);
+    // Here we depend on the implementation of FanOutOneBlockAsyncDFSOutput and netty.
+    // When an error occurs, FanOutOneBlockAsyncDFSOutput fails all pending flush requests, and
+    // it does so inside the EventLoop. DefaultPromise in netty notifies its listeners
+    // directly if it is already in the EventLoop thread, and the listener method is what
+    // calls us. So all failed flush requests will call us back to back, and before the
+    // last one finishes no other task can be executed in the EventLoop. So here we are
+    // safe to use writerBroken as a guard.
+    // Do not forget to revisit this if we change the implementation of
+    // FanOutOneBlockAsyncDFSOutput!
+    synchronized (waitingConsumePayloads) {
+      if (writerBroken) {
+        return;
+      }
+      // Schedule a periodic task to check whether the log roller has exited. Otherwise the sync
+      // request may be blocked forever, since we are still waiting for a new writer to write the
+      // pending data and sync it...
+      logRollerExitedChecker = new LogRollerExitedChecker();
+      // We are currently in the EventLoop thread, so it is safe to set the future after
+      // scheduling it, since the task cannot be executed before we release the thread.
+      logRollerExitedChecker.setFuture(eventLoop.scheduleAtFixedRate(logRollerExitedChecker,
+        logRollerExitedCheckIntervalMs, logRollerExitedCheckIntervalMs, TimeUnit.MILLISECONDS));
+      writerBroken = true;
+    }
+    for (Iterator<FSWALEntry> iter = unackedEntries.descendingIterator(); iter.hasNext();) {
+      waitingAppendEntries.addFirst(iter.next());
+    }
+    highestUnsyncedTxid = highestSyncedTxid.get();
+    if (rollPromise != null) {
+      rollPromise.trySuccess(null);
+      rollPromise = null;
+      return;
+    }
+    // request a roll.
+    if (!rollWriterLock.tryLock()) {
+      return;
+    }
+    try {
+      requestLogRoll();
+    } finally {
+      rollWriterLock.unlock();
+    }
+  }
+
+  private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
+    highestSyncedTxid.set(processedTxid);
+    int syncCount = finishSync(true);
+    for (Iterator<FSWALEntry> iter = unackedEntries.iterator(); iter.hasNext();) {
+      if (iter.next().getTxid() <= processedTxid) {
+        iter.remove();
+      } else {
+        break;
+      }
+    }
+    postSync(System.nanoTime() - startTimeNs, syncCount);
+    tryFinishRoll();
+    if (!rollWriterLock.tryLock()) {
+      return;
+    }
+    try {
+      if (writer.getLength() >= logrollsize) {
+        requestLogRoll();
+      }
+    } finally {
+      rollWriterLock.unlock();
+    }
+  }
+
   private void sync(final AsyncWriter writer, final long processedTxid) {
     fileLengthAtLastSync = writer.getLength();
     final long startTimeNs = System.nanoTime();
-    writer.sync(new CompletionHandler<Long, Void>() {
-
-      @Override
-      public void completed(Long result, Void attachment) {
-        highestSyncedTxid.set(processedTxid);
-        int syncCount = finishSync(true);
-        for (Iterator<FSWALEntry> iter = unackedEntries.iterator(); iter.hasNext();) {
-          if (iter.next().getTxid() <= processedTxid) {
-            iter.remove();
-          } else {
-            break;
-          }
-        }
-        postSync(System.nanoTime() - startTimeNs, syncCount);
-        tryFinishRoll();
-        if (!rollWriterLock.tryLock()) {
-          return;
-        }
-        try {
-          if (writer.getLength() >= logrollsize) {
-            requestLogRoll();
-          }
-        } finally {
-          rollWriterLock.unlock();
-        }
-      }
-
-      @Override
-      public void failed(Throwable exc, Void attachment) {
-        LOG.warn("sync failed", exc);
-        // Here we depends on the implementation of FanOutOneBlockAsyncDFSOutput and netty.
-        // When error occur, FanOutOneBlockAsyncDFSOutput will fail all pending flush requests. It
-        // is execute inside EventLoop. And in DefaultPromise in netty, it will notifyListener
-        // directly if it is already in the EventLoop thread. And in the listener method, it will
-        // call us. So here we know that all failed flush request will call us continuously, and
-        // before the last one finish, no other task can be executed in EventLoop. So here we are
-        // safe to use writerBroken as a guard.
-        // Do not forget to revisit this if we change the implementation of
-        // FanOutOneBlockAsyncDFSOutput!
-        synchronized (waitingConsumePayloads) {
-          if (writerBroken) {
-            return;
-          }
-          // schedule a periodical task to check if log roller is exited. Otherwise the the sync
-          // request maybe blocked forever since we are still waiting for a new writer to write the
-          // pending data and sync it...
-          logRollerExitedChecker = new LogRollerExitedChecker();
-          // we are currently in the EventLoop thread, so it is safe to set the future after
-          // schedule it since the task can not be executed before we release the thread.
-          logRollerExitedChecker.setFuture(eventLoop.scheduleAtFixedRate(logRollerExitedChecker,
-            logRollerExitedCheckIntervalMs, logRollerExitedCheckIntervalMs, TimeUnit.MILLISECONDS));
-          writerBroken = true;
-        }
-        for (Iterator<FSWALEntry> iter = unackedEntries.descendingIterator(); iter.hasNext();) {
-          waitingAppendEntries.addFirst(iter.next());
-        }
-        highestUnsyncedTxid = highestSyncedTxid.get();
-        if (rollPromise != null) {
-          rollPromise.trySuccess(null);
-          rollPromise = null;
-          return;
-        }
-        // request a roll.
-        if (!rollWriterLock.tryLock()) {
-          return;
-        }
-        try {
-          requestLogRoll();
-        } finally {
-          rollWriterLock.unlock();
-        }
+    writer.sync().whenComplete((result, error) -> {
+      if (error != null) {
+        syncFailed(error);
+      } else {
+        syncCompleted(writer, processedTxid, startTimeNs);
       }
-    }, null);
+    });
   }
 
   private void addTimeAnnotation(SyncFuture future, String annotation) {
@@ -457,13 +457,9 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     }
   }
 
-  private static final Comparator<SyncFuture> SEQ_COMPARATOR = new Comparator<SyncFuture>() {
-
-    @Override
-    public int compare(SyncFuture o1, SyncFuture o2) {
-      int c = Long.compare(o1.getTxid(), o2.getTxid());
-      return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
-    }
+  private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> {
+    int c = Long.compare(o1.getTxid(), o2.getTxid());
+    return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
   };
 
   private final Runnable consumer = new Runnable() {
@@ -690,15 +686,11 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     long oldFileLen;
     if (oldWriter != null) {
       oldFileLen = oldWriter.getLength();
-      closeExecutor.execute(new Runnable() {
-
-        @Override
-        public void run() {
-          try {
-            oldWriter.close();
-          } catch (IOException e) {
-            LOG.warn("close old writer failed", e);
-          }
+      closeExecutor.execute(() -> {
+        try {
+          oldWriter.close();
+        } catch (IOException e) {
+          LOG.warn("close old writer failed", e);
         }
       });
     } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index db3088c..314bef0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -25,7 +25,9 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.nio.channels.CompletionHandler;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,50 +52,6 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
 
   private static final Log LOG = LogFactory.getLog(AsyncProtobufLogWriter.class);
 
-  private static final class BlockingCompletionHandler implements CompletionHandler<Long, Void> {
-
-    private long size;
-
-    private Throwable error;
-
-    private boolean finished;
-
-    @Override
-    public void completed(Long result, Void attachment) {
-      synchronized (this) {
-        size = result.longValue();
-        finished = true;
-        notifyAll();
-      }
-    }
-
-    @Override
-    public void failed(Throwable exc, Void attachment) {
-      synchronized (this) {
-        error = exc;
-        finished = true;
-        notifyAll();
-      }
-    }
-
-    public long get() throws IOException {
-      synchronized (this) {
-        while (!finished) {
-          try {
-            wait();
-          } catch (InterruptedException e) {
-            throw new InterruptedIOException();
-          }
-        }
-        if (error != null) {
-          Throwables.propagateIfPossible(error, IOException.class);
-          throw new RuntimeException(error);
-        }
-        return size;
-      }
-    }
-  }
-
   private final EventLoop eventLoop;
 
   private AsyncFSOutput output;
@@ -166,8 +124,8 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
   }
 
   @Override
-  public <A> void sync(CompletionHandler<Long, A> handler, A attachment) {
-    output.flush(attachment, handler, false);
+  public CompletableFuture<Long> sync() {
+    return output.flush(false);
   }
 
   @Override
@@ -197,10 +155,24 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
     this.asyncOutputWrapper = new OutputStreamWrapper(output);
   }
 
+  private long write(Consumer<CompletableFuture<Long>> action) throws IOException {
+    CompletableFuture<Long> future = new CompletableFuture<Long>();
+    eventLoop.execute(() -> action.accept(future));
+    try {
+      return future.get().longValue();
+    } catch (InterruptedException e) {
+      InterruptedIOException ioe = new InterruptedIOException();
+      ioe.initCause(e);
+      throw ioe;
+    } catch (ExecutionException e) {
+      Throwables.propagateIfPossible(e.getCause(), IOException.class);
+      throw new RuntimeException(e.getCause());
+    }
+  }
+
   @Override
   protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
-    final BlockingCompletionHandler handler = new BlockingCompletionHandler();
-    eventLoop.execute(() -> {
+    return write(future -> {
       output.write(magic);
       try {
         header.writeDelimitedTo(asyncOutputWrapper);
@@ -208,16 +180,19 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
         // should not happen
         throw new AssertionError(e);
       }
-      output.flush(null, handler, false);
+      output.flush(false).whenComplete((len, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+        } else {
+          future.complete(len);
+        }
+      });
     });
-    return handler.get();
   }
 
   @Override
-  protected long writeWALTrailerAndMagic(WALTrailer trailer, final byte[] magic)
-      throws IOException {
-    final BlockingCompletionHandler handler = new BlockingCompletionHandler();
-    eventLoop.execute(() -> {
+  protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
+    return write(future -> {
       try {
         trailer.writeTo(asyncOutputWrapper);
       } catch (IOException e) {
@@ -226,9 +201,14 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
       }
       output.writeInt(trailer.getSerializedSize());
       output.write(magic);
-      output.flush(null, handler, false);
+      output.flush(false).whenComplete((len, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+        } else {
+          future.complete(len);
+        }
+      });
     });
-    return handler.get();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index 1a5b140..328f1b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.wal;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.nio.channels.CompletionHandler;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -85,7 +85,7 @@ public interface WALProvider {
   }
 
   interface AsyncWriter extends Closeable {
-    <A> void sync(CompletionHandler<Long, A> handler, A attachment);
+    CompletableFuture<Long> sync();
     void append(WAL.Entry entry);
     long getLength();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java
deleted file mode 100644
index 58b5301..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io.asyncfs;
-
-import java.nio.channels.CompletionHandler;
-import java.util.concurrent.ExecutionException;
-
-public final class FanOutOneBlockAsyncDFSOutputFlushHandler
-    implements CompletionHandler<Long, Void> {
-
-  private long size;
-
-  private Throwable error;
-
-  private boolean finished;
-
-  @Override
-  public synchronized void completed(Long result, Void attachment) {
-    size = result.longValue();
-    finished = true;
-    notifyAll();
-  }
-
-  @Override
-  public synchronized void failed(Throwable exc, Void attachment) {
-    error = exc;
-    finished = true;
-    notifyAll();
-  }
-
-  public synchronized long get() throws InterruptedException, ExecutionException {
-    while (!finished) {
-      wait();
-    }
-    if (error != null) {
-      throw new ExecutionException(error);
-    }
-    return size;
-  }
-
-  public void reset() {
-    size = 0L;
-    error = null;
-    finished = false;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index a6d3177..7897472 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -107,17 +107,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
       throws IOException, InterruptedException, ExecutionException {
     final byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
-    final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
-        new FanOutOneBlockAsyncDFSOutputFlushHandler();
-    eventLoop.execute(new Runnable() {
-
-      @Override
-      public void run() {
-        out.write(b, 0, b.length);
-        out.flush(null, handler, false);
-      }
-    });
-    assertEquals(b.length, handler.get());
+    out.write(b, 0, b.length);
+    assertEquals(b.length, out.flush(false).get().longValue());
     out.close();
     assertEquals(b.length, dfs.getFileStatus(f).getLen());
     byte[] actual = new byte[b.length];
@@ -144,31 +135,14 @@ public class TestFanOutOneBlockAsyncDFSOutput {
       true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
     final byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
-    final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
-        new FanOutOneBlockAsyncDFSOutputFlushHandler();
-    eventLoop.execute(new Runnable() {
-
-      @Override
-      public void run() {
-        out.write(b, 0, b.length);
-        out.flush(null, handler, false);
-      }
-    });
-    handler.get();
+    out.write(b, 0, b.length);
+    out.flush(false).get();
     // restart one datanode which causes one connection broken
     TEST_UTIL.getDFSCluster().restartDataNode(0);
     try {
-      handler.reset();
-      eventLoop.execute(new Runnable() {
-
-        @Override
-        public void run() {
-          out.write(b, 0, b.length);
-          out.flush(null, handler, false);
-        }
-      });
+      out.write(b, 0, b.length);
       try {
-        handler.get();
+        out.flush(false).get();
         fail("flush should fail");
       } catch (ExecutionException e) {
         // we restarted one datanode so the flush should fail
@@ -254,17 +228,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
       true, false, (short) 3, 1024 * 1024 * 1024, eventLoop);
     byte[] b = new byte[50 * 1024 * 1024];
     ThreadLocalRandom.current().nextBytes(b);
-    FanOutOneBlockAsyncDFSOutputFlushHandler handler =
-        new FanOutOneBlockAsyncDFSOutputFlushHandler();
-    eventLoop.execute(new Runnable() {
-
-      @Override
-      public void run() {
-        out.write(b);
-        out.flush(null, handler, false);
-      }
-    });
-    assertEquals(b.length, handler.get());
+    out.write(b);
+    out.flush(false);
+    assertEquals(b.length, out.flush(false).get().longValue());
     out.close();
     assertEquals(b.length, FS.getFileStatus(f).getLen());
     byte[] actual = new byte[b.length];

http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index 04cb0ef..6bd2d3c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -60,10 +60,8 @@ public class TestLocalAsyncOutput {
       fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next());
     byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
-    FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
     out.write(b);
-    out.flush(null, handler, true);
-    assertEquals(b.length, handler.get());
+    assertEquals(b.length, out.flush(true).get().longValue());
     out.close();
     assertEquals(b.length, fs.getFileStatus(f).getLen());
     byte[] actual = new byte[b.length];

http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index 89c7996..e05d869 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -17,11 +17,19 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
 
-import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.AES_CTR_NOPADDING;
-import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
 
 import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
@@ -38,6 +46,9 @@ import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
@@ -45,7 +56,6 @@ import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
 import org.apache.hadoop.hbase.security.token.TestGenerateDelegationToken;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.minikdc.MiniKdc;
@@ -113,7 +123,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
     List<Object[]> params = new ArrayList<>();
     for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
       for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
-        for (String cipherSuite : Arrays.asList("", AES_CTR_NOPADDING)) {
+        for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) {
           for (boolean useTransparentEncryption : Arrays.asList(false, true)) {
             params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite,
                 useTransparentEncryption });
@@ -125,17 +135,15 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
   }
 
   private static void setHdfsSecuredConfiguration(Configuration conf) throws Exception {
-    // change XXX_USER_NAME_KEY to XXX_KERBEROS_PRINCIPAL_KEY after we drop support for hadoop-2.4.1
-    conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, PRINCIPAL + "@" + KDC.getRealm());
-    conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
-    conf.set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, PRINCIPAL + "@" + KDC.getRealm());
-    conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
-    conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
-      HTTP_PRINCIPAL + "@" + KDC.getRealm());
-    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
-    conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
-    conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
+    conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, PRINCIPAL + "@" + KDC.getRealm());
+    conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
+    conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, HTTP_PRINCIPAL + "@" + KDC.getRealm());
+    conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+    conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
 
     File keystoresDir = new File(TEST_UTIL.getDataTestDir("keystore").toUri().getPath());
     keystoresDir.mkdirs();
@@ -146,32 +154,13 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
   }
 
   private static void setUpKeyProvider(Configuration conf) throws Exception {
-    Class<?> keyProviderFactoryClass;
-    try {
-      keyProviderFactoryClass = Class.forName("org.apache.hadoop.crypto.key.KeyProviderFactory");
-    } catch (ClassNotFoundException e) {
-      // should be hadoop 2.5-, give up
-      TEST_TRANSPARENT_ENCRYPTION = false;
-      return;
-    }
-
     URI keyProviderUri =
         new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString());
     conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString());
-    Method getKeyProviderMethod =
-        keyProviderFactoryClass.getMethod("get", URI.class, Configuration.class);
-    Object keyProvider = getKeyProviderMethod.invoke(null, keyProviderUri, conf);
-    Class<?> keyProviderClass = Class.forName("org.apache.hadoop.crypto.key.KeyProvider");
-    Class<?> keyProviderOptionsClass =
-        Class.forName("org.apache.hadoop.crypto.key.KeyProvider$Options");
-    Method createKeyMethod =
-        keyProviderClass.getMethod("createKey", String.class, keyProviderOptionsClass);
-    Object options = keyProviderOptionsClass.getConstructor(Configuration.class).newInstance(conf);
-    createKeyMethod.invoke(keyProvider, TEST_KEY_NAME, options);
-    Method flushMethod = keyProviderClass.getMethod("flush");
-    flushMethod.invoke(keyProvider);
-    Method closeMethod = keyProviderClass.getMethod("close");
-    closeMethod.invoke(keyProvider);
+    KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf);
+    keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf));
+    keyProvider.flush();
+    keyProvider.close();
   }
 
   @BeforeClass
@@ -231,7 +220,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
       TEST_UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite);
     }
 
-    TEST_UTIL.startMiniDFSCluster(3);
+    TEST_UTIL.startMiniDFSCluster(1);
     FS = TEST_UTIL.getDFSCluster().getFileSystem();
     testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
     FS.mkdirs(testDirOnTestFs);

http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
index b64d458..72fc4b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
@@ -17,27 +17,26 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import com.google.common.base.Throwables;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputFlushHandler;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 
-import com.google.common.base.Throwables;
-
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-
 @Category({ RegionServerTests.class, MediumTests.class })
 public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.AsyncWriter> {
 
@@ -68,14 +67,12 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.As
 
   @Override
   protected void sync(AsyncWriter writer) throws IOException {
-    FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
-    writer.sync(handler, null);
     try {
-      handler.get();
+      writer.sync().get();
     } catch (InterruptedException e) {
       throw new InterruptedIOException();
     } catch (ExecutionException e) {
-      Throwables.propagateIfPossible(e.getCause(), IOException.class);
+      Throwables.propagateIfPossible(e.getCause());
       throw new IOException(e.getCause());
     }
   }


[2/2] hbase git commit: HBASE-16968 Refactor FanOutOneBlockAsyncDFSOutput

Posted by zh...@apache.org.
HBASE-16968 Refactor FanOutOneBlockAsyncDFSOutput


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/45a25942
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/45a25942
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/45a25942

Branch: refs/heads/master
Commit: 45a2594249dcb624eca664c09fcfae414916ac6b
Parents: d0be36d
Author: zhangduo <zh...@apache.org>
Authored: Sun Oct 30 20:03:08 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Nov 1 09:19:02 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/io/asyncfs/AsyncFSOutput.java  |  10 +-
 .../hbase/io/asyncfs/AsyncFSOutputHelper.java   |  28 +-
 .../asyncfs/FanOutOneBlockAsyncDFSOutput.java   | 210 ++----
 .../FanOutOneBlockAsyncDFSOutputHelper.java     | 257 ++-----
 .../FanOutOneBlockAsyncDFSOutputSaslHelper.java | 702 +++++--------------
 .../hbase/regionserver/wal/AsyncFSWAL.java      | 190 +++--
 .../wal/AsyncProtobufLogWriter.java             |  94 +--
 .../apache/hadoop/hbase/wal/WALProvider.java    |   4 +-
 ...anOutOneBlockAsyncDFSOutputFlushHandler.java |  61 --
 .../TestFanOutOneBlockAsyncDFSOutput.java       |  52 +-
 .../hbase/io/asyncfs/TestLocalAsyncOutput.java  |   4 +-
 .../TestSaslFanOutOneBlockAsyncDFSOutput.java   |  67 +-
 .../regionserver/wal/TestAsyncProtobufLog.java  |  21 +-
 13 files changed, 502 insertions(+), 1198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
index 0c60d3cf..7d513db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.CompletionHandler;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -39,8 +40,8 @@ public interface AsyncFSOutput extends Closeable {
   void write(byte[] b);
 
   /**
-   * Copy the data into the buffer. Note that you need to call
-   * {@link #flush(Object, CompletionHandler, boolean)} to flush the buffer manually.
+   * Copy the data into the buffer. Note that you need to call {@link #flush(boolean)} to flush the
+   * buffer manually.
    */
   void write(byte[] b, int off, int len);
 
@@ -66,11 +67,10 @@ public interface AsyncFSOutput extends Closeable {
 
   /**
    * Flush the buffer out.
-   * @param attachment will be passed to handler when completed.
-   * @param handler will set the acked length as result when completed.
    * @param sync persistent the data to device
+   * @return A CompletableFuture that holds the acked length after flushing.
    */
-  <A> void flush(A attachment, final CompletionHandler<Long, ? super A> handler, boolean sync);
+  CompletableFuture<Long> flush(boolean sync);
 
   /**
    * The close method when error occurred.

http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index c9d4e70..7fe86be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -25,7 +25,7 @@ import io.netty.channel.EventLoop;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.CompletionHandler;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -80,11 +80,7 @@ public final class AsyncFSOutputHelper {
         if (eventLoop.inEventLoop()) {
           out.write(b, off, len);
         } else {
-          eventLoop.submit(new Runnable() {
-            public void run() {
-              out.write(b, off, len);
-            }
-          }).syncUninterruptibly();
+          eventLoop.submit(() -> out.write(b, off, len)).syncUninterruptibly();
         }
       }
 
@@ -103,15 +99,14 @@ public final class AsyncFSOutputHelper {
         return new DatanodeInfo[0];
       }
 
-      private <A> void flush0(A attachment, CompletionHandler<Long, ? super A> handler,
-          boolean sync) {
+      private void flush0(CompletableFuture<Long> future, boolean sync) {
         try {
           synchronized (out) {
             fsOut.write(out.getBuffer(), 0, out.size());
             out.reset();
           }
         } catch (IOException e) {
-          eventLoop.execute(() -> handler.failed(e, attachment));
+          eventLoop.execute(() -> future.completeExceptionally(e));
           return;
         }
         try {
@@ -120,17 +115,18 @@ public final class AsyncFSOutputHelper {
           } else {
             fsOut.hflush();
           }
-          final long pos = fsOut.getPos();
-          eventLoop.execute(() -> handler.completed(pos, attachment));
-        } catch (final IOException e) {
-          eventLoop.execute(() -> handler.failed(e, attachment));
+          long pos = fsOut.getPos();
+          eventLoop.execute(() -> future.complete(pos));
+        } catch (IOException e) {
+          eventLoop.execute(() -> future.completeExceptionally(e));
         }
       }
 
       @Override
-      public <A> void flush(A attachment, CompletionHandler<Long, ? super A> handler,
-          boolean sync) {
-        flushExecutor.execute(() -> flush0(attachment, handler, sync));
+      public CompletableFuture<Long> flush(boolean sync) {
+        CompletableFuture<Long> future = new CompletableFuture<>();
+        flushExecutor.execute(() -> flush0(future, sync));
+        return future;
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 916e534..02ffcd5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -26,8 +26,6 @@ import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHel
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 
-import com.google.common.base.Supplier;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
@@ -39,14 +37,11 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
 import io.netty.util.concurrent.Promise;
 import io.netty.util.concurrent.PromiseCombiner;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.CompletionHandler;
 import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Collections;
@@ -54,13 +49,15 @@ import java.util.Deque;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.Encryptor;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
-import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.CryptoCodec;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -87,8 +84,8 @@ import org.apache.hadoop.util.DataChecksum;
  * need one thread here. But be careful, we do some blocking operations in {@link #close()} and
  * {@link #recoverAndClose(CancelableProgressable)} methods, so do not call them inside
  * {@link EventLoop}. And for {@link #write(byte[])} {@link #write(byte[], int, int)},
- * {@link #buffered()} and {@link #flush(Object, CompletionHandler, boolean)}, if you call them
- * outside {@link EventLoop}, there will be an extra context-switch.
+ * {@link #buffered()} and {@link #flush(boolean)}, if you call them outside {@link EventLoop},
+ * there will be an extra context-switch.
  * <p>
  * Advantages compare to DFSOutputStream:
  * <ol>
@@ -125,7 +122,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   private final LocatedBlock locatedBlock;
 
-  private final CryptoCodec cryptoCodec;
+  private final Encryptor encryptor;
 
   private final EventLoop eventLoop;
 
@@ -151,8 +148,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
       if (replicas.isEmpty()) {
         this.unfinishedReplicas = Collections.emptySet();
       } else {
-        this.unfinishedReplicas = Collections
-            .newSetFromMap(new IdentityHashMap<Channel, Boolean>(replicas.size()));
+        this.unfinishedReplicas =
+            Collections.newSetFromMap(new IdentityHashMap<Channel, Boolean>(replicas.size()));
         this.unfinishedReplicas.addAll(replicas);
       }
     }
@@ -215,13 +212,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     // disable further write, and fail all pending ack.
     state = State.BROKEN;
     Throwable error = errorSupplier.get();
-    for (Callback c : waitingAckQueue) {
-      c.promise.tryFailure(error);
-    }
+    waitingAckQueue.stream().forEach(c -> c.promise.tryFailure(error));
     waitingAckQueue.clear();
-    for (Channel ch : datanodeList) {
-      ch.close();
-    }
+    datanodeList.forEach(ch -> ch.close());
   }
 
   @Sharable
@@ -234,29 +227,16 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     }
 
     @Override
-    protected void channelRead0(final ChannelHandlerContext ctx, PipelineAckProto ack)
-        throws Exception {
-      final Status reply = getStatus(ack);
+    protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
+      Status reply = getStatus(ack);
       if (reply != Status.SUCCESS) {
-        failed(ctx.channel(), new Supplier<Throwable>() {
-
-          @Override
-          public Throwable get() {
-            return new IOException("Bad response " + reply + " for block " + locatedBlock.getBlock()
-                + " from datanode " + ctx.channel().remoteAddress());
-          }
-        });
+        failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block "
+            + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()));
         return;
       }
       if (PipelineAck.isRestartOOBStatus(reply)) {
-        failed(ctx.channel(), new Supplier<Throwable>() {
-
-          @Override
-          public Throwable get() {
-            return new IOException("Restart response " + reply + " for block "
-                + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress());
-          }
-        });
+        failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block "
+            + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()));
         return;
       }
       if (ack.getSeqno() == HEART_BEAT_SEQNO) {
@@ -266,25 +246,14 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     }
 
     @Override
-    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
-      failed(ctx.channel(), new Supplier<Throwable>() {
-
-        @Override
-        public Throwable get() {
-          return new IOException("Connection to " + ctx.channel().remoteAddress() + " closed");
-        }
-      });
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      failed(ctx.channel(),
+        () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
-      failed(ctx.channel(), new Supplier<Throwable>() {
-
-        @Override
-        public Throwable get() {
-          return cause;
-        }
-      });
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+      failed(ctx.channel(), () -> cause);
     }
 
     @Override
@@ -292,13 +261,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
       if (evt instanceof IdleStateEvent) {
         IdleStateEvent e = (IdleStateEvent) evt;
         if (e.state() == READER_IDLE) {
-          failed(ctx.channel(), new Supplier<Throwable>() {
-
-            @Override
-            public Throwable get() {
-              return new IOException("Timeout(" + timeoutMs + "ms) waiting for response");
-            }
-          });
+          failed(ctx.channel(),
+            () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
         } else if (e.state() == WRITER_IDLE) {
           PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
           int len = heartbeat.getSerializedSize();
@@ -326,7 +290,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs,
       DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
-      LocatedBlock locatedBlock, CryptoCodec cryptoCodec, EventLoop eventLoop,
+      LocatedBlock locatedBlock, Encryptor encryptor, EventLoop eventLoop,
       List<Channel> datanodeList, DataChecksum summer, ByteBufAllocator alloc) {
     this.conf = conf;
     this.fsUtils = fsUtils;
@@ -337,7 +301,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     this.clientName = clientName;
     this.src = src;
     this.locatedBlock = locatedBlock;
-    this.cryptoCodec = cryptoCodec;
+    this.encryptor = encryptor;
     this.eventLoop = eventLoop;
     this.datanodeList = datanodeList;
     this.summer = summer;
@@ -350,14 +314,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   private void writeInt0(int i) {
     buf.ensureWritable(4);
-    if (cryptoCodec == null) {
-      buf.writeInt(i);
-    } else {
-      ByteBuffer inBuffer = ByteBuffer.allocate(4);
-      inBuffer.putInt(0, i);
-      cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), 4));
-      buf.writerIndex(buf.writerIndex() + 4);
-    }
+    buf.writeInt(i);
   }
 
   @Override
@@ -370,14 +327,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
   }
 
   private void write0(ByteBuffer bb) {
-    int len = bb.remaining();
-    buf.ensureWritable(len);
-    if (cryptoCodec == null) {
-      buf.writeBytes(bb);
-    } else {
-      cryptoCodec.encrypt(bb, buf.nioBuffer(buf.writerIndex(), len));
-      buf.writerIndex(buf.writerIndex() + len);
-    }
+    buf.ensureWritable(bb.remaining());
+    buf.writeBytes(bb);
   }
 
   @Override
@@ -394,19 +345,13 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     write(b, 0, b.length);
   }
 
-  private void write0(byte[] b, final int off, final int len) {
+  private void write0(byte[] b, int off, int len) {
     buf.ensureWritable(len);
-    if (cryptoCodec == null) {
-      buf.writeBytes(b, off, len);
-    } else {
-      ByteBuffer inBuffer = ByteBuffer.wrap(b, off, len);
-      cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), len));
-      buf.writerIndex(buf.writerIndex() + len);
-    }
+    buf.writeBytes(b, off, len);
   }
 
   @Override
-  public void write(final byte[] b, final int off, final int len) {
+  public void write(byte[] b, int off, int len) {
     if (eventLoop.inEventLoop()) {
       write0(b, off, len);
     } else {
@@ -464,27 +409,40 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     return promise;
   }
 
-  private <A> void flush0(final A attachment, final CompletionHandler<Long, ? super A> handler,
-      boolean syncBlock) {
+  private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
     if (state != State.STREAMING) {
-      handler.failed(new IOException("stream already broken"), attachment);
+      future.completeExceptionally(new IOException("stream already broken"));
       return;
     }
     int dataLen = buf.readableBytes();
-    final long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
+    if (encryptor != null) {
+      ByteBuf encryptBuf = alloc.directBuffer(dataLen);
+      try {
+        encryptor.encrypt(buf.nioBuffer(buf.readerIndex(), dataLen),
+          encryptBuf.nioBuffer(0, dataLen));
+      } catch (IOException e) {
+        encryptBuf.release();
+        future.completeExceptionally(e);
+        return;
+      }
+      encryptBuf.writerIndex(dataLen);
+      buf.release();
+      buf = encryptBuf;
+    }
+    long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
     if (lengthAfterFlush == locatedBlock.getBlock().getNumBytes()) {
       // no new data, just return
-      handler.completed(locatedBlock.getBlock().getNumBytes(), attachment);
+      future.complete(locatedBlock.getBlock().getNumBytes());
       return;
     }
     Callback c = waitingAckQueue.peekLast();
     if (c != null && lengthAfterFlush == c.ackedLength) {
       // just append it to the tail of waiting ack queue,, do not issue new hflush request.
-      waitingAckQueue.addLast(new Callback(eventLoop.<Void> newPromise().addListener(future -> {
-        if (future.isSuccess()) {
-          handler.completed(lengthAfterFlush, attachment);
+      waitingAckQueue.addLast(new Callback(eventLoop.<Void> newPromise().addListener(f -> {
+        if (f.isSuccess()) {
+          future.complete(lengthAfterFlush);
         } else {
-          handler.failed(future.cause(), attachment);
+          future.completeExceptionally(f.cause());
         }
       }), lengthAfterFlush, Collections.<Channel> emptyList()));
       return;
@@ -506,11 +464,11 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     } else {
       promise = flushBuffer(buf.retain(), nextPacketOffsetInBlock, syncBlock);
     }
-    promise.addListener(future -> {
-      if (future.isSuccess()) {
-        handler.completed(lengthAfterFlush, attachment);
+    promise.addListener(f -> {
+      if (f.isSuccess()) {
+        future.complete(lengthAfterFlush);
       } else {
-        handler.failed(future.cause(), attachment);
+        future.completeExceptionally(f.cause());
       }
     });
     int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum();
@@ -525,23 +483,17 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   /**
    * Flush the buffer out to datanodes.
-   * @param attachment will be passed to handler when completed.
-   * @param handler will set the acked length as result when completed.
    * @param syncBlock will call hsync if true, otherwise hflush.
+   * @return A CompletableFuture that holds the acked length after flushing.
    */
-  public <A> void flush(final A attachment, final CompletionHandler<Long, ? super A> handler,
-      final boolean syncBlock) {
+  public CompletableFuture<Long> flush(boolean syncBlock) {
+    CompletableFuture<Long> future = new CompletableFuture<Long>();
     if (eventLoop.inEventLoop()) {
-      flush0(attachment, handler, syncBlock);
+      flush0(future, syncBlock);
     } else {
-      eventLoop.execute(new Runnable() {
-
-        @Override
-        public void run() {
-          flush0(attachment, handler, syncBlock);
-        }
-      });
+      eventLoop.execute(() -> flush0(future, syncBlock));
     }
+    return future;
   }
 
   private void endBlock(Promise<Void> promise, long size) {
@@ -558,13 +510,11 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     buf.release();
     buf = null;
     int headerLen = header.getSerializedSize();
-    ByteBuf headerBuf = alloc.buffer(headerLen);
+    ByteBuf headerBuf = alloc.directBuffer(headerLen);
     header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
     headerBuf.writerIndex(headerLen);
     waitingAckQueue.add(new Callback(promise, size, datanodeList));
-    for (Channel ch : datanodeList) {
-      ch.writeAndFlush(headerBuf.duplicate().retain());
-    }
+    datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.duplicate().retain()));
     headerBuf.release();
   }
 
@@ -574,10 +524,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
   @Override
   public void recoverAndClose(CancelableProgressable reporter) throws IOException {
     assert !eventLoop.inEventLoop();
-    for (Channel ch : datanodeList) {
-      ch.closeFuture().awaitUninterruptibly();
-    }
-    endFileLease(client, src, fileId);
+    datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
+    endFileLease(client, fileId);
     fsUtils.recoverFileLease(dfs, new Path(src), conf,
       reporter == null ? new CancelOnClose(client) : reporter);
   }
@@ -589,26 +537,10 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
   @Override
   public void close() throws IOException {
     assert !eventLoop.inEventLoop();
-    final Promise<Void> promise = eventLoop.newPromise();
-    eventLoop.execute(new Runnable() {
-
-      @Override
-      public void run() {
-        endBlock(promise, nextPacketOffsetInBlock + buf.readableBytes());
-      }
-    });
-    promise.addListener(new FutureListener<Void>() {
-
-      @Override
-      public void operationComplete(Future<Void> future) throws Exception {
-        for (Channel ch : datanodeList) {
-          ch.close();
-        }
-      }
-    }).syncUninterruptibly();
-    for (Channel ch : datanodeList) {
-      ch.closeFuture().awaitUninterruptibly();
-    }
+    Promise<Void> promise = eventLoop.newPromise();
+    eventLoop.execute(() -> endBlock(promise, nextPacketOffsetInBlock + buf.readableBytes()));
+    promise.addListener(f -> datanodeList.forEach(ch -> ch.close())).syncUninterruptibly();
+    datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
     completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), fileId);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 51c48ce..875ff77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -21,7 +21,7 @@ import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
 import static io.netty.handler.timeout.IdleState.READER_IDLE;
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
-import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createCryptoCodec;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
@@ -66,6 +66,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.crypto.Encryptor;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemLinkResolver;
@@ -74,7 +76,6 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
-import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.CryptoCodec;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -96,7 +97,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyP
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto.Builder;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
@@ -143,26 +143,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
 
-  // StorageType enum is added in hadoop 2.4, but it is moved to another package in hadoop 2.6 and
-  // the setter method in OpWriteBlockProto is also added in hadoop 2.6. So we need to skip the
-  // setStorageType call if it is hadoop 2.5 or before. See createStorageTypeSetter for more
-  // details.
+  // StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and o.a.h.fs in hadoop 2.7. So here
+  // we need to use reflection to set it. See createStorageTypeSetter for more details.
   private interface StorageTypeSetter {
     OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
   }
 
   private static final StorageTypeSetter STORAGE_TYPE_SETTER;
 
-  // helper class for calling create method on namenode. There is a supportedVersions parameter for
-  // hadoop 2.6 or after. See createFileCreater for more details.
-  private interface FileCreater {
-    HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
-        String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
-        short replication, long blockSize) throws IOException;
-  }
-
-  private static final FileCreater FILE_CREATER;
-
   // helper class for calling add block method on namenode. There is a addBlockFlags parameter for
   // hadoop 2.8 or later. See createBlockAdder for more details.
   private interface BlockAdder {
@@ -174,13 +162,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static final BlockAdder BLOCK_ADDER;
 
-  // helper class for add or remove lease from DFSClient. Hadoop 2.4 use src as the Map's key, and
-  // hadoop 2.5 or after use inodeId. See createLeaseManager for more details.
   private interface LeaseManager {
 
-    void begin(DFSClient client, String src, long inodeId);
+    void begin(DFSClient client, long inodeId);
 
-    void end(DFSClient client, String src, long inodeId);
+    void end(DFSClient client, long inodeId);
   }
 
   private static final LeaseManager LEASE_MANAGER;
@@ -197,7 +183,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   // helper class for convert protos.
   private interface PBHelper {
 
-    ExtendedBlockProto convert(final ExtendedBlock b);
+    ExtendedBlockProto convert(ExtendedBlock b);
 
     TokenProto convert(Token<?> tok);
   }
@@ -212,7 +198,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   private static final ChecksumCreater CHECKSUM_CREATER;
 
   private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
-    final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
+    Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
     isClientRunningMethod.setAccessible(true);
     return new DFSClientAdaptor() {
 
@@ -227,16 +213,16 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     };
   }
 
-  private static LeaseManager createLeaseManager25() throws NoSuchMethodException {
-    final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
-      long.class, DFSOutputStream.class);
+  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
+    Method beginFileLeaseMethod =
+        DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
     beginFileLeaseMethod.setAccessible(true);
-    final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
+    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
     endFileLeaseMethod.setAccessible(true);
     return new LeaseManager() {
 
       @Override
-      public void begin(DFSClient client, String src, long inodeId) {
+      public void begin(DFSClient client, long inodeId) {
         try {
           beginFileLeaseMethod.invoke(client, inodeId, null);
         } catch (IllegalAccessException | InvocationTargetException e) {
@@ -245,7 +231,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       }
 
       @Override
-      public void end(DFSClient client, String src, long inodeId) {
+      public void end(DFSClient client, long inodeId) {
         try {
           endFileLeaseMethod.invoke(client, inodeId);
         } catch (IllegalAccessException | InvocationTargetException e) {
@@ -255,66 +241,28 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     };
   }
 
-  private static LeaseManager createLeaseManager24() throws NoSuchMethodException {
-    final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
-      String.class, DFSOutputStream.class);
-    beginFileLeaseMethod.setAccessible(true);
-    final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease",
-      String.class);
-    endFileLeaseMethod.setAccessible(true);
-    return new LeaseManager() {
-
-      @Override
-      public void begin(DFSClient client, String src, long inodeId) {
-        try {
-          beginFileLeaseMethod.invoke(client, src, null);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public void end(DFSClient client, String src, long inodeId) {
-        try {
-          endFileLeaseMethod.invoke(client, src);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
-    try {
-      return createLeaseManager25();
-    } catch (NoSuchMethodException e) {
-      LOG.debug("No inodeId related lease methods found, should be hadoop 2.4-", e);
-    }
-    return createLeaseManager24();
-  }
-
   private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
       throws NoSuchMethodException {
-    final Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
+    Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
     @SuppressWarnings("rawtypes")
     Class<? extends Enum> ecnClass;
     try {
       ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
           .asSubclass(Enum.class);
     } catch (ClassNotFoundException e) {
-      final String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please "
+      String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please "
           + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
           + "HBASE-16110 for more information.";
       LOG.error(msg, e);
       throw new Error(msg, e);
     }
     @SuppressWarnings("unchecked")
-    final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
-    final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
-    final Method combineHeaderMethod = PipelineAck.class.getMethod("combineHeader", ecnClass,
-      Status.class);
-    final Method getStatusFromHeaderMethod = PipelineAck.class.getMethod("getStatusFromHeader",
-      int.class);
+    Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
+    Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
+    Method combineHeaderMethod =
+        PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
+    Method getStatusFromHeaderMethod =
+        PipelineAck.class.getMethod("getStatusFromHeader", int.class);
     return new PipelineAckStatusGetter() {
 
       @Override
@@ -339,7 +287,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
       throws NoSuchMethodException {
-    final Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
+    Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
     return new PipelineAckStatusGetter() {
 
       @Override
@@ -363,30 +311,18 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     return createPipelineAckStatusGetter26();
   }
 
-  private static StorageTypeSetter createStorageTypeSetter() {
-    final Method setStorageTypeMethod;
-    try {
-      setStorageTypeMethod = OpWriteBlockProto.Builder.class.getMethod("setStorageType",
-        StorageTypeProto.class);
-    } catch (NoSuchMethodException e) {
-      LOG.debug("noSetStorageType method found, should be hadoop 2.5-", e);
-      return new StorageTypeSetter() {
-
-        @Override
-        public Builder set(Builder builder, Enum<?> storageType) {
-          return builder;
-        }
-      };
-    }
+  private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
+    Method setStorageTypeMethod =
+        OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
     ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
     for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
       builder.put(storageTypeProto.name(), storageTypeProto);
     }
-    final ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
+    ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
     return new StorageTypeSetter() {
 
       @Override
-      public Builder set(Builder builder, Enum<?> storageType) {
+      public OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType) {
         Object protoEnum = name2ProtoEnum.get(storageType.name());
         try {
           setStorageTypeMethod.invoke(builder, protoEnum);
@@ -398,62 +334,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     };
   }
 
-  private static FileCreater createFileCreater() throws ClassNotFoundException,
-      NoSuchMethodException, IllegalAccessException, InvocationTargetException {
-    for (Method method : ClientProtocol.class.getMethods()) {
-      if (method.getName().equals("create")) {
-        final Method createMethod = method;
-        Class<?>[] paramTypes = createMethod.getParameterTypes();
-        if (paramTypes[paramTypes.length - 1] == long.class) {
-          return new FileCreater() {
-
-            @Override
-            public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
-                String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
-                short replication, long blockSize) throws IOException {
-              try {
-                return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag,
-                  createParent, replication, blockSize);
-              } catch (IllegalAccessException e) {
-                throw new RuntimeException(e);
-              } catch (InvocationTargetException e) {
-                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
-                throw new RuntimeException(e);
-              }
-            }
-          };
-        } else {
-          Class<?> cryptoProtocolVersionClass = Class
-              .forName("org.apache.hadoop.crypto.CryptoProtocolVersion");
-          Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported");
-          final Object supported = supportedMethod.invoke(null);
-          return new FileCreater() {
-
-            @Override
-            public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
-                String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
-                short replication, long blockSize) throws IOException {
-              try {
-                return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag,
-                  createParent, replication, blockSize, supported);
-              } catch (IllegalAccessException e) {
-                throw new RuntimeException(e);
-              } catch (InvocationTargetException e) {
-                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
-                throw new RuntimeException(e);
-              }
-            }
-          };
-        }
-      }
-    }
-    throw new NoSuchMethodException("Can not find create method in ClientProtocol");
-  }
-
   private static BlockAdder createBlockAdder() throws NoSuchMethodException {
     for (Method method : ClientProtocol.class.getMethods()) {
       if (method.getName().equals("addBlock")) {
-        final Method addBlockMethod = method;
+        Method addBlockMethod = method;
         Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
         if (paramTypes[paramTypes.length - 1] == String[].class) {
           return new BlockAdder() {
@@ -505,8 +389,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
       helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
     }
-    final Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
-    final Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
+    Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
+    Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
     return new PBHelper() {
 
       @Override
@@ -533,7 +417,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       throws NoSuchMethodException {
     for (Method method : confClass.getMethods()) {
       if (method.getName().equals("createChecksum")) {
-        final Method createChecksumMethod = method;
+        Method createChecksumMethod = method;
         return new ChecksumCreater() {
 
           @Override
@@ -552,7 +436,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static ChecksumCreater createChecksumCreater27(Class<?> confClass)
       throws NoSuchMethodException {
-    final Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
+    Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
     createChecksumMethod.setAccessible(true);
     return new ChecksumCreater() {
 
@@ -597,14 +481,13 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     try {
       PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
       STORAGE_TYPE_SETTER = createStorageTypeSetter();
-      FILE_CREATER = createFileCreater();
       BLOCK_ADDER = createBlockAdder();
       LEASE_MANAGER = createLeaseManager();
       DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
       PB_HELPER = createPBHelper();
       CHECKSUM_CREATER = createChecksumCreater();
     } catch (Exception e) {
-      final String msg = "Couldn't properly initialize access to HDFS internals. Please "
+      String msg = "Couldn't properly initialize access to HDFS internals. Please "
           + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
           + "HBASE-16110 for more information.";
       LOG.error(msg, e);
@@ -612,12 +495,12 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     }
   }
 
-  static void beginFileLease(DFSClient client, String src, long inodeId) {
-    LEASE_MANAGER.begin(client, src, inodeId);
+  static void beginFileLease(DFSClient client, long inodeId) {
+    LEASE_MANAGER.begin(client, inodeId);
   }
 
-  static void endFileLease(DFSClient client, String src, long inodeId) {
-    LEASE_MANAGER.end(client, src, inodeId);
+  static void endFileLease(DFSClient client, long inodeId) {
+    LEASE_MANAGER.end(client, inodeId);
   }
 
   static DataChecksum createChecksum(DFSClient client) {
@@ -628,8 +511,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     return PIPELINE_ACK_STATUS_GETTER.get(ack);
   }
 
-  private static void processWriteBlockResponse(Channel channel, final DatanodeInfo dnInfo,
-      final Promise<Channel> promise, final int timeoutMs) {
+  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
+      Promise<Channel> promise, int timeoutMs) {
     channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
       new ProtobufVarint32FrameDecoder(),
       new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
@@ -693,18 +576,18 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
     OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
     int protoLen = proto.getSerializedSize();
-    ByteBuf buffer = channel.alloc()
-        .buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
+    ByteBuf buffer =
+        channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
     buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
     buffer.writeByte(Op.WRITE_BLOCK.code);
     proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
     channel.writeAndFlush(buffer);
   }
 
-  private static void initialize(Configuration conf, final Channel channel,
-      final DatanodeInfo dnInfo, final Enum<?> storageType,
-      final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs, DFSClient client,
-      Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) {
+  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+      Enum<?> storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
+      DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
+      throws IOException {
     Promise<Void> saslPromise = channel.eventLoop().newPromise();
     trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
     saslPromise.addListener(new FutureListener<Void>() {
@@ -722,14 +605,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     });
   }
 
-  private static List<Future<Channel>> connectToDataNodes(final Configuration conf,
-      final DFSClient client, String clientName, final LocatedBlock locatedBlock, long maxBytesRcvd,
-      long latestGS, BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) {
+  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
+      String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
+      BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) {
     Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
     DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
-    boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
-      DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
-    final int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
+    boolean connectToDnViaHostname =
+        conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
     ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
     blockCopy.setNumBytes(locatedBlock.getBlockSize());
     ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
@@ -737,7 +620,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
             .setToken(PB_HELPER.convert(locatedBlock.getBlockToken())))
         .setClientName(clientName).build();
     ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
-    final OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
+    OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
         .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
         .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
         .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
@@ -745,11 +628,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
         .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
     List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
     for (int i = 0; i < datanodeInfos.length; i++) {
-      final DatanodeInfo dnInfo = datanodeInfos[i];
-      // Use Enum here because StoregType is moved to another package in hadoop 2.6. Use StorageType
-      // will cause compilation error for hadoop 2.5 or before.
-      final Enum<?> storageType = storageTypes[i];
-      final Promise<Channel> promise = eventLoop.newPromise();
+      DatanodeInfo dnInfo = datanodeInfos[i];
+      Enum<?> storageType = storageTypes[i];
+      Promise<Channel> promise = eventLoop.newPromise();
       futureList.add(promise);
       String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
       new Bootstrap().group(eventLoop).channel(NioSocketChannel.class)
@@ -799,11 +680,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     ClientProtocol namenode = client.getNamenode();
     HdfsFileStatus stat;
     try {
-      stat = FILE_CREATER.create(namenode, src,
+      stat = namenode.create(src,
         FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
         new EnumSetWritable<CreateFlag>(
             overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
-        createParent, replication, blockSize);
+        createParent, replication, blockSize, CryptoProtocolVersion.supported());
     } catch (Exception e) {
       if (e instanceof RemoteException) {
         throw (RemoteException) e;
@@ -811,7 +692,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
         throw new NameNodeException(e);
       }
     }
-    beginFileLease(client, src, stat.getFileId());
+    beginFileLease(client, stat.getFileId());
     boolean succ = false;
     LocatedBlock locatedBlock = null;
     List<Future<Channel>> futureList = null;
@@ -827,10 +708,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
         // layer should retry itself if needed.
         datanodeList.add(future.syncUninterruptibly().getNow());
       }
-      CryptoCodec cryptocodec = createCryptoCodec(conf, stat, client);
-      FanOutOneBlockAsyncDFSOutput output = new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs,
-          client, namenode, clientName, src, stat.getFileId(), locatedBlock, cryptocodec, eventLoop,
-          datanodeList, summer, ALLOC);
+      Encryptor encryptor = createEncryptor(conf, stat, client);
+      FanOutOneBlockAsyncDFSOutput output =
+          new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
+              stat.getFileId(), locatedBlock, encryptor, eventLoop, datanodeList, summer, ALLOC);
       succ = true;
       return output;
     } finally {
@@ -848,7 +729,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
             });
           }
         }
-        endFileLease(client, src, stat.getFileId());
+        endFileLease(client, stat.getFileId());
         fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client));
       }
     }
@@ -859,9 +740,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
    * inside {@link EventLoop}.
    * @param eventLoop all connections to datanode will use the same event loop.
    */
-  public static FanOutOneBlockAsyncDFSOutput createOutput(final DistributedFileSystem dfs, Path f,
-      final boolean overwrite, final boolean createParent, final short replication,
-      final long blockSize, final EventLoop eventLoop) throws IOException {
+  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
+      boolean overwrite, boolean createParent, short replication, long blockSize,
+      EventLoop eventLoop) throws IOException {
     return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
 
       @Override
@@ -890,7 +771,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     for (int retry = 0;; retry++) {
       try {
         if (namenode.complete(src, clientName, block, fileId)) {
-          endFileLease(client, src, fileId);
+          endFileLease(client, fileId);
           return;
         } else {
           LOG.warn("complete file " + src + " not finished, retry = " + retry);