You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/07/12 05:47:09 UTC
hbase git commit: HBASE-18177 FanOutOneBlockAsyncDFSOutputHelper fails to compile against Hadoop 3
Repository: hbase
Updated Branches:
refs/heads/master 22dce22e0 -> cb5299ae9
HBASE-18177 FanOutOneBlockAsyncDFSOutputHelper fails to compile against Hadoop 3
Because ClientProtocol::create has API changes between Hadoop 2/3
Signed-off-by: zhangduo <zh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cb5299ae
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cb5299ae
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cb5299ae
Branch: refs/heads/master
Commit: cb5299ae9b3360a6cca93958a74417d663135a60
Parents: 22dce22
Author: Mike Drob <md...@apache.org>
Authored: Mon Jun 26 11:29:34 2017 -0500
Committer: zhangduo <zh...@apache.org>
Committed: Wed Jul 12 13:40:05 2017 +0800
----------------------------------------------------------------------
.../FanOutOneBlockAsyncDFSOutputHelper.java | 63 +++++++++++++++++++-
1 file changed, 62 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb5299ae/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index d14d4d8..1dbe131 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -67,6 +67,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
@@ -195,6 +196,32 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static final ChecksumCreater CHECKSUM_CREATER;
+ // helper class for creating files.
+ private interface FileCreator {
+ default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
+ String clientName, EnumSetWritable<CreateFlag> flag,
+ boolean createParent, short replication, long blockSize,
+ CryptoProtocolVersion[] supportedVersions) throws Exception {
+ try {
+ return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
+ replication, blockSize, supportedVersions);
+ } catch (InvocationTargetException e) {
+ if (e.getCause() instanceof Exception) {
+ throw (Exception) e.getCause();
+ } else {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+ };
+
+ Object createObject(ClientProtocol instance, String src, FsPermission masked,
+ String clientName, EnumSetWritable<CreateFlag> flag,
+ boolean createParent, short replication, long blockSize,
+ CryptoProtocolVersion[] supportedVersions) throws Exception;
+ }
+
+ private static final FileCreator FILE_CREATOR;
+
private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
isClientRunningMethod.setAccessible(true);
@@ -460,6 +487,39 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
return createChecksumCreater27(Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
}
+ private static FileCreator createFileCreator3() throws NoSuchMethodException {
+ Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
+ String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class,
+ String.class);
+
+ return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
+ supportedVersions) -> {
+ return (HdfsFileStatus) createMethod.invoke(instance,
+ src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions,
+ null);
+ };
+ }
+
+ private static FileCreator createFileCreator2() throws NoSuchMethodException {
+ Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
+ String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class);
+
+ return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
+ supportedVersions) -> {
+ return (HdfsFileStatus) createMethod.invoke(instance,
+ src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions);
+ };
+ }
+
+ private static FileCreator createFileCreator() throws NoSuchMethodException {
+ try {
+ return createFileCreator3();
+ } catch (NoSuchMethodException e) {
+ LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x");
+ }
+ return createFileCreator2();
+ }
+
// cancel the processing if DFSClient is already closed.
static final class CancelOnClose implements CancelableProgressable {
@@ -484,6 +544,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
PB_HELPER = createPBHelper();
CHECKSUM_CREATER = createChecksumCreater();
+ FILE_CREATOR = createFileCreator();
} catch (Exception e) {
String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
@@ -679,7 +740,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
ClientProtocol namenode = client.getNamenode();
HdfsFileStatus stat;
try {
- stat = namenode.create(src,
+ stat = FILE_CREATOR.create(namenode, src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
createParent, replication, blockSize, CryptoProtocolVersion.supported());