You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by md...@apache.org on 2018/07/30 14:17:12 UTC
hbase git commit: HBASE-19369 Switch to Builder Pattern In WAL
Repository: hbase
Updated Branches:
refs/heads/branch-2.0 e7eadd61d -> 7b07a8bca
HBASE-19369 Switch to Builder Pattern In WAL
This patch switches to the builder pattern by adding a helper method.
It also checks to ensure that the pattern is available (i.e. that
HBase is running on a hadoop version that supports it).
Amending-Author: Mike Drob <md...@apache.org>
Signed-off-by: tedyu <yu...@gmail.com>
Signed-off-by: zhangduo <zh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7b07a8bc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7b07a8bc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7b07a8bc
Branch: refs/heads/branch-2.0
Commit: 7b07a8bca884dbcb8f4709158789658f73123b97
Parents: e7eadd6
Author: Alex Leblang <al...@cloudera.com>
Authored: Thu Jul 26 15:47:13 2018 -0500
Committer: Mike Drob <md...@apache.org>
Committed: Mon Jul 30 09:16:36 2018 -0500
----------------------------------------------------------------------
.../apache/hadoop/hbase/util/CommonFSUtils.java | 127 +++++++++++++++++++
.../procedure2/store/wal/WALProcedureStore.java | 2 +-
.../hbase/io/asyncfs/AsyncFSOutputHelper.java | 1 +
.../regionserver/wal/ProtobufLogWriter.java | 12 +-
.../regionserver/wal/TestHBaseWalOnEC.java | 119 +++++++++++++++++
5 files changed, 256 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/7b07a8bc/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
index 76a3601..a34048a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
@@ -817,6 +817,133 @@ public abstract class CommonFSUtils {
conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
}
+ private static class DfsBuilderUtility {
+ static Class<?> dfsClass = null;
+ static Method createMethod;
+ static Method overwriteMethod;
+ static Method bufferSizeMethod;
+ static Method blockSizeMethod;
+ static Method recursiveMethod;
+ static Method replicateMethod;
+ static Method replicationMethod;
+ static Method buildMethod;
+ static boolean allMethodsPresent = false;
+
+ static {
+ String dfsName = "org.apache.hadoop.hdfs.DistributedFileSystem";
+ String builderName = dfsName + "$HdfsDataOutputStreamBuilder";
+ Class<?> builderClass = null;
+
+ try {
+ dfsClass = Class.forName(dfsName);
+ } catch (ClassNotFoundException e) {
+ LOG.debug("{} not available, will not use builder API for file creation.", dfsName);
+ }
+ try {
+ builderClass = Class.forName(builderName);
+ } catch (ClassNotFoundException e) {
+ LOG.debug("{} not available, will not use builder API for file creation.", builderName);
+ }
+
+ if (dfsClass != null && builderClass != null) {
+ try {
+ createMethod = dfsClass.getMethod("createFile", Path.class);
+ overwriteMethod = builderClass.getMethod("overwrite", boolean.class);
+ bufferSizeMethod = builderClass.getMethod("bufferSize", int.class);
+ blockSizeMethod = builderClass.getMethod("blockSize", long.class);
+ recursiveMethod = builderClass.getMethod("recursive");
+ replicateMethod = builderClass.getMethod("replicate");
+ replicationMethod = builderClass.getMethod("replication", short.class);
+ buildMethod = builderClass.getMethod("build");
+
+ allMethodsPresent = true;
+ LOG.debug("Using builder API via reflection for DFS file creation.");
+ } catch (NoSuchMethodException e) {
+ LOG.debug("Could not find method on builder; will use old DFS API for file creation {}",
+ e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Create a file through the reflected builder API (which also calls replicate()) when fs is a
+ * DistributedFileSystem; otherwise, or on reflection failure, use create/createNonRecursive.
+ */
+ static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable,
+ int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
+ if (allMethodsPresent && dfsClass.isInstance(fs)) {
+ try {
+ Object builder;
+
+ builder = createMethod.invoke(fs, path);
+ builder = overwriteMethod.invoke(builder, overwritable);
+ builder = bufferSizeMethod.invoke(builder, bufferSize);
+ builder = blockSizeMethod.invoke(builder, blockSize);
+ if (isRecursive) {
+ builder = recursiveMethod.invoke(builder);
+ }
+ builder = replicateMethod.invoke(builder);
+ builder = replicationMethod.invoke(builder, replication);
+ return (FSDataOutputStream) buildMethod.invoke(builder);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ // Reflection failed or the builder call itself threw; log full trace, then fall back below
+ LOG.warn("Couldn't use reflection with builder API", e);
+ }
+ }
+
+ if (isRecursive) {
+ return fs.create(path, overwritable, bufferSize, replication, blockSize, null);
+ }
+ return fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
+ }
+
+ /**
+ * Same as the overload above but only path and overwrite are set on the builder (replicate() is
+ * still called); the fallback is fs.create(path, overwritable).
+ */
+ static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable)
+ throws IOException {
+ if (allMethodsPresent && dfsClass.isInstance(fs)) {
+ try {
+ Object builder;
+
+ builder = createMethod.invoke(fs, path);
+ builder = overwriteMethod.invoke(builder, overwritable);
+ builder = replicateMethod.invoke(builder);
+ return (FSDataOutputStream) buildMethod.invoke(builder);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ // Reflection failed or the builder call itself threw; log full trace, then fall back below
+ LOG.warn("Couldn't use reflection with builder API", e);
+ }
+ }
+
+ return fs.create(path, overwritable);
+ }
+ }
+
+ /**
+ * Create a file for a WAL via DfsBuilderUtility: the HDFS builder API is used through
+ * reflection (with replicate() requested) when usable, otherwise fs.create(path, overwritable).
+ * <p>
+ * Will not attempt to enable replication when passed an HFileSystem.
+ */
+ public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable)
+ throws IOException {
+ return DfsBuilderUtility.createHelper(fs, path, overwritable);
+ }
+
+ /**
+ * Create a WAL file with explicit buffer size, replication and block size; isRecursive selects
+ * recursive() on the builder, or create vs. createNonRecursive on the fallback path.
+ * <p>
+ * Will not attempt to enable replication when passed an HFileSystem.
+ */
+ public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable,
+ int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
+ return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication,
+ blockSize, isRecursive);
+ }
+
// Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
// not until we attempt to reference it.
private static class StreamCapabilities {
http://git-wip-us.apache.org/repos/asf/hbase/blob/7b07a8bc/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 99001f7..cc3eea3 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -1028,7 +1028,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
long startPos = -1;
newLogFile = getLogFilePath(logId);
try {
- newStream = fs.create(newLogFile, false);
+ newStream = CommonFSUtils.createForWal(fs, newLogFile, false);
} catch (FileAlreadyExistsException e) {
LOG.error("Log file with id=" + logId + " already exists", e);
return false;
http://git-wip-us.apache.org/repos/asf/hbase/blob/7b07a8bc/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index f9d04b9..d1645f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -54,6 +54,7 @@ public final class AsyncFSOutputHelper {
final FSDataOutputStream out;
int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+ // This is not a Distributed File System, so it won't be erasure coded; no builder API needed
if (createParent) {
out = fs.create(f, overwrite, bufferSize, replication, blockSize, null);
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/7b07a8bc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 9d36429..0ad3087 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -61,7 +61,9 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
public void close() throws IOException {
if (this.output != null) {
try {
- if (!trailerWritten) writeWALTrailer();
+ if (!trailerWritten) {
+ writeWALTrailer();
+ }
this.output.close();
} catch (NullPointerException npe) {
// Can get a NPE coming up from down in DFSClient$DFSOutputStream#close
@@ -74,7 +76,9 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
@Override
public void sync() throws IOException {
FSDataOutputStream fsdos = this.output;
- if (fsdos == null) return; // Presume closed
+ if (fsdos == null) {
+ return; // Presume closed
+ }
fsdos.flush();
fsdos.hflush();
}
@@ -87,8 +91,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
@Override
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
- this.output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize,
- null);
+ this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
+ blockSize, false);
// TODO Be sure to add a check for hsync if this branch includes HBASE-19024
if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true) &&
!(CommonFSUtils.hasCapability(output, "hflush"))) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/7b07a8bc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
new file mode 100644
index 0000000..a7f1624
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestHBaseWalOnEC {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestHBaseWalOnEC.class);
+
+ private static final HBaseTestingUtility util = new HBaseTestingUtility();
+
+ private static final String HFLUSH = "hflush";
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ try {
+ MiniDFSCluster cluster = util.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
+ DistributedFileSystem fs = cluster.getFileSystem();
+
+ Method enableAllECPolicies = DFSTestUtil.class.getMethod("enableAllECPolicies",
+ DistributedFileSystem.class);
+ enableAllECPolicies.invoke(null, fs);
+
+ DFSClient client = fs.getClient();
+ Method setErasureCodingPolicy = DFSClient.class.getMethod("setErasureCodingPolicy",
+ String.class, String.class);
+ setErasureCodingPolicy.invoke(client, "/", "RS-3-2-1024k"); // try a built-in policy
+
+ try (FSDataOutputStream out = fs.create(new Path("/canary"))) {
+ // If this comes back as having hflush then some test setup assumption is wrong.
+ // Fail the test so that a developer has to look and triage
+ assertFalse("Did not enable EC!", CommonFSUtils.hasCapability(out, HFLUSH));
+ }
+ } catch (NoSuchMethodException e) {
+ // The EC hooks are looked up via reflection; if absent (older Hadoop) skip, do not fail
+ Assume.assumeNoException("Using an older version of hadoop; EC not available.", e);
+ }
+
+ util.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
+ util.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ util.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testStreamCreate() throws IOException {
+ try (FSDataOutputStream out = CommonFSUtils.createForWal(util.getDFSCluster().getFileSystem(),
+ new Path("/testStreamCreate"), true)) {
+ assertTrue(CommonFSUtils.hasCapability(out, HFLUSH));
+ }
+ }
+
+ @Test
+ public void testFlush() throws IOException {
+ byte[] row = Bytes.toBytes("row");
+ byte[] cf = Bytes.toBytes("cf");
+ byte[] cq = Bytes.toBytes("cq");
+ byte[] value = Bytes.toBytes("value");
+
+ TableName name = TableName.valueOf(getClass().getSimpleName());
+
+ Table t = util.createTable(name, cf);
+ t.put(new Put(row).addColumn(cf, cq, value));
+
+ util.getAdmin().flush(name);
+
+ assertArrayEquals(value, t.get(new Get(row)).getValue(cf, cq));
+ }
+}
+