You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/11/01 01:23:26 UTC
[1/2] hbase git commit: HBASE-16968 Refactor
FanOutOneBlockAsyncDFSOutput
Repository: hbase
Updated Branches:
refs/heads/master d0be36deb -> 45a259424
http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
index a222e1b..c0121d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -18,12 +18,11 @@
package org.apache.hadoop.hbase.io.asyncfs;
import static io.netty.handler.timeout.IdleState.READER_IDLE;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.CodedOutputStream;
@@ -47,13 +46,13 @@ import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
-import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -73,9 +72,17 @@ import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherOption;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.Decryptor;
+import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hdfs.DFSClient;
@@ -83,9 +90,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.security.SaslPropertiesResolver;
@@ -110,469 +118,105 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
private static final String NAME_DELIMITER = " ";
- @VisibleForTesting
- static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = "dfs.encrypt.data.transfer.cipher.suites";
-
- @VisibleForTesting
- static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding";
-
private interface SaslAdaptor {
- SaslPropertiesResolver getSaslPropsResolver(DFSClient client);
+ TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient);
- TrustedChannelResolver getTrustedChannelResolver(DFSClient client);
+ SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient);
- AtomicBoolean getFallbackToSimpleAuth(DFSClient client);
-
- DataEncryptionKey createDataEncryptionKey(DFSClient client);
+ AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient);
}
private static final SaslAdaptor SASL_ADAPTOR;
- private interface CipherOptionHelper {
-
- List<Object> getCipherOptions(Configuration conf) throws IOException;
-
- void addCipherOptions(DataTransferEncryptorMessageProto.Builder builder,
- List<Object> cipherOptions);
-
- Object getCipherOption(DataTransferEncryptorMessageProto proto, boolean isNegotiatedQopPrivacy,
- SaslClient saslClient) throws IOException;
-
- Object getCipherSuite(Object cipherOption);
-
- byte[] getInKey(Object cipherOption);
+ // helper class for convert protos.
+ private interface PBHelper {
- byte[] getInIv(Object cipherOption);
+ List<CipherOptionProto> convertCipherOptions(List<CipherOption> options);
- byte[] getOutKey(Object cipherOption);
-
- byte[] getOutIv(Object cipherOption);
+ List<CipherOption> convertCipherOptionProtos(List<CipherOptionProto> options);
}
- private static final CipherOptionHelper CIPHER_OPTION_HELPER;
+ private static final PBHelper PB_HELPER;
private interface TransparentCryptoHelper {
- Object getFileEncryptionInfo(HdfsFileStatus stat);
-
- CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client)
+ Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client)
throws IOException;
}
private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER;
- static final class CryptoCodec {
-
- private static final Method CREATE_CODEC;
-
- private static final Method CREATE_ENCRYPTOR;
-
- private static final Method CREATE_DECRYPTOR;
-
- private static final Method INIT_ENCRYPTOR;
-
- private static final Method INIT_DECRYPTOR;
-
- private static final Method ENCRYPT;
-
- private static final Method DECRYPT;
-
- static {
- Class<?> cryptoCodecClass = null;
- try {
- cryptoCodecClass = Class.forName("org.apache.hadoop.crypto.CryptoCodec");
- } catch (ClassNotFoundException e) {
- LOG.debug("No CryptoCodec class found, should be hadoop 2.5-", e);
- }
- if (cryptoCodecClass != null) {
- Method getInstanceMethod = null;
- for (Method method : cryptoCodecClass.getMethods()) {
- if (method.getName().equals("getInstance") && method.getParameterTypes().length == 2) {
- getInstanceMethod = method;
- break;
- }
- }
- try {
- if (getInstanceMethod == null) {
- throw new NoSuchMethodException(
- "Can not find suitable getInstance method in CryptoCodec");
- }
- CREATE_CODEC = getInstanceMethod;
- CREATE_ENCRYPTOR = cryptoCodecClass.getMethod("createEncryptor");
- CREATE_DECRYPTOR = cryptoCodecClass.getMethod("createDecryptor");
-
- Class<?> encryptorClass = Class.forName("org.apache.hadoop.crypto.Encryptor");
- INIT_ENCRYPTOR = encryptorClass.getMethod("init", byte[].class, byte[].class);
- ENCRYPT = encryptorClass.getMethod("encrypt", ByteBuffer.class, ByteBuffer.class);
-
- Class<?> decryptorClass = Class.forName("org.apache.hadoop.crypto.Decryptor");
- INIT_DECRYPTOR = decryptorClass.getMethod("init", byte[].class, byte[].class);
- DECRYPT = decryptorClass.getMethod("decrypt", ByteBuffer.class, ByteBuffer.class);
- } catch (Exception e) {
- final String msg = "Couldn't properly initialize access to HDFS internals. Please "
- + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
- + "HBASE-16110 for more information.";
- LOG.error(msg, e);
- throw new Error(msg, e);
- }
- } else {
- CREATE_CODEC = null;
- CREATE_ENCRYPTOR = null;
- CREATE_DECRYPTOR = null;
- INIT_ENCRYPTOR = null;
- INIT_DECRYPTOR = null;
- ENCRYPT = null;
- DECRYPT = null;
- }
- }
-
- private final Object encryptor;
-
- private final Object decryptor;
-
- public CryptoCodec(Configuration conf, Object cipherOption) {
- try {
- Object codec = CREATE_CODEC.invoke(null, conf,
- CIPHER_OPTION_HELPER.getCipherSuite(cipherOption));
- encryptor = CREATE_ENCRYPTOR.invoke(codec);
- byte[] encKey = CIPHER_OPTION_HELPER.getInKey(cipherOption);
- byte[] encIv = CIPHER_OPTION_HELPER.getInIv(cipherOption);
- INIT_ENCRYPTOR.invoke(encryptor, encKey, Arrays.copyOf(encIv, encIv.length));
-
- decryptor = CREATE_DECRYPTOR.invoke(codec);
- byte[] decKey = CIPHER_OPTION_HELPER.getOutKey(cipherOption);
- byte[] decIv = CIPHER_OPTION_HELPER.getOutIv(cipherOption);
- INIT_DECRYPTOR.invoke(decryptor, decKey, Arrays.copyOf(decIv, decIv.length));
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- public CryptoCodec(Configuration conf, Object cipherSuite, byte[] encKey, byte[] encIv) {
- try {
- Object codec = CREATE_CODEC.invoke(null, conf, cipherSuite);
- encryptor = CREATE_ENCRYPTOR.invoke(codec);
- INIT_ENCRYPTOR.invoke(encryptor, encKey, encIv);
- decryptor = null;
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) {
- try {
- ENCRYPT.invoke(encryptor, inBuffer, outBuffer);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) {
- try {
- DECRYPT.invoke(decryptor, inBuffer, outBuffer);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private static SaslAdaptor createSaslAdaptor27(Class<?> saslDataTransferClientClass)
+ private static SaslAdaptor createSaslAdaptor()
throws NoSuchFieldException, NoSuchMethodException {
- final Field saslPropsResolverField = saslDataTransferClientClass
- .getDeclaredField("saslPropsResolver");
+ Field saslPropsResolverField =
+ SaslDataTransferClient.class.getDeclaredField("saslPropsResolver");
saslPropsResolverField.setAccessible(true);
- final Field trustedChannelResolverField = saslDataTransferClientClass
- .getDeclaredField("trustedChannelResolver");
+ Field trustedChannelResolverField =
+ SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver");
trustedChannelResolverField.setAccessible(true);
- final Field fallbackToSimpleAuthField = saslDataTransferClientClass
- .getDeclaredField("fallbackToSimpleAuth");
+ Field fallbackToSimpleAuthField =
+ SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth");
fallbackToSimpleAuthField.setAccessible(true);
- final Method getSaslDataTransferClientMethod = DFSClient.class
- .getMethod("getSaslDataTransferClient");
- final Method newDataEncryptionKeyMethod = DFSClient.class.getMethod("newDataEncryptionKey");
return new SaslAdaptor() {
@Override
- public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
- try {
- return (TrustedChannelResolver) trustedChannelResolverField
- .get(getSaslDataTransferClientMethod.invoke(client));
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
- try {
- return (SaslPropertiesResolver) saslPropsResolverField
- .get(getSaslDataTransferClientMethod.invoke(client));
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
+ public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) {
try {
- return (AtomicBoolean) fallbackToSimpleAuthField
- .get(getSaslDataTransferClientMethod.invoke(client));
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
- try {
- return (DataEncryptionKey) newDataEncryptionKeyMethod.invoke(client);
- } catch (IllegalAccessException | InvocationTargetException e) {
+ return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient);
+ } catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
- };
- }
-
- private static SaslAdaptor createSaslAdaptor25()
- throws NoSuchFieldException, NoSuchMethodException {
- final Field trustedChannelResolverField = DFSClient.class
- .getDeclaredField("trustedChannelResolver");
- trustedChannelResolverField.setAccessible(true);
- final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey");
- return new SaslAdaptor() {
@Override
- public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
+ public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) {
try {
- return (TrustedChannelResolver) trustedChannelResolverField.get(client);
+ return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
@Override
- public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
- return null;
- }
-
- @Override
- public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
- return null;
- }
-
- @Override
- public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
+ public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) {
try {
- return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client);
- } catch (IllegalAccessException | InvocationTargetException e) {
+ return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient);
+ } catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
};
}
- private static SaslAdaptor createSaslAdaptor()
- throws NoSuchFieldException, NoSuchMethodException {
+ private static PBHelper createPBHelper() throws NoSuchMethodException {
+ Class<?> helperClass;
try {
- return createSaslAdaptor27(
- Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient"));
- } catch (ClassNotFoundException e) {
- LOG.debug("No SaslDataTransferClient class found, should be hadoop 2.5-", e);
- }
- return createSaslAdaptor25();
- }
-
- private static CipherOptionHelper createCipherHelper25() {
- return new CipherOptionHelper() {
-
- @Override
- public byte[] getOutKey(Object cipherOption) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public byte[] getOutIv(Object cipherOption) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public byte[] getInKey(Object cipherOption) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public byte[] getInIv(Object cipherOption) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Object getCipherSuite(Object cipherOption) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<Object> getCipherOptions(Configuration conf) {
- return null;
- }
-
- @Override
- public Object getCipherOption(DataTransferEncryptorMessageProto proto,
- boolean isNegotiatedQopPrivacy, SaslClient saslClient) {
- return null;
- }
-
- @Override
- public void addCipherOptions(Builder builder, List<Object> cipherOptions) {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- private static CipherOptionHelper createCipherHelper27(Class<?> cipherOptionClass)
- throws ClassNotFoundException, NoSuchMethodException {
- @SuppressWarnings("rawtypes")
- Class<? extends Enum> cipherSuiteClass = Class.forName("org.apache.hadoop.crypto.CipherSuite")
- .asSubclass(Enum.class);
- @SuppressWarnings("unchecked")
- final Enum<?> aesCipherSuite = Enum.valueOf(cipherSuiteClass, "AES_CTR_NOPADDING");
- final Constructor<?> cipherOptionConstructor = cipherOptionClass
- .getConstructor(cipherSuiteClass);
- final Constructor<?> cipherOptionWithKeyAndIvConstructor = cipherOptionClass
- .getConstructor(cipherSuiteClass, byte[].class, byte[].class, byte[].class, byte[].class);
-
- final Method getCipherSuiteMethod = cipherOptionClass.getMethod("getCipherSuite");
- final Method getInKeyMethod = cipherOptionClass.getMethod("getInKey");
- final Method getInIvMethod = cipherOptionClass.getMethod("getInIv");
- final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey");
- final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv");
-
- Class<?> pbHelperClass;
- try {
- pbHelperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
+ helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
} catch (ClassNotFoundException e) {
LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
- pbHelperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
+ helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
}
- final Method convertCipherOptionsMethod = pbHelperClass.getMethod("convertCipherOptions",
- List.class);
- final Method convertCipherOptionProtosMethod = pbHelperClass
- .getMethod("convertCipherOptionProtos", List.class);
- final Method addAllCipherOptionMethod = DataTransferEncryptorMessageProto.Builder.class
- .getMethod("addAllCipherOption", Iterable.class);
- final Method getCipherOptionListMethod = DataTransferEncryptorMessageProto.class
- .getMethod("getCipherOptionList");
- return new CipherOptionHelper() {
-
- @Override
- public byte[] getOutKey(Object cipherOption) {
- try {
- return (byte[]) getOutKeyMethod.invoke(cipherOption);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public byte[] getOutIv(Object cipherOption) {
- try {
- return (byte[]) getOutIvMethod.invoke(cipherOption);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public byte[] getInKey(Object cipherOption) {
- try {
- return (byte[]) getInKeyMethod.invoke(cipherOption);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public byte[] getInIv(Object cipherOption) {
- try {
- return (byte[]) getInIvMethod.invoke(cipherOption);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Object getCipherSuite(Object cipherOption) {
- try {
- return getCipherSuiteMethod.invoke(cipherOption);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public List<Object> getCipherOptions(Configuration conf) throws IOException {
- // Negotiate cipher suites if configured. Currently, the only supported
- // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
- // values for future expansion.
- String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
- if (cipherSuites == null || cipherSuites.isEmpty()) {
- return null;
- }
- if (!cipherSuites.equals(AES_CTR_NOPADDING)) {
- throw new IOException(String.format("Invalid cipher suite, %s=%s",
- DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
- }
- Object option;
- try {
- option = cipherOptionConstructor.newInstance(aesCipherSuite);
- } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- List<Object> cipherOptions = Lists.newArrayListWithCapacity(1);
- cipherOptions.add(option);
- return cipherOptions;
- }
-
- private Object unwrap(Object option, SaslClient saslClient) throws IOException {
- byte[] inKey = getInKey(option);
- if (inKey != null) {
- inKey = saslClient.unwrap(inKey, 0, inKey.length);
- }
- byte[] outKey = getOutKey(option);
- if (outKey != null) {
- outKey = saslClient.unwrap(outKey, 0, outKey.length);
- }
- try {
- return cipherOptionWithKeyAndIvConstructor.newInstance(getCipherSuite(option), inKey,
- getInIv(option), outKey, getOutIv(option));
- } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
+ Method convertCipherOptionsMethod = helperClass.getMethod("convertCipherOptions", List.class);
+ Method convertCipherOptionProtosMethod =
+ helperClass.getMethod("convertCipherOptionProtos", List.class);
+ return new PBHelper() {
@SuppressWarnings("unchecked")
@Override
- public Object getCipherOption(DataTransferEncryptorMessageProto proto,
- boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
- List<Object> cipherOptions;
+ public List<CipherOptionProto> convertCipherOptions(List<CipherOption> options) {
try {
- cipherOptions = (List<Object>) convertCipherOptionProtosMethod.invoke(null,
- getCipherOptionListMethod.invoke(proto));
+ return (List<CipherOptionProto>) convertCipherOptionsMethod.invoke(null, options);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
- if (cipherOptions == null || cipherOptions.isEmpty()) {
- return null;
- }
- Object cipherOption = cipherOptions.get(0);
- return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
}
+ @SuppressWarnings("unchecked")
@Override
- public void addCipherOptions(Builder builder, List<Object> cipherOptions) {
+ public List<CipherOption> convertCipherOptionProtos(List<CipherOptionProto> options) {
try {
- addAllCipherOptionMethod.invoke(builder,
- convertCipherOptionsMethod.invoke(null, cipherOptions));
+ return (List<CipherOption>) convertCipherOptionProtosMethod.invoke(null, options);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
@@ -580,65 +224,28 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
};
}
- private static CipherOptionHelper createCipherHelper()
- throws ClassNotFoundException, NoSuchMethodException {
- Class<?> cipherOptionClass;
- try {
- cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption");
- } catch (ClassNotFoundException e) {
- LOG.debug("No CipherOption class found, should be hadoop 2.5-", e);
- return createCipherHelper25();
- }
- return createCipherHelper27(cipherOptionClass);
- }
-
- private static TransparentCryptoHelper createTransparentCryptoHelper25() {
- return new TransparentCryptoHelper() {
-
- @Override
- public Object getFileEncryptionInfo(HdfsFileStatus stat) {
- return null;
- }
-
- @Override
- public CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client) {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- private static TransparentCryptoHelper createTransparentCryptoHelper27(Class<?> feInfoClass)
- throws NoSuchMethodException, ClassNotFoundException {
- final Method getFileEncryptionInfoMethod = HdfsFileStatus.class
- .getMethod("getFileEncryptionInfo");
- final Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
- .getDeclaredMethod("decryptEncryptedDataEncryptionKey", feInfoClass);
+ private static TransparentCryptoHelper createTransparentCryptoHelper()
+ throws NoSuchMethodException {
+ Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
+ .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class);
decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
- final Method getCipherSuiteMethod = feInfoClass.getMethod("getCipherSuite");
- Class<?> keyVersionClass = Class.forName("org.apache.hadoop.crypto.key.KeyProvider$KeyVersion");
- final Method getMaterialMethod = keyVersionClass.getMethod("getMaterial");
- final Method getIVMethod = feInfoClass.getMethod("getIV");
return new TransparentCryptoHelper() {
@Override
- public Object getFileEncryptionInfo(HdfsFileStatus stat) {
- try {
- return getFileEncryptionInfoMethod.invoke(stat);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient client)
- throws IOException {
+ public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
+ DFSClient client) throws IOException {
try {
- Object decrypted = decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo);
- return new CryptoCodec(conf, getCipherSuiteMethod.invoke(feInfo),
- (byte[]) getMaterialMethod.invoke(decrypted), (byte[]) getIVMethod.invoke(feInfo));
+ KeyVersion decryptedKey =
+ (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo);
+ CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
+ Encryptor encryptor = cryptoCodec.createEncryptor();
+ encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
+ return encryptor;
} catch (InvocationTargetException e) {
Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
throw new RuntimeException(e.getTargetException());
+ } catch (GeneralSecurityException e) {
+ throw new IOException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
@@ -646,25 +253,13 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
};
}
- private static TransparentCryptoHelper createTransparentCryptoHelper()
- throws NoSuchMethodException, ClassNotFoundException {
- Class<?> feInfoClass;
- try {
- feInfoClass = Class.forName("org.apache.hadoop.fs.FileEncryptionInfo");
- } catch (ClassNotFoundException e) {
- LOG.debug("No FileEncryptionInfo class found, should be hadoop 2.5-", e);
- return createTransparentCryptoHelper25();
- }
- return createTransparentCryptoHelper27(feInfoClass);
- }
-
static {
try {
SASL_ADAPTOR = createSaslAdaptor();
- CIPHER_OPTION_HELPER = createCipherHelper();
+ PB_HELPER = createPBHelper();
TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
} catch (Exception e) {
- final String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ "HBASE-16110 for more information.";
LOG.error(msg, e);
@@ -748,16 +343,31 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
sendSaslMessage(ctx, payload, null);
}
- private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, List<Object> options)
- throws IOException {
- DataTransferEncryptorMessageProto.Builder builder = DataTransferEncryptorMessageProto
- .newBuilder();
+ private List<CipherOption> getCipherOptions() throws IOException {
+ // Negotiate cipher suites if configured. Currently, the only supported
+ // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
+ // values for future expansion.
+ String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
+ if (StringUtils.isBlank(cipherSuites)) {
+ return null;
+ }
+ if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
+ throw new IOException(String.format("Invalid cipher suite, %s=%s",
+ DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
+ }
+ return Arrays.asList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
+ }
+
+ private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
+ List<CipherOption> options) throws IOException {
+ DataTransferEncryptorMessageProto.Builder builder =
+ DataTransferEncryptorMessageProto.newBuilder();
builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
if (payload != null) {
builder.setPayload(ByteStringer.wrap(payload));
}
if (options != null) {
- CIPHER_OPTION_HELPER.addCipherOptions(builder, options);
+ builder.addAllCipherOption(PB_HELPER.convertCipherOptions(options));
}
DataTransferEncryptorMessageProto proto = builder.build();
int size = proto.getSerializedSize();
@@ -798,8 +408,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
}
private boolean requestedQopContainsPrivacy() {
- Set<String> requestedQop = ImmutableSet
- .copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+ Set<String> requestedQop =
+ ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
return requestedQop.contains("auth-conf");
}
@@ -807,8 +417,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
if (!saslClient.isComplete()) {
throw new IOException("Failed to complete SASL handshake");
}
- Set<String> requestedQop = ImmutableSet
- .copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+ Set<String> requestedQop =
+ ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
String negotiatedQop = getNegotiatedQop();
LOG.debug(
"Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop);
@@ -825,48 +435,73 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
return qop != null && !"auth".equalsIgnoreCase(qop);
}
+ private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException {
+ byte[] inKey = option.getInKey();
+ if (inKey != null) {
+ inKey = saslClient.unwrap(inKey, 0, inKey.length);
+ }
+ byte[] outKey = option.getOutKey();
+ if (outKey != null) {
+ outKey = saslClient.unwrap(outKey, 0, outKey.length);
+ }
+ return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey,
+ option.getOutIv());
+ }
+
+ private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto,
+ boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
+ List<CipherOption> cipherOptions =
+ PB_HELPER.convertCipherOptionProtos(proto.getCipherOptionList());
+ if (cipherOptions == null || cipherOptions.isEmpty()) {
+ return null;
+ }
+ CipherOption cipherOption = cipherOptions.get(0);
+ return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
+ }
+
@Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof DataTransferEncryptorMessageProto) {
DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg;
check(proto);
byte[] challenge = proto.getPayload().toByteArray();
byte[] response = saslClient.evaluateChallenge(challenge);
switch (step) {
- case 1: {
- List<Object> cipherOptions = null;
- if (requestedQopContainsPrivacy()) {
- cipherOptions = CIPHER_OPTION_HELPER.getCipherOptions(conf);
- }
- sendSaslMessage(ctx, response, cipherOptions);
- ctx.flush();
- step++;
- break;
- }
- case 2: {
- assert response == null;
- checkSaslComplete();
- Object cipherOption = CIPHER_OPTION_HELPER.getCipherOption(proto,
- isNegotiatedQopPrivacy(), saslClient);
- ChannelPipeline p = ctx.pipeline();
- while (p.first() != null) {
- p.removeFirst();
+ case 1: {
+ List<CipherOption> cipherOptions = null;
+ if (requestedQopContainsPrivacy()) {
+ cipherOptions = getCipherOptions();
+ }
+ sendSaslMessage(ctx, response, cipherOptions);
+ ctx.flush();
+ step++;
+ break;
}
- if (cipherOption != null) {
- CryptoCodec codec = new CryptoCodec(conf, cipherOption);
- p.addLast(new EncryptHandler(codec), new DecryptHandler(codec));
- } else {
- if (useWrap()) {
- p.addLast(new SaslWrapHandler(saslClient),
- new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
- new SaslUnwrapHandler(saslClient));
+ case 2: {
+ assert response == null;
+ checkSaslComplete();
+ CipherOption cipherOption =
+ getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
+ ChannelPipeline p = ctx.pipeline();
+ while (p.first() != null) {
+ p.removeFirst();
+ }
+ if (cipherOption != null) {
+ CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite());
+ p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()),
+ new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv()));
+ } else {
+ if (useWrap()) {
+ p.addLast(new SaslWrapHandler(saslClient),
+ new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
+ new SaslUnwrapHandler(saslClient));
+ }
}
+ promise.trySuccess(null);
+ break;
}
- promise.trySuccess(null);
- break;
- }
- default:
- throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
+ default:
+ throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
}
} else {
ctx.fireChannelRead(msg);
@@ -961,10 +596,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> {
- private final CryptoCodec codec;
+ private final Decryptor decryptor;
- public DecryptHandler(CryptoCodec codec) {
- this.codec = codec;
+ public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
+ throws GeneralSecurityException, IOException {
+ this.decryptor = codec.createDecryptor();
+ this.decryptor.init(key, Arrays.copyOf(iv, iv.length));
}
@Override
@@ -981,7 +618,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
ByteBuffer inBuffer = inBuf.nioBuffer();
ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes());
ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes());
- codec.decrypt(inBuffer, outBuffer);
+ decryptor.decrypt(inBuffer, outBuffer);
outBuf.writerIndex(inBuf.readableBytes());
if (release) {
inBuf.release();
@@ -992,11 +629,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> {
- private final CryptoCodec codec;
+ private final Encryptor encryptor;
- public EncryptHandler(CryptoCodec codec) {
- super(false);
- this.codec = codec;
+ public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
+ throws GeneralSecurityException, IOException {
+ this.encryptor = codec.createEncryptor();
+ this.encryptor.init(key, Arrays.copyOf(iv, iv.length));
}
@Override
@@ -1022,7 +660,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
}
ByteBuffer inBuffer = inBuf.nioBuffer();
ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes());
- codec.encrypt(inBuffer, outBuffer);
+ encryptor.encrypt(inBuffer, outBuffer);
out.writerIndex(inBuf.readableBytes());
if (release) {
inBuf.release();
@@ -1070,22 +708,18 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
- Promise<Void> saslPromise) {
- SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(client);
- TrustedChannelResolver trustedChannelResolver = SASL_ADAPTOR.getTrustedChannelResolver(client);
- AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(client);
+ Promise<Void> saslPromise) throws IOException {
+ SaslDataTransferClient saslClient = client.getSaslDataTransferClient();
+ SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient);
+ TrustedChannelResolver trustedChannelResolver =
+ SASL_ADAPTOR.getTrustedChannelResolver(saslClient);
+ AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient);
InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
saslPromise.trySuccess(null);
return;
}
- DataEncryptionKey encryptionKey;
- try {
- encryptionKey = SASL_ADAPTOR.createDataEncryptionKey(client);
- } catch (Exception e) {
- saslPromise.tryFailure(e);
- return;
- }
+ DataEncryptionKey encryptionKey = client.newDataEncryptionKey();
if (encryptionKey != null) {
if (LOG.isDebugEnabled()) {
LOG.debug(
@@ -1131,12 +765,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
}
}
- static CryptoCodec createCryptoCodec(Configuration conf, HdfsFileStatus stat, DFSClient client)
+ static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client)
throws IOException {
- Object feInfo = TRANSPARENT_CRYPTO_HELPER.getFileEncryptionInfo(stat);
+ FileEncryptionInfo feInfo = stat.getFileEncryptionInfo();
if (feInfo == null) {
return null;
}
- return TRANSPARENT_CRYPTO_HELPER.createCryptoCodec(conf, feInfo, client);
+ return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index d5bccf0..279a6ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -29,7 +29,6 @@ import io.netty.util.concurrent.ScheduledFuture;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.nio.channels.CompletionHandler;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
@@ -206,9 +205,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private final long logRollerExitedCheckIntervalMs;
- private final ExecutorService closeExecutor = Executors
- .newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Close-WAL-Writer-%d").build());
+ private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
private volatile AsyncFSOutput fsOut;
@@ -216,8 +214,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private final Deque<FSWALEntry> unackedEntries = new ArrayDeque<FSWALEntry>();
- private final PriorityQueue<SyncFuture> syncFutures = new PriorityQueue<SyncFuture>(11,
- SEQ_COMPARATOR);
+ private final PriorityQueue<SyncFuture> syncFutures =
+ new PriorityQueue<SyncFuture>(11, SEQ_COMPARATOR);
private Promise<Void> rollPromise;
@@ -285,8 +283,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
- String prefix, String suffix, EventLoop eventLoop) throws FailedLogCloseException,
- IOException {
+ String prefix, String suffix, EventLoop eventLoop)
+ throws FailedLogCloseException, IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
this.eventLoop = eventLoop;
int maxHandlersCount = conf.getInt(REGION_SERVER_HANDLER_COUNT, 200);
@@ -294,9 +292,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
createMaxRetries =
conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
- logRollerExitedCheckIntervalMs =
- conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS,
- DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS);
+ logRollerExitedCheckIntervalMs = conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS,
+ DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS);
rollWriter();
}
@@ -310,82 +307,85 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
+ private void syncFailed(Throwable error) {
+ LOG.warn("sync failed", error);
+ // Here we depends on the implementation of FanOutOneBlockAsyncDFSOutput and netty.
+ // When error occur, FanOutOneBlockAsyncDFSOutput will fail all pending flush requests. It
+ // is execute inside EventLoop. And in DefaultPromise in netty, it will notifyListener
+ // directly if it is already in the EventLoop thread. And in the listener method, it will
+ // call us. So here we know that all failed flush request will call us continuously, and
+ // before the last one finish, no other task can be executed in EventLoop. So here we are
+ // safe to use writerBroken as a guard.
+ // Do not forget to revisit this if we change the implementation of
+ // FanOutOneBlockAsyncDFSOutput!
+ synchronized (waitingConsumePayloads) {
+ if (writerBroken) {
+ return;
+ }
+ // schedule a periodical task to check if log roller is exited. Otherwise the the sync
+ // request maybe blocked forever since we are still waiting for a new writer to write the
+ // pending data and sync it...
+ logRollerExitedChecker = new LogRollerExitedChecker();
+ // we are currently in the EventLoop thread, so it is safe to set the future after
+ // schedule it since the task can not be executed before we release the thread.
+ logRollerExitedChecker.setFuture(eventLoop.scheduleAtFixedRate(logRollerExitedChecker,
+ logRollerExitedCheckIntervalMs, logRollerExitedCheckIntervalMs, TimeUnit.MILLISECONDS));
+ writerBroken = true;
+ }
+ for (Iterator<FSWALEntry> iter = unackedEntries.descendingIterator(); iter.hasNext();) {
+ waitingAppendEntries.addFirst(iter.next());
+ }
+ highestUnsyncedTxid = highestSyncedTxid.get();
+ if (rollPromise != null) {
+ rollPromise.trySuccess(null);
+ rollPromise = null;
+ return;
+ }
+ // request a roll.
+ if (!rollWriterLock.tryLock()) {
+ return;
+ }
+ try {
+ requestLogRoll();
+ } finally {
+ rollWriterLock.unlock();
+ }
+ }
+
+ private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
+ highestSyncedTxid.set(processedTxid);
+ int syncCount = finishSync(true);
+ for (Iterator<FSWALEntry> iter = unackedEntries.iterator(); iter.hasNext();) {
+ if (iter.next().getTxid() <= processedTxid) {
+ iter.remove();
+ } else {
+ break;
+ }
+ }
+ postSync(System.nanoTime() - startTimeNs, syncCount);
+ tryFinishRoll();
+ if (!rollWriterLock.tryLock()) {
+ return;
+ }
+ try {
+ if (writer.getLength() >= logrollsize) {
+ requestLogRoll();
+ }
+ } finally {
+ rollWriterLock.unlock();
+ }
+ }
+
private void sync(final AsyncWriter writer, final long processedTxid) {
fileLengthAtLastSync = writer.getLength();
final long startTimeNs = System.nanoTime();
- writer.sync(new CompletionHandler<Long, Void>() {
-
- @Override
- public void completed(Long result, Void attachment) {
- highestSyncedTxid.set(processedTxid);
- int syncCount = finishSync(true);
- for (Iterator<FSWALEntry> iter = unackedEntries.iterator(); iter.hasNext();) {
- if (iter.next().getTxid() <= processedTxid) {
- iter.remove();
- } else {
- break;
- }
- }
- postSync(System.nanoTime() - startTimeNs, syncCount);
- tryFinishRoll();
- if (!rollWriterLock.tryLock()) {
- return;
- }
- try {
- if (writer.getLength() >= logrollsize) {
- requestLogRoll();
- }
- } finally {
- rollWriterLock.unlock();
- }
- }
-
- @Override
- public void failed(Throwable exc, Void attachment) {
- LOG.warn("sync failed", exc);
- // Here we depends on the implementation of FanOutOneBlockAsyncDFSOutput and netty.
- // When error occur, FanOutOneBlockAsyncDFSOutput will fail all pending flush requests. It
- // is execute inside EventLoop. And in DefaultPromise in netty, it will notifyListener
- // directly if it is already in the EventLoop thread. And in the listener method, it will
- // call us. So here we know that all failed flush request will call us continuously, and
- // before the last one finish, no other task can be executed in EventLoop. So here we are
- // safe to use writerBroken as a guard.
- // Do not forget to revisit this if we change the implementation of
- // FanOutOneBlockAsyncDFSOutput!
- synchronized (waitingConsumePayloads) {
- if (writerBroken) {
- return;
- }
- // schedule a periodical task to check if log roller is exited. Otherwise the the sync
- // request maybe blocked forever since we are still waiting for a new writer to write the
- // pending data and sync it...
- logRollerExitedChecker = new LogRollerExitedChecker();
- // we are currently in the EventLoop thread, so it is safe to set the future after
- // schedule it since the task can not be executed before we release the thread.
- logRollerExitedChecker.setFuture(eventLoop.scheduleAtFixedRate(logRollerExitedChecker,
- logRollerExitedCheckIntervalMs, logRollerExitedCheckIntervalMs, TimeUnit.MILLISECONDS));
- writerBroken = true;
- }
- for (Iterator<FSWALEntry> iter = unackedEntries.descendingIterator(); iter.hasNext();) {
- waitingAppendEntries.addFirst(iter.next());
- }
- highestUnsyncedTxid = highestSyncedTxid.get();
- if (rollPromise != null) {
- rollPromise.trySuccess(null);
- rollPromise = null;
- return;
- }
- // request a roll.
- if (!rollWriterLock.tryLock()) {
- return;
- }
- try {
- requestLogRoll();
- } finally {
- rollWriterLock.unlock();
- }
+ writer.sync().whenComplete((result, error) -> {
+ if (error != null) {
+ syncFailed(error);
+ } else {
+ syncCompleted(writer, processedTxid, startTimeNs);
}
- }, null);
+ });
}
private void addTimeAnnotation(SyncFuture future, String annotation) {
@@ -457,13 +457,9 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
- private static final Comparator<SyncFuture> SEQ_COMPARATOR = new Comparator<SyncFuture>() {
-
- @Override
- public int compare(SyncFuture o1, SyncFuture o2) {
- int c = Long.compare(o1.getTxid(), o2.getTxid());
- return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
- }
+ private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> {
+ int c = Long.compare(o1.getTxid(), o2.getTxid());
+ return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
};
private final Runnable consumer = new Runnable() {
@@ -690,15 +686,11 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
long oldFileLen;
if (oldWriter != null) {
oldFileLen = oldWriter.getLength();
- closeExecutor.execute(new Runnable() {
-
- @Override
- public void run() {
- try {
- oldWriter.close();
- } catch (IOException e) {
- LOG.warn("close old writer failed", e);
- }
+ closeExecutor.execute(() -> {
+ try {
+ oldWriter.close();
+ } catch (IOException e) {
+ LOG.warn("close old writer failed", e);
}
});
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index db3088c..314bef0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -25,7 +25,9 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.nio.channels.CompletionHandler;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -50,50 +52,6 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
private static final Log LOG = LogFactory.getLog(AsyncProtobufLogWriter.class);
- private static final class BlockingCompletionHandler implements CompletionHandler<Long, Void> {
-
- private long size;
-
- private Throwable error;
-
- private boolean finished;
-
- @Override
- public void completed(Long result, Void attachment) {
- synchronized (this) {
- size = result.longValue();
- finished = true;
- notifyAll();
- }
- }
-
- @Override
- public void failed(Throwable exc, Void attachment) {
- synchronized (this) {
- error = exc;
- finished = true;
- notifyAll();
- }
- }
-
- public long get() throws IOException {
- synchronized (this) {
- while (!finished) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- }
- }
- if (error != null) {
- Throwables.propagateIfPossible(error, IOException.class);
- throw new RuntimeException(error);
- }
- return size;
- }
- }
- }
-
private final EventLoop eventLoop;
private AsyncFSOutput output;
@@ -166,8 +124,8 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
}
@Override
- public <A> void sync(CompletionHandler<Long, A> handler, A attachment) {
- output.flush(attachment, handler, false);
+ public CompletableFuture<Long> sync() {
+ return output.flush(false);
}
@Override
@@ -197,10 +155,24 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
this.asyncOutputWrapper = new OutputStreamWrapper(output);
}
+ private long write(Consumer<CompletableFuture<Long>> action) throws IOException {
+ CompletableFuture<Long> future = new CompletableFuture<Long>();
+ eventLoop.execute(() -> action.accept(future));
+ try {
+ return future.get().longValue();
+ } catch (InterruptedException e) {
+ InterruptedIOException ioe = new InterruptedIOException();
+ ioe.initCause(e);
+ throw ioe;
+ } catch (ExecutionException e) {
+ Throwables.propagateIfPossible(e.getCause(), IOException.class);
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
@Override
protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
- final BlockingCompletionHandler handler = new BlockingCompletionHandler();
- eventLoop.execute(() -> {
+ return write(future -> {
output.write(magic);
try {
header.writeDelimitedTo(asyncOutputWrapper);
@@ -208,16 +180,19 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
// should not happen
throw new AssertionError(e);
}
- output.flush(null, handler, false);
+ output.flush(false).whenComplete((len, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ } else {
+ future.complete(len);
+ }
+ });
});
- return handler.get();
}
@Override
- protected long writeWALTrailerAndMagic(WALTrailer trailer, final byte[] magic)
- throws IOException {
- final BlockingCompletionHandler handler = new BlockingCompletionHandler();
- eventLoop.execute(() -> {
+ protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException {
+ return write(future -> {
try {
trailer.writeTo(asyncOutputWrapper);
} catch (IOException e) {
@@ -226,9 +201,14 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
}
output.writeInt(trailer.getSerializedSize());
output.write(magic);
- output.flush(null, handler, false);
+ output.flush(false).whenComplete((len, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ } else {
+ future.complete(len);
+ }
+ });
});
- return handler.get();
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index 1a5b140..328f1b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.wal;
import java.io.Closeable;
import java.io.IOException;
-import java.nio.channels.CompletionHandler;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -85,7 +85,7 @@ public interface WALProvider {
}
interface AsyncWriter extends Closeable {
- <A> void sync(CompletionHandler<Long, A> handler, A attachment);
+ CompletableFuture<Long> sync();
void append(WAL.Entry entry);
long getLength();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java
deleted file mode 100644
index 58b5301..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io.asyncfs;
-
-import java.nio.channels.CompletionHandler;
-import java.util.concurrent.ExecutionException;
-
-public final class FanOutOneBlockAsyncDFSOutputFlushHandler
- implements CompletionHandler<Long, Void> {
-
- private long size;
-
- private Throwable error;
-
- private boolean finished;
-
- @Override
- public synchronized void completed(Long result, Void attachment) {
- size = result.longValue();
- finished = true;
- notifyAll();
- }
-
- @Override
- public synchronized void failed(Throwable exc, Void attachment) {
- error = exc;
- finished = true;
- notifyAll();
- }
-
- public synchronized long get() throws InterruptedException, ExecutionException {
- while (!finished) {
- wait();
- }
- if (error != null) {
- throw new ExecutionException(error);
- }
- return size;
- }
-
- public void reset() {
- size = 0L;
- error = null;
- finished = false;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index a6d3177..7897472 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -107,17 +107,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
throws IOException, InterruptedException, ExecutionException {
final byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
- final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
- new FanOutOneBlockAsyncDFSOutputFlushHandler();
- eventLoop.execute(new Runnable() {
-
- @Override
- public void run() {
- out.write(b, 0, b.length);
- out.flush(null, handler, false);
- }
- });
- assertEquals(b.length, handler.get());
+ out.write(b, 0, b.length);
+ assertEquals(b.length, out.flush(false).get().longValue());
out.close();
assertEquals(b.length, dfs.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
@@ -144,31 +135,14 @@ public class TestFanOutOneBlockAsyncDFSOutput {
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
final byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
- final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
- new FanOutOneBlockAsyncDFSOutputFlushHandler();
- eventLoop.execute(new Runnable() {
-
- @Override
- public void run() {
- out.write(b, 0, b.length);
- out.flush(null, handler, false);
- }
- });
- handler.get();
+ out.write(b, 0, b.length);
+ out.flush(false).get();
// restart one datanode which causes one connection broken
TEST_UTIL.getDFSCluster().restartDataNode(0);
try {
- handler.reset();
- eventLoop.execute(new Runnable() {
-
- @Override
- public void run() {
- out.write(b, 0, b.length);
- out.flush(null, handler, false);
- }
- });
+ out.write(b, 0, b.length);
try {
- handler.get();
+ out.flush(false).get();
fail("flush should fail");
} catch (ExecutionException e) {
// we restarted one datanode so the flush should fail
@@ -254,17 +228,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
true, false, (short) 3, 1024 * 1024 * 1024, eventLoop);
byte[] b = new byte[50 * 1024 * 1024];
ThreadLocalRandom.current().nextBytes(b);
- FanOutOneBlockAsyncDFSOutputFlushHandler handler =
- new FanOutOneBlockAsyncDFSOutputFlushHandler();
- eventLoop.execute(new Runnable() {
-
- @Override
- public void run() {
- out.write(b);
- out.flush(null, handler, false);
- }
- });
- assertEquals(b.length, handler.get());
+ out.write(b);
+ out.flush(false);
+ assertEquals(b.length, out.flush(false).get().longValue());
out.close();
assertEquals(b.length, FS.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index 04cb0ef..6bd2d3c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -60,10 +60,8 @@ public class TestLocalAsyncOutput {
fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next());
byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
- FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
out.write(b);
- out.flush(null, handler, true);
- assertEquals(b.length, handler.get());
+ assertEquals(b.length, out.flush(true).get().longValue());
out.close();
assertEquals(b.length, fs.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index 89c7996..e05d869 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -17,11 +17,19 @@
*/
package org.apache.hadoop.hbase.io.asyncfs;
-import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.AES_CTR_NOPADDING;
-import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
@@ -38,6 +46,9 @@ import java.util.concurrent.ExecutionException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
@@ -45,7 +56,6 @@ import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.token.TestGenerateDelegationToken;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.minikdc.MiniKdc;
@@ -113,7 +123,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
List<Object[]> params = new ArrayList<>();
for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
- for (String cipherSuite : Arrays.asList("", AES_CTR_NOPADDING)) {
+ for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) {
for (boolean useTransparentEncryption : Arrays.asList(false, true)) {
params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite,
useTransparentEncryption });
@@ -125,17 +135,15 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
}
private static void setHdfsSecuredConfiguration(Configuration conf) throws Exception {
- // change XXX_USER_NAME_KEY to XXX_KERBEROS_PRINCIPAL_KEY after we drop support for hadoop-2.4.1
- conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, PRINCIPAL + "@" + KDC.getRealm());
- conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
- conf.set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, PRINCIPAL + "@" + KDC.getRealm());
- conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
- conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
- HTTP_PRINCIPAL + "@" + KDC.getRealm());
- conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
- conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
- conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
- conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+ conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, PRINCIPAL + "@" + KDC.getRealm());
+ conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
+ conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, PRINCIPAL + "@" + KDC.getRealm());
+ conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
+ conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, HTTP_PRINCIPAL + "@" + KDC.getRealm());
+ conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+ conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+ conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+ conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
File keystoresDir = new File(TEST_UTIL.getDataTestDir("keystore").toUri().getPath());
keystoresDir.mkdirs();
@@ -146,32 +154,13 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
}
private static void setUpKeyProvider(Configuration conf) throws Exception {
- Class<?> keyProviderFactoryClass;
- try {
- keyProviderFactoryClass = Class.forName("org.apache.hadoop.crypto.key.KeyProviderFactory");
- } catch (ClassNotFoundException e) {
- // should be hadoop 2.5-, give up
- TEST_TRANSPARENT_ENCRYPTION = false;
- return;
- }
-
URI keyProviderUri =
new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString());
conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString());
- Method getKeyProviderMethod =
- keyProviderFactoryClass.getMethod("get", URI.class, Configuration.class);
- Object keyProvider = getKeyProviderMethod.invoke(null, keyProviderUri, conf);
- Class<?> keyProviderClass = Class.forName("org.apache.hadoop.crypto.key.KeyProvider");
- Class<?> keyProviderOptionsClass =
- Class.forName("org.apache.hadoop.crypto.key.KeyProvider$Options");
- Method createKeyMethod =
- keyProviderClass.getMethod("createKey", String.class, keyProviderOptionsClass);
- Object options = keyProviderOptionsClass.getConstructor(Configuration.class).newInstance(conf);
- createKeyMethod.invoke(keyProvider, TEST_KEY_NAME, options);
- Method flushMethod = keyProviderClass.getMethod("flush");
- flushMethod.invoke(keyProvider);
- Method closeMethod = keyProviderClass.getMethod("close");
- closeMethod.invoke(keyProvider);
+ KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf);
+ keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf));
+ keyProvider.flush();
+ keyProvider.close();
}
@BeforeClass
@@ -231,7 +220,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
TEST_UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite);
}
- TEST_UTIL.startMiniDFSCluster(3);
+ TEST_UTIL.startMiniDFSCluster(1);
FS = TEST_UTIL.getDFSCluster().getFileSystem();
testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
FS.mkdirs(testDirOnTestFs);
http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
index b64d458..72fc4b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
@@ -17,27 +17,26 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
+import com.google.common.base.Throwables;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputFlushHandler;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
-import com.google.common.base.Throwables;
-
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-
@Category({ RegionServerTests.class, MediumTests.class })
public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.AsyncWriter> {
@@ -68,14 +67,12 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.As
@Override
protected void sync(AsyncWriter writer) throws IOException {
- FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
- writer.sync(handler, null);
try {
- handler.get();
+ writer.sync().get();
} catch (InterruptedException e) {
throw new InterruptedIOException();
} catch (ExecutionException e) {
- Throwables.propagateIfPossible(e.getCause(), IOException.class);
+ Throwables.propagateIfPossible(e.getCause());
throw new IOException(e.getCause());
}
}
[2/2] hbase git commit: HBASE-16968 Refactor
FanOutOneBlockAsyncDFSOutput
Posted by zh...@apache.org.
HBASE-16968 Refactor FanOutOneBlockAsyncDFSOutput
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/45a25942
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/45a25942
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/45a25942
Branch: refs/heads/master
Commit: 45a2594249dcb624eca664c09fcfae414916ac6b
Parents: d0be36d
Author: zhangduo <zh...@apache.org>
Authored: Sun Oct 30 20:03:08 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Nov 1 09:19:02 2016 +0800
----------------------------------------------------------------------
.../hadoop/hbase/io/asyncfs/AsyncFSOutput.java | 10 +-
.../hbase/io/asyncfs/AsyncFSOutputHelper.java | 28 +-
.../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 210 ++----
.../FanOutOneBlockAsyncDFSOutputHelper.java | 257 ++-----
.../FanOutOneBlockAsyncDFSOutputSaslHelper.java | 702 +++++--------------
.../hbase/regionserver/wal/AsyncFSWAL.java | 190 +++--
.../wal/AsyncProtobufLogWriter.java | 94 +--
.../apache/hadoop/hbase/wal/WALProvider.java | 4 +-
...anOutOneBlockAsyncDFSOutputFlushHandler.java | 61 --
.../TestFanOutOneBlockAsyncDFSOutput.java | 52 +-
.../hbase/io/asyncfs/TestLocalAsyncOutput.java | 4 +-
.../TestSaslFanOutOneBlockAsyncDFSOutput.java | 67 +-
.../regionserver/wal/TestAsyncProtobufLog.java | 21 +-
13 files changed, 502 insertions(+), 1198 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
index 0c60d3cf..7d513db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
+import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -39,8 +40,8 @@ public interface AsyncFSOutput extends Closeable {
void write(byte[] b);
/**
- * Copy the data into the buffer. Note that you need to call
- * {@link #flush(Object, CompletionHandler, boolean)} to flush the buffer manually.
+ * Copy the data into the buffer. Note that you need to call {@link #flush(boolean)} to flush the
+ * buffer manually.
*/
void write(byte[] b, int off, int len);
@@ -66,11 +67,10 @@ public interface AsyncFSOutput extends Closeable {
/**
* Flush the buffer out.
- * @param attachment will be passed to handler when completed.
- * @param handler will set the acked length as result when completed.
* @param sync persistent the data to device
+ * @return A CompletableFuture that hold the acked length after flushing.
*/
- <A> void flush(A attachment, final CompletionHandler<Long, ? super A> handler, boolean sync);
+ CompletableFuture<Long> flush(boolean sync);
/**
* The close method when error occurred.
http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index c9d4e70..7fe86be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -25,7 +25,7 @@ import io.netty.channel.EventLoop;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
-import java.nio.channels.CompletionHandler;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -80,11 +80,7 @@ public final class AsyncFSOutputHelper {
if (eventLoop.inEventLoop()) {
out.write(b, off, len);
} else {
- eventLoop.submit(new Runnable() {
- public void run() {
- out.write(b, off, len);
- }
- }).syncUninterruptibly();
+ eventLoop.submit(() -> out.write(b, off, len)).syncUninterruptibly();
}
}
@@ -103,15 +99,14 @@ public final class AsyncFSOutputHelper {
return new DatanodeInfo[0];
}
- private <A> void flush0(A attachment, CompletionHandler<Long, ? super A> handler,
- boolean sync) {
+ private void flush0(CompletableFuture<Long> future, boolean sync) {
try {
synchronized (out) {
fsOut.write(out.getBuffer(), 0, out.size());
out.reset();
}
} catch (IOException e) {
- eventLoop.execute(() -> handler.failed(e, attachment));
+ eventLoop.execute(() -> future.completeExceptionally(e));
return;
}
try {
@@ -120,17 +115,18 @@ public final class AsyncFSOutputHelper {
} else {
fsOut.hflush();
}
- final long pos = fsOut.getPos();
- eventLoop.execute(() -> handler.completed(pos, attachment));
- } catch (final IOException e) {
- eventLoop.execute(() -> handler.failed(e, attachment));
+ long pos = fsOut.getPos();
+ eventLoop.execute(() -> future.complete(pos));
+ } catch (IOException e) {
+ eventLoop.execute(() -> future.completeExceptionally(e));
}
}
@Override
- public <A> void flush(A attachment, CompletionHandler<Long, ? super A> handler,
- boolean sync) {
- flushExecutor.execute(() -> flush0(attachment, handler, sync));
+ public CompletableFuture<Long> flush(boolean sync) {
+ CompletableFuture<Long> future = new CompletableFuture<>();
+ flushExecutor.execute(() -> flush0(future, sync));
+ return future;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 916e534..02ffcd5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -26,8 +26,6 @@ import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHel
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
-import com.google.common.base.Supplier;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
@@ -39,14 +37,11 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseCombiner;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.CompletionHandler;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
@@ -54,13 +49,15 @@ import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
-import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.CryptoCodec;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
@@ -87,8 +84,8 @@ import org.apache.hadoop.util.DataChecksum;
* need one thread here. But be careful, we do some blocking operations in {@link #close()} and
* {@link #recoverAndClose(CancelableProgressable)} methods, so do not call them inside
* {@link EventLoop}. And for {@link #write(byte[])} {@link #write(byte[], int, int)},
- * {@link #buffered()} and {@link #flush(Object, CompletionHandler, boolean)}, if you call them
- * outside {@link EventLoop}, there will be an extra context-switch.
+ * {@link #buffered()} and {@link #flush(boolean)}, if you call them outside {@link EventLoop},
+ * there will be an extra context-switch.
* <p>
* Advantages compare to DFSOutputStream:
* <ol>
@@ -125,7 +122,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
private final LocatedBlock locatedBlock;
- private final CryptoCodec cryptoCodec;
+ private final Encryptor encryptor;
private final EventLoop eventLoop;
@@ -151,8 +148,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
if (replicas.isEmpty()) {
this.unfinishedReplicas = Collections.emptySet();
} else {
- this.unfinishedReplicas = Collections
- .newSetFromMap(new IdentityHashMap<Channel, Boolean>(replicas.size()));
+ this.unfinishedReplicas =
+ Collections.newSetFromMap(new IdentityHashMap<Channel, Boolean>(replicas.size()));
this.unfinishedReplicas.addAll(replicas);
}
}
@@ -215,13 +212,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
// disable further write, and fail all pending ack.
state = State.BROKEN;
Throwable error = errorSupplier.get();
- for (Callback c : waitingAckQueue) {
- c.promise.tryFailure(error);
- }
+ waitingAckQueue.stream().forEach(c -> c.promise.tryFailure(error));
waitingAckQueue.clear();
- for (Channel ch : datanodeList) {
- ch.close();
- }
+ datanodeList.forEach(ch -> ch.close());
}
@Sharable
@@ -234,29 +227,16 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
}
@Override
- protected void channelRead0(final ChannelHandlerContext ctx, PipelineAckProto ack)
- throws Exception {
- final Status reply = getStatus(ack);
+ protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
+ Status reply = getStatus(ack);
if (reply != Status.SUCCESS) {
- failed(ctx.channel(), new Supplier<Throwable>() {
-
- @Override
- public Throwable get() {
- return new IOException("Bad response " + reply + " for block " + locatedBlock.getBlock()
- + " from datanode " + ctx.channel().remoteAddress());
- }
- });
+ failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block "
+ + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()));
return;
}
if (PipelineAck.isRestartOOBStatus(reply)) {
- failed(ctx.channel(), new Supplier<Throwable>() {
-
- @Override
- public Throwable get() {
- return new IOException("Restart response " + reply + " for block "
- + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress());
- }
- });
+ failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block "
+ + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()));
return;
}
if (ack.getSeqno() == HEART_BEAT_SEQNO) {
@@ -266,25 +246,14 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
}
@Override
- public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
- failed(ctx.channel(), new Supplier<Throwable>() {
-
- @Override
- public Throwable get() {
- return new IOException("Connection to " + ctx.channel().remoteAddress() + " closed");
- }
- });
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ failed(ctx.channel(),
+ () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
- failed(ctx.channel(), new Supplier<Throwable>() {
-
- @Override
- public Throwable get() {
- return cause;
- }
- });
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ failed(ctx.channel(), () -> cause);
}
@Override
@@ -292,13 +261,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == READER_IDLE) {
- failed(ctx.channel(), new Supplier<Throwable>() {
-
- @Override
- public Throwable get() {
- return new IOException("Timeout(" + timeoutMs + "ms) waiting for response");
- }
- });
+ failed(ctx.channel(),
+ () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
} else if (e.state() == WRITER_IDLE) {
PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
int len = heartbeat.getSerializedSize();
@@ -326,7 +290,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs,
DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
- LocatedBlock locatedBlock, CryptoCodec cryptoCodec, EventLoop eventLoop,
+ LocatedBlock locatedBlock, Encryptor encryptor, EventLoop eventLoop,
List<Channel> datanodeList, DataChecksum summer, ByteBufAllocator alloc) {
this.conf = conf;
this.fsUtils = fsUtils;
@@ -337,7 +301,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
this.clientName = clientName;
this.src = src;
this.locatedBlock = locatedBlock;
- this.cryptoCodec = cryptoCodec;
+ this.encryptor = encryptor;
this.eventLoop = eventLoop;
this.datanodeList = datanodeList;
this.summer = summer;
@@ -350,14 +314,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
private void writeInt0(int i) {
buf.ensureWritable(4);
- if (cryptoCodec == null) {
- buf.writeInt(i);
- } else {
- ByteBuffer inBuffer = ByteBuffer.allocate(4);
- inBuffer.putInt(0, i);
- cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), 4));
- buf.writerIndex(buf.writerIndex() + 4);
- }
+ buf.writeInt(i);
}
@Override
@@ -370,14 +327,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
}
private void write0(ByteBuffer bb) {
- int len = bb.remaining();
- buf.ensureWritable(len);
- if (cryptoCodec == null) {
- buf.writeBytes(bb);
- } else {
- cryptoCodec.encrypt(bb, buf.nioBuffer(buf.writerIndex(), len));
- buf.writerIndex(buf.writerIndex() + len);
- }
+ buf.ensureWritable(bb.remaining());
+ buf.writeBytes(bb);
}
@Override
@@ -394,19 +345,13 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
write(b, 0, b.length);
}
- private void write0(byte[] b, final int off, final int len) {
+ private void write0(byte[] b, int off, int len) {
buf.ensureWritable(len);
- if (cryptoCodec == null) {
- buf.writeBytes(b, off, len);
- } else {
- ByteBuffer inBuffer = ByteBuffer.wrap(b, off, len);
- cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), len));
- buf.writerIndex(buf.writerIndex() + len);
- }
+ buf.writeBytes(b, off, len);
}
@Override
- public void write(final byte[] b, final int off, final int len) {
+ public void write(byte[] b, int off, int len) {
if (eventLoop.inEventLoop()) {
write0(b, off, len);
} else {
@@ -464,27 +409,40 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
return promise;
}
- private <A> void flush0(final A attachment, final CompletionHandler<Long, ? super A> handler,
- boolean syncBlock) {
+ private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
if (state != State.STREAMING) {
- handler.failed(new IOException("stream already broken"), attachment);
+ future.completeExceptionally(new IOException("stream already broken"));
return;
}
int dataLen = buf.readableBytes();
- final long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
+ if (encryptor != null) {
+ ByteBuf encryptBuf = alloc.directBuffer(dataLen);
+ try {
+ encryptor.encrypt(buf.nioBuffer(buf.readerIndex(), dataLen),
+ encryptBuf.nioBuffer(0, dataLen));
+ } catch (IOException e) {
+ encryptBuf.release();
+ future.completeExceptionally(e);
+ return;
+ }
+ encryptBuf.writerIndex(dataLen);
+ buf.release();
+ buf = encryptBuf;
+ }
+ long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
if (lengthAfterFlush == locatedBlock.getBlock().getNumBytes()) {
// no new data, just return
- handler.completed(locatedBlock.getBlock().getNumBytes(), attachment);
+ future.complete(locatedBlock.getBlock().getNumBytes());
return;
}
Callback c = waitingAckQueue.peekLast();
if (c != null && lengthAfterFlush == c.ackedLength) {
// just append it to the tail of waiting ack queue,, do not issue new hflush request.
- waitingAckQueue.addLast(new Callback(eventLoop.<Void> newPromise().addListener(future -> {
- if (future.isSuccess()) {
- handler.completed(lengthAfterFlush, attachment);
+ waitingAckQueue.addLast(new Callback(eventLoop.<Void> newPromise().addListener(f -> {
+ if (f.isSuccess()) {
+ future.complete(lengthAfterFlush);
} else {
- handler.failed(future.cause(), attachment);
+ future.completeExceptionally(f.cause());
}
}), lengthAfterFlush, Collections.<Channel> emptyList()));
return;
@@ -506,11 +464,11 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
} else {
promise = flushBuffer(buf.retain(), nextPacketOffsetInBlock, syncBlock);
}
- promise.addListener(future -> {
- if (future.isSuccess()) {
- handler.completed(lengthAfterFlush, attachment);
+ promise.addListener(f -> {
+ if (f.isSuccess()) {
+ future.complete(lengthAfterFlush);
} else {
- handler.failed(future.cause(), attachment);
+ future.completeExceptionally(f.cause());
}
});
int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum();
@@ -525,23 +483,17 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
/**
* Flush the buffer out to datanodes.
- * @param attachment will be passed to handler when completed.
- * @param handler will set the acked length as result when completed.
* @param syncBlock will call hsync if true, otherwise hflush.
+ * @return A CompletableFuture that hold the acked length after flushing.
*/
- public <A> void flush(final A attachment, final CompletionHandler<Long, ? super A> handler,
- final boolean syncBlock) {
+ public CompletableFuture<Long> flush(boolean syncBlock) {
+ CompletableFuture<Long> future = new CompletableFuture<Long>();
if (eventLoop.inEventLoop()) {
- flush0(attachment, handler, syncBlock);
+ flush0(future, syncBlock);
} else {
- eventLoop.execute(new Runnable() {
-
- @Override
- public void run() {
- flush0(attachment, handler, syncBlock);
- }
- });
+ eventLoop.execute(() -> flush0(future, syncBlock));
}
+ return future;
}
private void endBlock(Promise<Void> promise, long size) {
@@ -558,13 +510,11 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
buf.release();
buf = null;
int headerLen = header.getSerializedSize();
- ByteBuf headerBuf = alloc.buffer(headerLen);
+ ByteBuf headerBuf = alloc.directBuffer(headerLen);
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
headerBuf.writerIndex(headerLen);
waitingAckQueue.add(new Callback(promise, size, datanodeList));
- for (Channel ch : datanodeList) {
- ch.writeAndFlush(headerBuf.duplicate().retain());
- }
+ datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.duplicate().retain()));
headerBuf.release();
}
@@ -574,10 +524,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
@Override
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
assert !eventLoop.inEventLoop();
- for (Channel ch : datanodeList) {
- ch.closeFuture().awaitUninterruptibly();
- }
- endFileLease(client, src, fileId);
+ datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
+ endFileLease(client, fileId);
fsUtils.recoverFileLease(dfs, new Path(src), conf,
reporter == null ? new CancelOnClose(client) : reporter);
}
@@ -589,26 +537,10 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
@Override
public void close() throws IOException {
assert !eventLoop.inEventLoop();
- final Promise<Void> promise = eventLoop.newPromise();
- eventLoop.execute(new Runnable() {
-
- @Override
- public void run() {
- endBlock(promise, nextPacketOffsetInBlock + buf.readableBytes());
- }
- });
- promise.addListener(new FutureListener<Void>() {
-
- @Override
- public void operationComplete(Future<Void> future) throws Exception {
- for (Channel ch : datanodeList) {
- ch.close();
- }
- }
- }).syncUninterruptibly();
- for (Channel ch : datanodeList) {
- ch.closeFuture().awaitUninterruptibly();
- }
+ Promise<Void> promise = eventLoop.newPromise();
+ eventLoop.execute(() -> endBlock(promise, nextPacketOffsetInBlock + buf.readableBytes()));
+ promise.addListener(f -> datanodeList.forEach(ch -> ch.close())).syncUninterruptibly();
+ datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), fileId);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/45a25942/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 51c48ce..875ff77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -21,7 +21,7 @@ import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
-import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createCryptoCodec;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
@@ -66,6 +66,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
@@ -74,7 +76,6 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionUtils;
-import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.CryptoCodec;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
@@ -96,7 +97,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyP
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
@@ -143,26 +143,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
- // StorageType enum is added in hadoop 2.4, but it is moved to another package in hadoop 2.6 and
- // the setter method in OpWriteBlockProto is also added in hadoop 2.6. So we need to skip the
- // setStorageType call if it is hadoop 2.5 or before. See createStorageTypeSetter for more
- // details.
+ // StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and o.a.h.fs in hadoop 2.7. So here
+ // we need to use reflection to set it.See createStorageTypeSetter for more details.
private interface StorageTypeSetter {
OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
}
private static final StorageTypeSetter STORAGE_TYPE_SETTER;
- // helper class for calling create method on namenode. There is a supportedVersions parameter for
- // hadoop 2.6 or after. See createFileCreater for more details.
- private interface FileCreater {
- HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
- String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
- short replication, long blockSize) throws IOException;
- }
-
- private static final FileCreater FILE_CREATER;
-
// helper class for calling add block method on namenode. There is a addBlockFlags parameter for
// hadoop 2.8 or later. See createBlockAdder for more details.
private interface BlockAdder {
@@ -174,13 +162,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static final BlockAdder BLOCK_ADDER;
- // helper class for add or remove lease from DFSClient. Hadoop 2.4 use src as the Map's key, and
- // hadoop 2.5 or after use inodeId. See createLeaseManager for more details.
private interface LeaseManager {
- void begin(DFSClient client, String src, long inodeId);
+ void begin(DFSClient client, long inodeId);
- void end(DFSClient client, String src, long inodeId);
+ void end(DFSClient client, long inodeId);
}
private static final LeaseManager LEASE_MANAGER;
@@ -197,7 +183,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
// helper class for convert protos.
private interface PBHelper {
- ExtendedBlockProto convert(final ExtendedBlock b);
+ ExtendedBlockProto convert(ExtendedBlock b);
TokenProto convert(Token<?> tok);
}
@@ -212,7 +198,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static final ChecksumCreater CHECKSUM_CREATER;
private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
- final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
+ Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
isClientRunningMethod.setAccessible(true);
return new DFSClientAdaptor() {
@@ -227,16 +213,16 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
};
}
- private static LeaseManager createLeaseManager25() throws NoSuchMethodException {
- final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
- long.class, DFSOutputStream.class);
+ private static LeaseManager createLeaseManager() throws NoSuchMethodException {
+ Method beginFileLeaseMethod =
+ DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
beginFileLeaseMethod.setAccessible(true);
- final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
+ Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
endFileLeaseMethod.setAccessible(true);
return new LeaseManager() {
@Override
- public void begin(DFSClient client, String src, long inodeId) {
+ public void begin(DFSClient client, long inodeId) {
try {
beginFileLeaseMethod.invoke(client, inodeId, null);
} catch (IllegalAccessException | InvocationTargetException e) {
@@ -245,7 +231,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
}
@Override
- public void end(DFSClient client, String src, long inodeId) {
+ public void end(DFSClient client, long inodeId) {
try {
endFileLeaseMethod.invoke(client, inodeId);
} catch (IllegalAccessException | InvocationTargetException e) {
@@ -255,66 +241,28 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
};
}
- private static LeaseManager createLeaseManager24() throws NoSuchMethodException {
- final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
- String.class, DFSOutputStream.class);
- beginFileLeaseMethod.setAccessible(true);
- final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease",
- String.class);
- endFileLeaseMethod.setAccessible(true);
- return new LeaseManager() {
-
- @Override
- public void begin(DFSClient client, String src, long inodeId) {
- try {
- beginFileLeaseMethod.invoke(client, src, null);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void end(DFSClient client, String src, long inodeId) {
- try {
- endFileLeaseMethod.invoke(client, src);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- private static LeaseManager createLeaseManager() throws NoSuchMethodException {
- try {
- return createLeaseManager25();
- } catch (NoSuchMethodException e) {
- LOG.debug("No inodeId related lease methods found, should be hadoop 2.4-", e);
- }
- return createLeaseManager24();
- }
-
private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
throws NoSuchMethodException {
- final Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
+ Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
@SuppressWarnings("rawtypes")
Class<? extends Enum> ecnClass;
try {
ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
.asSubclass(Enum.class);
} catch (ClassNotFoundException e) {
- final String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please "
+ String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please "
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ "HBASE-16110 for more information.";
LOG.error(msg, e);
throw new Error(msg, e);
}
@SuppressWarnings("unchecked")
- final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
- final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
- final Method combineHeaderMethod = PipelineAck.class.getMethod("combineHeader", ecnClass,
- Status.class);
- final Method getStatusFromHeaderMethod = PipelineAck.class.getMethod("getStatusFromHeader",
- int.class);
+ Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
+ Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
+ Method combineHeaderMethod =
+ PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
+ Method getStatusFromHeaderMethod =
+ PipelineAck.class.getMethod("getStatusFromHeader", int.class);
return new PipelineAckStatusGetter() {
@Override
@@ -339,7 +287,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
throws NoSuchMethodException {
- final Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
+ Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
return new PipelineAckStatusGetter() {
@Override
@@ -363,30 +311,18 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
return createPipelineAckStatusGetter26();
}
- private static StorageTypeSetter createStorageTypeSetter() {
- final Method setStorageTypeMethod;
- try {
- setStorageTypeMethod = OpWriteBlockProto.Builder.class.getMethod("setStorageType",
- StorageTypeProto.class);
- } catch (NoSuchMethodException e) {
- LOG.debug("noSetStorageType method found, should be hadoop 2.5-", e);
- return new StorageTypeSetter() {
-
- @Override
- public Builder set(Builder builder, Enum<?> storageType) {
- return builder;
- }
- };
- }
+ private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
+ Method setStorageTypeMethod =
+ OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
builder.put(storageTypeProto.name(), storageTypeProto);
}
- final ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
+ ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
return new StorageTypeSetter() {
@Override
- public Builder set(Builder builder, Enum<?> storageType) {
+ public OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType) {
Object protoEnum = name2ProtoEnum.get(storageType.name());
try {
setStorageTypeMethod.invoke(builder, protoEnum);
@@ -398,62 +334,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
};
}
- private static FileCreater createFileCreater() throws ClassNotFoundException,
- NoSuchMethodException, IllegalAccessException, InvocationTargetException {
- for (Method method : ClientProtocol.class.getMethods()) {
- if (method.getName().equals("create")) {
- final Method createMethod = method;
- Class<?>[] paramTypes = createMethod.getParameterTypes();
- if (paramTypes[paramTypes.length - 1] == long.class) {
- return new FileCreater() {
-
- @Override
- public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
- String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
- short replication, long blockSize) throws IOException {
- try {
- return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag,
- createParent, replication, blockSize);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
- throw new RuntimeException(e);
- }
- }
- };
- } else {
- Class<?> cryptoProtocolVersionClass = Class
- .forName("org.apache.hadoop.crypto.CryptoProtocolVersion");
- Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported");
- final Object supported = supportedMethod.invoke(null);
- return new FileCreater() {
-
- @Override
- public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
- String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
- short replication, long blockSize) throws IOException {
- try {
- return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag,
- createParent, replication, blockSize, supported);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
- throw new RuntimeException(e);
- }
- }
- };
- }
- }
- }
- throw new NoSuchMethodException("Can not find create method in ClientProtocol");
- }
-
private static BlockAdder createBlockAdder() throws NoSuchMethodException {
for (Method method : ClientProtocol.class.getMethods()) {
if (method.getName().equals("addBlock")) {
- final Method addBlockMethod = method;
+ Method addBlockMethod = method;
Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
if (paramTypes[paramTypes.length - 1] == String[].class) {
return new BlockAdder() {
@@ -505,8 +389,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
}
- final Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
- final Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
+ Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
+ Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
return new PBHelper() {
@Override
@@ -533,7 +417,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
throws NoSuchMethodException {
for (Method method : confClass.getMethods()) {
if (method.getName().equals("createChecksum")) {
- final Method createChecksumMethod = method;
+ Method createChecksumMethod = method;
return new ChecksumCreater() {
@Override
@@ -552,7 +436,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static ChecksumCreater createChecksumCreater27(Class<?> confClass)
throws NoSuchMethodException {
- final Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
+ Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
createChecksumMethod.setAccessible(true);
return new ChecksumCreater() {
@@ -597,14 +481,13 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
try {
PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
STORAGE_TYPE_SETTER = createStorageTypeSetter();
- FILE_CREATER = createFileCreater();
BLOCK_ADDER = createBlockAdder();
LEASE_MANAGER = createLeaseManager();
DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
PB_HELPER = createPBHelper();
CHECKSUM_CREATER = createChecksumCreater();
} catch (Exception e) {
- final String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ "HBASE-16110 for more information.";
LOG.error(msg, e);
@@ -612,12 +495,12 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
}
}
- static void beginFileLease(DFSClient client, String src, long inodeId) {
- LEASE_MANAGER.begin(client, src, inodeId);
+ static void beginFileLease(DFSClient client, long inodeId) {
+ LEASE_MANAGER.begin(client, inodeId);
}
- static void endFileLease(DFSClient client, String src, long inodeId) {
- LEASE_MANAGER.end(client, src, inodeId);
+ static void endFileLease(DFSClient client, long inodeId) {
+ LEASE_MANAGER.end(client, inodeId);
}
static DataChecksum createChecksum(DFSClient client) {
@@ -628,8 +511,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
return PIPELINE_ACK_STATUS_GETTER.get(ack);
}
- private static void processWriteBlockResponse(Channel channel, final DatanodeInfo dnInfo,
- final Promise<Channel> promise, final int timeoutMs) {
+ private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
+ Promise<Channel> promise, int timeoutMs) {
channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
new ProtobufVarint32FrameDecoder(),
new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
@@ -693,18 +576,18 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
int protoLen = proto.getSerializedSize();
- ByteBuf buffer = channel.alloc()
- .buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
+ ByteBuf buffer =
+ channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
buffer.writeByte(Op.WRITE_BLOCK.code);
proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
channel.writeAndFlush(buffer);
}
- private static void initialize(Configuration conf, final Channel channel,
- final DatanodeInfo dnInfo, final Enum<?> storageType,
- final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs, DFSClient client,
- Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) {
+ private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+ Enum<?> storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
+ DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
+ throws IOException {
Promise<Void> saslPromise = channel.eventLoop().newPromise();
trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
saslPromise.addListener(new FutureListener<Void>() {
@@ -722,14 +605,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
});
}
- private static List<Future<Channel>> connectToDataNodes(final Configuration conf,
- final DFSClient client, String clientName, final LocatedBlock locatedBlock, long maxBytesRcvd,
- long latestGS, BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) {
+ private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
+ String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
+ BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) {
Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
- boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
- DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
- final int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
+ boolean connectToDnViaHostname =
+ conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+ int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
blockCopy.setNumBytes(locatedBlock.getBlockSize());
ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
@@ -737,7 +620,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
.setToken(PB_HELPER.convert(locatedBlock.getBlockToken())))
.setClientName(clientName).build();
ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
- final OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
+ OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
.setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
.setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
.setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
@@ -745,11 +628,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
.setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
for (int i = 0; i < datanodeInfos.length; i++) {
- final DatanodeInfo dnInfo = datanodeInfos[i];
- // Use Enum here because StoregType is moved to another package in hadoop 2.6. Use StorageType
- // will cause compilation error for hadoop 2.5 or before.
- final Enum<?> storageType = storageTypes[i];
- final Promise<Channel> promise = eventLoop.newPromise();
+ DatanodeInfo dnInfo = datanodeInfos[i];
+ Enum<?> storageType = storageTypes[i];
+ Promise<Channel> promise = eventLoop.newPromise();
futureList.add(promise);
String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
new Bootstrap().group(eventLoop).channel(NioSocketChannel.class)
@@ -799,11 +680,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
ClientProtocol namenode = client.getNamenode();
HdfsFileStatus stat;
try {
- stat = FILE_CREATER.create(namenode, src,
+ stat = namenode.create(src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
new EnumSetWritable<CreateFlag>(
overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
- createParent, replication, blockSize);
+ createParent, replication, blockSize, CryptoProtocolVersion.supported());
} catch (Exception e) {
if (e instanceof RemoteException) {
throw (RemoteException) e;
@@ -811,7 +692,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
throw new NameNodeException(e);
}
}
- beginFileLease(client, src, stat.getFileId());
+ beginFileLease(client, stat.getFileId());
boolean succ = false;
LocatedBlock locatedBlock = null;
List<Future<Channel>> futureList = null;
@@ -827,10 +708,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
// layer should retry itself if needed.
datanodeList.add(future.syncUninterruptibly().getNow());
}
- CryptoCodec cryptocodec = createCryptoCodec(conf, stat, client);
- FanOutOneBlockAsyncDFSOutput output = new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs,
- client, namenode, clientName, src, stat.getFileId(), locatedBlock, cryptocodec, eventLoop,
- datanodeList, summer, ALLOC);
+ Encryptor encryptor = createEncryptor(conf, stat, client);
+ FanOutOneBlockAsyncDFSOutput output =
+ new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
+ stat.getFileId(), locatedBlock, encryptor, eventLoop, datanodeList, summer, ALLOC);
succ = true;
return output;
} finally {
@@ -848,7 +729,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
});
}
}
- endFileLease(client, src, stat.getFileId());
+ endFileLease(client, stat.getFileId());
fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client));
}
}
@@ -859,9 +740,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
* inside {@link EventLoop}.
* @param eventLoop all connections to datanode will use the same event loop.
*/
- public static FanOutOneBlockAsyncDFSOutput createOutput(final DistributedFileSystem dfs, Path f,
- final boolean overwrite, final boolean createParent, final short replication,
- final long blockSize, final EventLoop eventLoop) throws IOException {
+ public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
+ boolean overwrite, boolean createParent, short replication, long blockSize,
+ EventLoop eventLoop) throws IOException {
return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
@Override
@@ -890,7 +771,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
for (int retry = 0;; retry++) {
try {
if (namenode.complete(src, clientName, block, fileId)) {
- endFileLease(client, src, fileId);
+ endFileLease(client, fileId);
return;
} else {
LOG.warn("complete file " + src + " not finished, retry = " + retry);