You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2016/07/18 12:01:31 UTC
hbase git commit: HBASE-16110 AsyncFS WAL doesn't work with Hadoop
2.8+
Repository: hbase
Updated Branches:
refs/heads/master a1cc2c4bf -> 515c499f9
HBASE-16110 AsyncFS WAL doesn't work with Hadoop 2.8+
Signed-off-by: Sean Busbey <bu...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/515c499f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/515c499f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/515c499f
Branch: refs/heads/master
Commit: 515c499f951b864035c5772906b2c0750d9a608f
Parents: a1cc2c4
Author: zhangduo <zh...@apache.org>
Authored: Tue Jul 12 11:15:08 2016 +0800
Committer: Sean Busbey <bu...@apache.org>
Committed: Mon Jul 18 06:54:20 2016 -0500
----------------------------------------------------------------------
.../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 4 +-
.../FanOutOneBlockAsyncDFSOutputHelper.java | 565 ++++++++++++-------
.../FanOutOneBlockAsyncDFSOutputSaslHelper.java | 213 +++----
3 files changed, 470 insertions(+), 312 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/515c499f/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 8dd7f5e..9aab924 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.asyncfs;
import static io.netty.handler.timeout.IdleState.READER_IDLE;
import static io.netty.handler.timeout.IdleState.WRITER_IDLE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
@@ -71,7 +72,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.util.DataChecksum;
/**
@@ -339,7 +339,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
this.alloc = alloc;
this.buf = alloc.directBuffer();
this.state = State.STREAMING;
- setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT));
+ setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/515c499f/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 2e88ff2..51c48ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -99,15 +99,15 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProt
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -128,8 +128,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
// copied from DFSPacket since it is package private.
public static final long HEART_BEAT_SEQNO = -1L;
- // helper class for creating DataChecksum object.
- private static final Method CREATE_CHECKSUM;
+ // Timeouts for communicating with DataNode for streaming writes/reads
+ public static final int READ_TIMEOUT = 60 * 1000;
+ public static final int READ_TIMEOUT_EXTENSION = 5 * 1000;
+ public static final int WRITE_TIMEOUT = 8 * 60 * 1000;
// helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
// getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may
@@ -161,6 +163,17 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static final FileCreater FILE_CREATER;
+ // helper class for calling add block method on namenode. There is a addBlockFlags parameter for
+ // hadoop 2.8 or later. See createBlockAdder for more details.
+ private interface BlockAdder {
+
+ LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+ ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
+ throws IOException;
+ }
+
+ private static final BlockAdder BLOCK_ADDER;
+
// helper class for add or remove lease from DFSClient. Hadoop 2.4 use src as the Map's key, and
// hadoop 2.5 or after use inodeId. See createLeaseManager for more details.
private interface LeaseManager {
@@ -181,156 +194,182 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
- private static DFSClientAdaptor createDFSClientAdaptor() {
- try {
- final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
- isClientRunningMethod.setAccessible(true);
- return new DFSClientAdaptor() {
+ // helper class for convert protos.
+ private interface PBHelper {
- @Override
- public boolean isClientRunning(DFSClient client) {
- try {
- return (Boolean) isClientRunningMethod.invoke(client);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
+ ExtendedBlockProto convert(final ExtendedBlock b);
+
+ TokenProto convert(Token<?> tok);
+ }
+
+ private static final PBHelper PB_HELPER;
+
+ // helper class for creating data checksum.
+ private interface ChecksumCreater {
+ DataChecksum createChecksum(Object conf);
+ }
+
+ private static final ChecksumCreater CHECKSUM_CREATER;
+
+ private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
+ final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
+ isClientRunningMethod.setAccessible(true);
+ return new DFSClientAdaptor() {
+
+ @Override
+ public boolean isClientRunning(DFSClient client) {
+ try {
+ return (Boolean) isClientRunningMethod.invoke(client);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
}
- };
- } catch (NoSuchMethodException e) {
- throw new Error(e);
- }
+ }
+ };
}
- private static LeaseManager createLeaseManager() {
- try {
- final Method beginFileLeaseMethod =
- DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
- beginFileLeaseMethod.setAccessible(true);
- final Method endFileLeaseMethod =
- DFSClient.class.getDeclaredMethod("endFileLease", long.class);
- endFileLeaseMethod.setAccessible(true);
- return new LeaseManager() {
+ private static LeaseManager createLeaseManager25() throws NoSuchMethodException {
+ final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
+ long.class, DFSOutputStream.class);
+ beginFileLeaseMethod.setAccessible(true);
+ final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
+ endFileLeaseMethod.setAccessible(true);
+ return new LeaseManager() {
- @Override
- public void begin(DFSClient client, String src, long inodeId) {
- try {
- beginFileLeaseMethod.invoke(client, inodeId, null);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
+ @Override
+ public void begin(DFSClient client, String src, long inodeId) {
+ try {
+ beginFileLeaseMethod.invoke(client, inodeId, null);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
}
+ }
- @Override
- public void end(DFSClient client, String src, long inodeId) {
- try {
- endFileLeaseMethod.invoke(client, inodeId);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
+ @Override
+ public void end(DFSClient client, String src, long inodeId) {
+ try {
+ endFileLeaseMethod.invoke(client, inodeId);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
}
- };
- } catch (NoSuchMethodException e) {
- LOG.warn("No inodeId related lease methods found, should be hadoop 2.4-", e);
- }
- try {
- final Method beginFileLeaseMethod =
- DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class);
- beginFileLeaseMethod.setAccessible(true);
- final Method endFileLeaseMethod =
- DFSClient.class.getDeclaredMethod("endFileLease", String.class);
- endFileLeaseMethod.setAccessible(true);
- return new LeaseManager() {
+ }
+ };
+ }
- @Override
- public void begin(DFSClient client, String src, long inodeId) {
- try {
- beginFileLeaseMethod.invoke(client, src, null);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
+ private static LeaseManager createLeaseManager24() throws NoSuchMethodException {
+ final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
+ String.class, DFSOutputStream.class);
+ beginFileLeaseMethod.setAccessible(true);
+ final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease",
+ String.class);
+ endFileLeaseMethod.setAccessible(true);
+ return new LeaseManager() {
+
+ @Override
+ public void begin(DFSClient client, String src, long inodeId) {
+ try {
+ beginFileLeaseMethod.invoke(client, src, null);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
}
+ }
- @Override
- public void end(DFSClient client, String src, long inodeId) {
- try {
- endFileLeaseMethod.invoke(client, src);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
+ @Override
+ public void end(DFSClient client, String src, long inodeId) {
+ try {
+ endFileLeaseMethod.invoke(client, src);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
}
- };
+ }
+ };
+ }
+
+ private static LeaseManager createLeaseManager() throws NoSuchMethodException {
+ try {
+ return createLeaseManager25();
} catch (NoSuchMethodException e) {
- throw new Error(e);
+ LOG.debug("No inodeId related lease methods found, should be hadoop 2.4-", e);
}
+ return createLeaseManager24();
}
- private static PipelineAckStatusGetter createPipelineAckStatusGetter() {
+ private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
+ throws NoSuchMethodException {
+ final Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
+ @SuppressWarnings("rawtypes")
+ Class<? extends Enum> ecnClass;
try {
- final Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
- @SuppressWarnings("rawtypes")
- Class<? extends Enum> ecnClass;
- try {
- ecnClass =
- Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
- .asSubclass(Enum.class);
- } catch (ClassNotFoundException e) {
- throw new Error(e);
- }
- @SuppressWarnings("unchecked")
- final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
- final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
- final Method combineHeaderMethod =
- PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
- final Method getStatusFromHeaderMethod =
- PipelineAck.class.getMethod("getStatusFromHeader", int.class);
- return new PipelineAckStatusGetter() {
+ ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
+ .asSubclass(Enum.class);
+ } catch (ClassNotFoundException e) {
+ final String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please "
+ + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ + "HBASE-16110 for more information.";
+ LOG.error(msg, e);
+ throw new Error(msg, e);
+ }
+ @SuppressWarnings("unchecked")
+ final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
+ final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
+ final Method combineHeaderMethod = PipelineAck.class.getMethod("combineHeader", ecnClass,
+ Status.class);
+ final Method getStatusFromHeaderMethod = PipelineAck.class.getMethod("getStatusFromHeader",
+ int.class);
+ return new PipelineAckStatusGetter() {
- @Override
- public Status get(PipelineAckProto ack) {
- try {
- @SuppressWarnings("unchecked")
- List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
- Integer headerFlag;
- if (flagList.isEmpty()) {
- Status reply = (Status) getReplyMethod.invoke(ack, 0);
- headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
- } else {
- headerFlag = flagList.get(0);
- }
- return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
+ @Override
+ public Status get(PipelineAckProto ack) {
+ try {
+ @SuppressWarnings("unchecked")
+ List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
+ Integer headerFlag;
+ if (flagList.isEmpty()) {
+ Status reply = (Status) getReplyMethod.invoke(ack, 0);
+ headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
+ } else {
+ headerFlag = flagList.get(0);
}
+ return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
}
- };
- } catch (NoSuchMethodException e) {
- LOG.warn("Can not get expected methods, should be hadoop 2.6-", e);
- }
- try {
- final Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
- return new PipelineAckStatusGetter() {
+ }
+ };
+ }
- @Override
- public Status get(PipelineAckProto ack) {
- try {
- return (Status) getStatusMethod.invoke(ack, 0);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
+ private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
+ throws NoSuchMethodException {
+ final Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
+ return new PipelineAckStatusGetter() {
+
+ @Override
+ public Status get(PipelineAckProto ack) {
+ try {
+ return (Status) getStatusMethod.invoke(ack, 0);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
}
- };
+ }
+ };
+ }
+
+ private static PipelineAckStatusGetter createPipelineAckStatusGetter()
+ throws NoSuchMethodException {
+ try {
+ return createPipelineAckStatusGetter27();
} catch (NoSuchMethodException e) {
- throw new Error(e);
+ LOG.debug("Can not get expected methods, should be hadoop 2.6-", e);
}
+ return createPipelineAckStatusGetter26();
}
private static StorageTypeSetter createStorageTypeSetter() {
final Method setStorageTypeMethod;
try {
- setStorageTypeMethod =
- OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
+ setStorageTypeMethod = OpWriteBlockProto.Builder.class.getMethod("setStorageType",
+ StorageTypeProto.class);
} catch (NoSuchMethodException e) {
- LOG.warn("noSetStorageType method found, should be hadoop 2.5-", e);
+ LOG.debug("noSetStorageType method found, should be hadoop 2.5-", e);
return new StorageTypeSetter() {
@Override
@@ -359,7 +398,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
};
}
- private static FileCreater createFileCreater() {
+ private static FileCreater createFileCreater() throws ClassNotFoundException,
+ NoSuchMethodException, IllegalAccessException, InvocationTargetException {
for (Method method : ClientProtocol.class.getMethods()) {
if (method.getName().equals("create")) {
final Method createMethod = method;
@@ -372,8 +412,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
short replication, long blockSize) throws IOException {
try {
- return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName,
- flag, createParent, replication, blockSize);
+ return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag,
+ createParent, replication, blockSize);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
@@ -383,36 +423,159 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
}
};
} else {
- try {
- Class<?> cryptoProtocolVersionClass =
- Class.forName("org.apache.hadoop.crypto.CryptoProtocolVersion");
- Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported");
- final Object supported = supportedMethod.invoke(null);
- return new FileCreater() {
+ Class<?> cryptoProtocolVersionClass = Class
+ .forName("org.apache.hadoop.crypto.CryptoProtocolVersion");
+ Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported");
+ final Object supported = supportedMethod.invoke(null);
+ return new FileCreater() {
- @Override
- public HdfsFileStatus create(ClientProtocol namenode, String src,
- FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag,
- boolean createParent, short replication, long blockSize) throws IOException {
- try {
- return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName,
- flag, createParent, replication, blockSize, supported);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
- throw new RuntimeException(e);
- }
+ @Override
+ public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
+ String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
+ short replication, long blockSize) throws IOException {
+ try {
+ return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag,
+ createParent, replication, blockSize, supported);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+ }
+ }
+ throw new NoSuchMethodException("Can not find create method in ClientProtocol");
+ }
+
+ private static BlockAdder createBlockAdder() throws NoSuchMethodException {
+ for (Method method : ClientProtocol.class.getMethods()) {
+ if (method.getName().equals("addBlock")) {
+ final Method addBlockMethod = method;
+ Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
+ if (paramTypes[paramTypes.length - 1] == String[].class) {
+ return new BlockAdder() {
+
+ @Override
+ public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+ ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+ String[] favoredNodes) throws IOException {
+ try {
+ return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+ excludeNodes, fileId, favoredNodes);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ } else {
+ return new BlockAdder() {
+
+ @Override
+ public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+ ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+ String[] favoredNodes) throws IOException {
+ try {
+ return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+ excludeNodes, fileId, favoredNodes, null);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+ throw new RuntimeException(e);
}
- };
- } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
- | InvocationTargetException e) {
- throw new Error(e);
+ }
+ };
+ }
+ }
+ }
+ throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol");
+ }
+
+ private static PBHelper createPBHelper() throws NoSuchMethodException {
+ Class<?> helperClass;
+ try {
+ helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
+ } catch (ClassNotFoundException e) {
+ LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
+ helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
+ }
+ final Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
+ final Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
+ return new PBHelper() {
+
+ @Override
+ public ExtendedBlockProto convert(ExtendedBlock b) {
+ try {
+ return (ExtendedBlockProto) convertEBMethod.invoke(null, b);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public TokenProto convert(Token<?> tok) {
+ try {
+ return (TokenProto) convertTokenMethod.invoke(null, tok);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private static ChecksumCreater createChecksumCreater28(Class<?> confClass)
+ throws NoSuchMethodException {
+ for (Method method : confClass.getMethods()) {
+ if (method.getName().equals("createChecksum")) {
+ final Method createChecksumMethod = method;
+ return new ChecksumCreater() {
+
+ @Override
+ public DataChecksum createChecksum(Object conf) {
+ try {
+ return (DataChecksum) createChecksumMethod.invoke(conf, (Object) null);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
}
+ };
+ }
+ }
+ throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
+ }
+
+ private static ChecksumCreater createChecksumCreater27(Class<?> confClass)
+ throws NoSuchMethodException {
+ final Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
+ createChecksumMethod.setAccessible(true);
+ return new ChecksumCreater() {
+
+ @Override
+ public DataChecksum createChecksum(Object conf) {
+ try {
+ return (DataChecksum) createChecksumMethod.invoke(conf);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
}
}
+ };
+ }
+
+ private static ChecksumCreater createChecksumCreater()
+ throws NoSuchMethodException, ClassNotFoundException {
+ try {
+ return createChecksumCreater28(
+ Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf"));
+ } catch (ClassNotFoundException e) {
+ LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
}
- throw new Error("No create method found for " + ClientProtocol.class.getName());
+ return createChecksumCreater27(Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
}
// cancel the processing if DFSClient is already closed.
@@ -432,17 +595,21 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
static {
try {
- CREATE_CHECKSUM = DFSClient.Conf.class.getDeclaredMethod("createChecksum");
- CREATE_CHECKSUM.setAccessible(true);
- } catch (NoSuchMethodException e) {
- throw new Error(e);
+ PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
+ STORAGE_TYPE_SETTER = createStorageTypeSetter();
+ FILE_CREATER = createFileCreater();
+ BLOCK_ADDER = createBlockAdder();
+ LEASE_MANAGER = createLeaseManager();
+ DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
+ PB_HELPER = createPBHelper();
+ CHECKSUM_CREATER = createChecksumCreater();
+ } catch (Exception e) {
+ final String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ + "HBASE-16110 for more information.";
+ LOG.error(msg, e);
+ throw new Error(msg, e);
}
-
- PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
- STORAGE_TYPE_SETTER = createStorageTypeSetter();
- FILE_CREATER = createFileCreater();
- LEASE_MANAGER = createLeaseManager();
- DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
}
static void beginFileLease(DFSClient client, String src, long inodeId) {
@@ -454,11 +621,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
}
static DataChecksum createChecksum(DFSClient client) {
- try {
- return (DataChecksum) CREATE_CHECKSUM.invoke(client.getConf());
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
+ return CHECKSUM_CREATER.createChecksum(client.getConf());
}
static Status getStatus(PipelineAckProto ack) {
@@ -530,8 +693,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
int protoLen = proto.getSerializedSize();
- ByteBuf buffer =
- channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
+ ByteBuf buffer = channel.alloc()
+ .buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
buffer.writeByte(Op.WRITE_BLOCK.code);
proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
@@ -540,8 +703,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static void initialize(Configuration conf, final Channel channel,
final DatanodeInfo dnInfo, final Enum<?> storageType,
- final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs,
- DFSClient client, Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) {
+ final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs, DFSClient client,
+ Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) {
Promise<Void> saslPromise = channel.eventLoop().newPromise();
trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
saslPromise.addListener(new FutureListener<Void>() {
@@ -560,32 +723,26 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
}
private static List<Future<Channel>> connectToDataNodes(final Configuration conf,
- final DFSClient client, String clientName, final LocatedBlock locatedBlock,
- long maxBytesRcvd, long latestGS, BlockConstructionStage stage, DataChecksum summer,
- EventLoop eventLoop) {
+ final DFSClient client, String clientName, final LocatedBlock locatedBlock, long maxBytesRcvd,
+ long latestGS, BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) {
Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
- boolean connectToDnViaHostname =
- conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
- final int timeoutMs =
- conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT);
+ boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
+ DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+ final int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
blockCopy.setNumBytes(locatedBlock.getBlockSize());
- ClientOperationHeaderProto header =
- ClientOperationHeaderProto
- .newBuilder()
- .setBaseHeader(
- BaseHeaderProto.newBuilder().setBlock(PBHelper.convert(blockCopy))
- .setToken(PBHelper.convert(locatedBlock.getBlockToken())))
- .setClientName(clientName).build();
+ ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
+ .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PB_HELPER.convert(blockCopy))
+ .setToken(PB_HELPER.convert(locatedBlock.getBlockToken())))
+ .setClientName(clientName).build();
ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
- final OpWriteBlockProto.Builder writeBlockProtoBuilder =
- OpWriteBlockProto.newBuilder().setHeader(header)
- .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
- .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
- .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
- .setRequestedChecksum(checksumProto)
- .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
+ final OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
+ .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
+ .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
+ .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
+ .setRequestedChecksum(checksumProto)
+ .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
for (int i = 0; i < datanodeInfos.length; i++) {
final DatanodeInfo dnInfo = datanodeInfos[i];
@@ -642,14 +799,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
ClientProtocol namenode = client.getNamenode();
HdfsFileStatus stat;
try {
- stat =
- FILE_CREATER.create(
- namenode,
- src,
- FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)),
- clientName,
- new EnumSetWritable<CreateFlag>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet
- .of(CREATE)), createParent, replication, blockSize);
+ stat = FILE_CREATER.create(namenode, src,
+ FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
+ new EnumSetWritable<CreateFlag>(
+ overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
+ createParent, replication, blockSize);
} catch (Exception e) {
if (e instanceof RemoteException) {
throw (RemoteException) e;
@@ -663,12 +817,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
List<Future<Channel>> futureList = null;
try {
DataChecksum summer = createChecksum(client);
- locatedBlock =
- namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(), null);
+ locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null, null,
+ stat.getFileId(), null);
List<Channel> datanodeList = new ArrayList<>();
- futureList =
- connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE,
- summer, eventLoop);
+ futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
+ PIPELINE_SETUP_CREATE, summer, eventLoop);
for (Future<Channel> future : futureList) {
// fail the creation if there are connection failures since we are fail-fast. The upper
// layer should retry itself if needed.
@@ -712,8 +865,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
@Override
- public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException,
- UnresolvedLinkException {
+ public FanOutOneBlockAsyncDFSOutput doCall(Path p)
+ throws IOException, UnresolvedLinkException {
return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
blockSize, eventLoop);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/515c499f/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
index 33e8841..0546253 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -86,7 +86,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.security.SaslPropertiesResolver;
@@ -112,8 +111,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
private static final String NAME_DELIMITER = " ";
@VisibleForTesting
- static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
- "dfs.encrypt.data.transfer.cipher.suites";
+ static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = "dfs.encrypt.data.transfer.cipher.suites";
@VisibleForTesting
static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding";
@@ -185,7 +183,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
try {
cryptoCodecClass = Class.forName("org.apache.hadoop.crypto.CryptoCodec");
} catch (ClassNotFoundException e) {
- LOG.warn("No CryptoCodec class found, should be hadoop 2.5-", e);
+ LOG.debug("No CryptoCodec class found, should be hadoop 2.5-", e);
}
if (cryptoCodecClass != null) {
Method getInstanceMethod = null;
@@ -195,8 +193,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
break;
}
}
- CREATE_CODEC = getInstanceMethod;
try {
+ if (getInstanceMethod == null) {
+ throw new NoSuchMethodException(
+ "Can not find suitable getInstance method in CryptoCodec");
+ }
+ CREATE_CODEC = getInstanceMethod;
CREATE_ENCRYPTOR = cryptoCodecClass.getMethod("createEncryptor");
CREATE_DECRYPTOR = cryptoCodecClass.getMethod("createDecryptor");
@@ -207,11 +209,14 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
Class<?> decryptorClass = Class.forName("org.apache.hadoop.crypto.Decryptor");
INIT_DECRYPTOR = decryptorClass.getMethod("init", byte[].class, byte[].class);
DECRYPT = decryptorClass.getMethod("decrypt", ByteBuffer.class, ByteBuffer.class);
- } catch (NoSuchMethodException | ClassNotFoundException e) {
- throw new Error(e);
+ } catch (Exception e) {
+ final String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ + "HBASE-16110 for more information.";
+ LOG.error(msg, e);
+ throw new Error(msg, e);
}
} else {
- LOG.warn("Can not initialize CryptoCodec, should be hadoop 2.5-");
CREATE_CODEC = null;
CREATE_ENCRYPTOR = null;
CREATE_DECRYPTOR = null;
@@ -329,62 +334,53 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
};
}
- private static SaslAdaptor createSaslAdaptor25() {
- try {
- final Field trustedChannelResolverField = DFSClient.class
- .getDeclaredField("trustedChannelResolver");
- trustedChannelResolverField.setAccessible(true);
- final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey");
- return new SaslAdaptor() {
-
- @Override
- public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
- try {
- return (TrustedChannelResolver) trustedChannelResolverField.get(client);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- }
+ private static SaslAdaptor createSaslAdaptor25()
+ throws NoSuchFieldException, NoSuchMethodException {
+ final Field trustedChannelResolverField = DFSClient.class
+ .getDeclaredField("trustedChannelResolver");
+ trustedChannelResolverField.setAccessible(true);
+ final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey");
+ return new SaslAdaptor() {
- @Override
- public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
- return null;
+ @Override
+ public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
+ try {
+ return (TrustedChannelResolver) trustedChannelResolverField.get(client);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
}
+ }
- @Override
- public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
- return null;
- }
+ @Override
+ public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
+ return null;
+ }
- @Override
- public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
- try {
- return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
- };
- } catch (NoSuchFieldException | NoSuchMethodException e) {
- throw new Error(e);
- }
+ @Override
+ public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
+ return null;
+ }
+ @Override
+ public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
+ try {
+ return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
}
- private static SaslAdaptor createSaslAdaptor() {
- Class<?> saslDataTransferClientClass = null;
+ private static SaslAdaptor createSaslAdaptor()
+ throws NoSuchFieldException, NoSuchMethodException {
try {
- saslDataTransferClientClass = Class
- .forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient");
+ return createSaslAdaptor27(
+ Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient"));
} catch (ClassNotFoundException e) {
- LOG.warn("No SaslDataTransferClient class found, should be hadoop 2.5-");
- }
- try {
- return saslDataTransferClientClass != null ? createSaslAdaptor27(saslDataTransferClientClass)
- : createSaslAdaptor25();
- } catch (NoSuchFieldException | NoSuchMethodException e) {
- throw new Error(e);
+ LOG.debug("No SaslDataTransferClient class found, should be hadoop 2.5-", e);
}
+ return createSaslAdaptor25();
}
private static CipherOptionHelper createCipherHelper25() {
@@ -451,9 +447,16 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey");
final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv");
- final Method convertCipherOptionsMethod = PBHelper.class.getMethod("convertCipherOptions",
+ Class<?> pbHelperClass;
+ try {
+ pbHelperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
+ } catch (ClassNotFoundException e) {
+ LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
+ pbHelperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
+ }
+ final Method convertCipherOptionsMethod = pbHelperClass.getMethod("convertCipherOptions",
List.class);
- final Method convertCipherOptionProtosMethod = PBHelper.class
+ final Method convertCipherOptionProtosMethod = pbHelperClass
.getMethod("convertCipherOptionProtos", List.class);
final Method addAllCipherOptionMethod = DataTransferEncryptorMessageProto.Builder.class
.getMethod("addAllCipherOption", Iterable.class);
@@ -577,19 +580,16 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
};
}
- private static CipherOptionHelper createCipherHelper() {
+ private static CipherOptionHelper createCipherHelper()
+ throws ClassNotFoundException, NoSuchMethodException {
Class<?> cipherOptionClass;
try {
cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption");
} catch (ClassNotFoundException e) {
- LOG.warn("No CipherOption class found, should be hadoop 2.5-");
+ LOG.debug("No CipherOption class found, should be hadoop 2.5-", e);
return createCipherHelper25();
}
- try {
- return createCipherHelper27(cipherOptionClass);
- } catch (NoSuchMethodException | ClassNotFoundException e) {
- throw new Error(e);
- }
+ return createCipherHelper27(cipherOptionClass);
}
private static TransparentCryptoHelper createTransparentCryptoHelper25() {
@@ -646,25 +646,30 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
};
}
- private static TransparentCryptoHelper createTransparentCryptoHelper() {
+ private static TransparentCryptoHelper createTransparentCryptoHelper()
+ throws NoSuchMethodException, ClassNotFoundException {
Class<?> feInfoClass;
try {
feInfoClass = Class.forName("org.apache.hadoop.fs.FileEncryptionInfo");
} catch (ClassNotFoundException e) {
- LOG.warn("No FileEncryptionInfo class found, should be hadoop 2.5-");
+ LOG.debug("No FileEncryptionInfo class found, should be hadoop 2.5-", e);
return createTransparentCryptoHelper25();
}
- try {
- return createTransparentCryptoHelper27(feInfoClass);
- } catch (NoSuchMethodException | ClassNotFoundException e) {
- throw new Error(e);
- }
+ return createTransparentCryptoHelper27(feInfoClass);
}
static {
- SASL_ADAPTOR = createSaslAdaptor();
- CIPHER_OPTION_HELPER = createCipherHelper();
- TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
+ try {
+ SASL_ADAPTOR = createSaslAdaptor();
+ CIPHER_OPTION_HELPER = createCipherHelper();
+ TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
+ } catch (Exception e) {
+ final String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ + "HBASE-16110 for more information.";
+ LOG.error(msg, e);
+ throw new Error(msg, e);
+ }
}
/**
@@ -828,40 +833,40 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
byte[] challenge = proto.getPayload().toByteArray();
byte[] response = saslClient.evaluateChallenge(challenge);
switch (step) {
- case 1: {
- List<Object> cipherOptions = null;
- if (requestedQopContainsPrivacy()) {
- cipherOptions = CIPHER_OPTION_HELPER.getCipherOptions(conf);
- }
- sendSaslMessage(ctx, response, cipherOptions);
- ctx.flush();
- step++;
- break;
+ case 1: {
+ List<Object> cipherOptions = null;
+ if (requestedQopContainsPrivacy()) {
+ cipherOptions = CIPHER_OPTION_HELPER.getCipherOptions(conf);
}
- case 2: {
- assert response == null;
- checkSaslComplete();
- Object cipherOption =
- CIPHER_OPTION_HELPER.getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
- ChannelPipeline p = ctx.pipeline();
- while (p.first() != null) {
- p.removeFirst();
- }
- if (cipherOption != null) {
- CryptoCodec codec = new CryptoCodec(conf, cipherOption);
- p.addLast(new EncryptHandler(codec), new DecryptHandler(codec));
- } else {
- if (useWrap()) {
- p.addLast(new SaslWrapHandler(saslClient),
- new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
- new SaslUnwrapHandler(saslClient));
- }
+ sendSaslMessage(ctx, response, cipherOptions);
+ ctx.flush();
+ step++;
+ break;
+ }
+ case 2: {
+ assert response == null;
+ checkSaslComplete();
+ Object cipherOption = CIPHER_OPTION_HELPER.getCipherOption(proto,
+ isNegotiatedQopPrivacy(), saslClient);
+ ChannelPipeline p = ctx.pipeline();
+ while (p.first() != null) {
+ p.removeFirst();
+ }
+ if (cipherOption != null) {
+ CryptoCodec codec = new CryptoCodec(conf, cipherOption);
+ p.addLast(new EncryptHandler(codec), new DecryptHandler(codec));
+ } else {
+ if (useWrap()) {
+ p.addLast(new SaslWrapHandler(saslClient),
+ new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
+ new SaslUnwrapHandler(saslClient));
}
- promise.trySuccess(null);
- break;
}
- default:
- throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
+ promise.trySuccess(null);
+ break;
+ }
+ default:
+ throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
}
} else {
ctx.fireChannelRead(msg);