Posted to commits@hbase.apache.org by md...@apache.org on 2018/07/30 14:17:12 UTC

hbase git commit: HBASE-19369 Switch to Builder Pattern In WAL

Repository: hbase
Updated Branches:
  refs/heads/branch-2.0 e7eadd61d -> 7b07a8bca


HBASE-19369 Switch to Builder Pattern In WAL

This patch switches WAL file creation to the FileSystem builder API by
adding a helper method. The helper checks, via reflection, that the
builder API is available (i.e. that HBase is running on a Hadoop
version that supports it) and falls back to the old create calls
otherwise.
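
For reference, on a Hadoop release that ships the builder API (including
the replicate() option), the call that the reflection below wires up looks
roughly like this sketch; it is illustrative only, not code from the patch:

    // Illustrative sketch; assumes a Hadoop release that provides
    // DistributedFileSystem#createFile and the replicate() builder option.
    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class BuilderSketch {
      static FSDataOutputStream createWalFile(DistributedFileSystem dfs, Path path)
          throws IOException {
        return dfs.createFile(path)
            .overwrite(false)
            .replicate() // keep the WAL replicated rather than erasure coded
            .build();
      }
    }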

Amending-Author: Mike Drob <md...@apache.org>
Signed-off-by: tedyu <yu...@gmail.com>
Signed-off-by: zhangduo <zh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7b07a8bc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7b07a8bc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7b07a8bc

Branch: refs/heads/branch-2.0
Commit: 7b07a8bca884dbcb8f4709158789658f73123b97
Parents: e7eadd6
Author: Alex Leblang <al...@cloudera.com>
Authored: Thu Jul 26 15:47:13 2018 -0500
Committer: Mike Drob <md...@apache.org>
Committed: Mon Jul 30 09:16:36 2018 -0500

----------------------------------------------------------------------
 .../apache/hadoop/hbase/util/CommonFSUtils.java | 127 +++++++++++++++++++
 .../procedure2/store/wal/WALProcedureStore.java |   2 +-
 .../hbase/io/asyncfs/AsyncFSOutputHelper.java   |   1 +
 .../regionserver/wal/ProtobufLogWriter.java     |  12 +-
 .../regionserver/wal/TestHBaseWalOnEC.java      | 119 +++++++++++++++++
 5 files changed, 256 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7b07a8bc/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
index 76a3601..a34048a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
@@ -817,6 +817,133 @@ public abstract class CommonFSUtils {
     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
   }
 
+  private static class DfsBuilderUtility {
+    static Class<?> dfsClass = null;
+    static Method createMethod;
+    static Method overwriteMethod;
+    static Method bufferSizeMethod;
+    static Method blockSizeMethod;
+    static Method recursiveMethod;
+    static Method replicateMethod;
+    static Method replicationMethod;
+    static Method buildMethod;
+    static boolean allMethodsPresent = false;
+
+    static {
+      String dfsName = "org.apache.hadoop.hdfs.DistributedFileSystem";
+      String builderName = dfsName + "$HdfsDataOutputStreamBuilder";
+      Class<?> builderClass = null;
+
+      try {
+        dfsClass = Class.forName(dfsName);
+      } catch (ClassNotFoundException e) {
+        LOG.debug("{} not available, will not use builder API for file creation.", dfsName);
+      }
+      try {
+        builderClass = Class.forName(builderName);
+      } catch (ClassNotFoundException e) {
+        LOG.debug("{} not available, will not use builder API for file creation.", builderName);
+      }
+
+      if (dfsClass != null && builderClass != null) {
+        try {
+          createMethod = dfsClass.getMethod("createFile", Path.class);
+          overwriteMethod = builderClass.getMethod("overwrite", boolean.class);
+          bufferSizeMethod = builderClass.getMethod("bufferSize", int.class);
+          blockSizeMethod = builderClass.getMethod("blockSize", long.class);
+          recursiveMethod = builderClass.getMethod("recursive");
+          replicateMethod = builderClass.getMethod("replicate");
+          replicationMethod = builderClass.getMethod("replication", short.class);
+          buildMethod = builderClass.getMethod("build");
+
+          allMethodsPresent = true;
+          LOG.debug("Using builder API via reflection for DFS file creation.");
+        } catch (NoSuchMethodException e) {
+          LOG.debug("Could not find method on builder; will use old DFS API for file creation {}",
+              e.getMessage());
+        }
+      }
+    }
+
+    /**
+     * Attempt to use builder API via reflection to create a file with the given parameters and
+     * replication enabled.
+     */
+    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable,
+        int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
+      if (allMethodsPresent && dfsClass.isInstance(fs)) {
+        try {
+          Object builder;
+
+          builder = createMethod.invoke(fs, path);
+          builder = overwriteMethod.invoke(builder, overwritable);
+          builder = bufferSizeMethod.invoke(builder, bufferSize);
+          builder = blockSizeMethod.invoke(builder, blockSize);
+          if (isRecursive) {
+            builder = recursiveMethod.invoke(builder);
+          }
+          builder = replicateMethod.invoke(builder);
+          builder = replicationMethod.invoke(builder, replication);
+          return (FSDataOutputStream) buildMethod.invoke(builder);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          // Should have caught this failure during initialization, so log full trace here
+          LOG.warn("Couldn't use reflection with builder API", e);
+        }
+      }
+
+      if (isRecursive) {
+        return fs.create(path, overwritable, bufferSize, replication, blockSize, null);
+      }
+      return fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
+    }
+
+    /**
+     * Attempt to use builder API via reflection to create a file with the given parameters and
+     * replication enabled.
+     */
+    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable)
+        throws IOException {
+      if (allMethodsPresent && dfsClass.isInstance(fs)) {
+        try {
+          Object builder;
+
+          builder = createMethod.invoke(fs, path);
+          builder = overwriteMethod.invoke(builder, overwritable);
+          builder = replicateMethod.invoke(builder);
+          return (FSDataOutputStream) buildMethod.invoke(builder);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          // Should have caught this failure during initialization, so log full trace here
+          LOG.warn("Couldn't use reflection with builder API", e);
+        }
+      }
+
+      return fs.create(path, overwritable);
+    }
+  }
+
+  /**
+   * Attempt to use builder API via reflection to create a file with the given parameters and
+   * replication enabled.
+   * <p>
+   * Will not attempt to enable replication when passed an HFileSystem.
+   */
+  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable)
+      throws IOException {
+    return DfsBuilderUtility.createHelper(fs, path, overwritable);
+  }
+
+  /**
+   * Attempt to use builder API via reflection to create a file with the given parameters and
+   * replication enabled.
+   * <p>
+   * Will not attempt to enable replication when passed an HFileSystem.
+   */
+  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable,
+      int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
+    return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication,
+        blockSize, isRecursive);
+  }
+
   // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
   // not until we attempt to reference it.
   private static class StreamCapabilities {
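
The StreamCapabilities holder whose declaration closes this hunk follows
the same lazy-reflection approach as DfsBuilderUtility above. A minimal,
hypothetical sketch of the holder idiom (names illustrative, not from the
patch):

    import java.lang.reflect.Method;

    public final class HolderSketch {
      // Initialization-on-demand holder: the JVM runs Holder's static
      // initializer at most once per classloader, and only when Holder
      // is first referenced.
      private static final class Holder {
        static final Method HAS_CAPABILITY = lookup();

        private static Method lookup() {
          try {
            return Class.forName("org.apache.hadoop.fs.StreamCapabilities")
                .getMethod("hasCapability", String.class);
          } catch (ClassNotFoundException | NoSuchMethodException e) {
            return null; // older Hadoop; callers must fall back
          }
        }
      }

      public static boolean probeAvailable() {
        return Holder.HAS_CAPABILITY != null;
      }
    }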

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b07a8bc/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 99001f7..cc3eea3 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -1028,7 +1028,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     long startPos = -1;
     newLogFile = getLogFilePath(logId);
     try {
-      newStream = fs.create(newLogFile, false);
+      newStream = CommonFSUtils.createForWal(fs, newLogFile, false);
     } catch (FileAlreadyExistsException e) {
       LOG.error("Log file with id=" + logId + " already exists", e);
       return false;

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b07a8bc/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index f9d04b9..d1645f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -54,6 +54,7 @@ public final class AsyncFSOutputHelper {
     final FSDataOutputStream out;
     int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
       CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+    // This is not a Distributed File System, so it won't be erasure coded; no builder API needed
     if (createParent) {
       out = fs.create(f, overwrite, bufferSize, replication, blockSize, null);
     } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b07a8bc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 9d36429..0ad3087 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -61,7 +61,9 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
   public void close() throws IOException {
     if (this.output != null) {
       try {
-        if (!trailerWritten) writeWALTrailer();
+        if (!trailerWritten) {
+          writeWALTrailer();
+        }
         this.output.close();
       } catch (NullPointerException npe) {
         // Can get a NPE coming up from down in DFSClient$DFSOutputStream#close
@@ -74,7 +76,9 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
   @Override
   public void sync() throws IOException {
     FSDataOutputStream fsdos = this.output;
-    if (fsdos == null) return; // Presume closed
+    if (fsdos == null) {
+      return; // Presume closed
+    }
     fsdos.flush();
     fsdos.hflush();
   }
@@ -87,8 +91,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
       short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
-    this.output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize,
-      null);
+    this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
+        blockSize, false);
     // TODO Be sure to add a check for hsync if this branch includes HBASE-19024
     if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true) &&
         !(CommonFSUtils.hasCapability(output, "hflush"))) {
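
For context, the hasCapability guard above probes the stream for hflush
support. On Hadoop versions that expose the StreamCapabilities interface
directly (2.9+/3.0+), the check amounts to something like this sketch
(illustrative only; HBase reaches the interface via reflection instead):

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.StreamCapabilities;

    public class CapabilitySketch {
      static boolean canHflush(FSDataOutputStream out) {
        return out instanceof StreamCapabilities
            && ((StreamCapabilities) out).hasCapability("hflush");
      }
    }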

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b07a8bc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
new file mode 100644
index 0000000..a7f1624
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestHBaseWalOnEC {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestHBaseWalOnEC.class);
+
+  private static final HBaseTestingUtility util = new HBaseTestingUtility();
+
+  private static final String HFLUSH = "hflush";
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    try {
+      MiniDFSCluster cluster = util.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      Method enableAllECPolicies = DFSTestUtil.class.getMethod("enableAllECPolicies",
+          DistributedFileSystem.class);
+      enableAllECPolicies.invoke(null, fs);
+
+      DFSClient client = fs.getClient();
+      Method setErasureCodingPolicy = DFSClient.class.getMethod("setErasureCodingPolicy",
+          String.class, String.class);
+      setErasureCodingPolicy.invoke(client, "/", "RS-3-2-1024k"); // try a built-in policy
+
+      try (FSDataOutputStream out = fs.create(new Path("/canary"))) {
+        // If this comes back as having hflush then some test setup assumption is wrong.
+        // Fail the test so that a developer has to look and triage
+        assertFalse("Did not enable EC!", CommonFSUtils.hasCapability(out, HFLUSH));
+      }
+    } catch (NoSuchMethodException e) {
+      // We're not testing anything interesting if EC is not available, so skip the rest of the test
+      Assume.assumeNoException("Using an older version of hadoop; EC not available.", e);
+    }
+
+    util.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
+    util.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testStreamCreate() throws IOException {
+    try (FSDataOutputStream out = CommonFSUtils.createForWal(util.getDFSCluster().getFileSystem(),
+        new Path("/testStreamCreate"), true)) {
+      assertTrue(CommonFSUtils.hasCapability(out, HFLUSH));
+    }
+  }
+
+  @Test
+  public void testFlush() throws IOException {
+    byte[] row = Bytes.toBytes("row");
+    byte[] cf = Bytes.toBytes("cf");
+    byte[] cq = Bytes.toBytes("cq");
+    byte[] value = Bytes.toBytes("value");
+
+    TableName name = TableName.valueOf(getClass().getSimpleName());
+
+    Table t = util.createTable(name, cf);
+    t.put(new Put(row).addColumn(cf, cq, value));
+
+    util.getAdmin().flush(name);
+
+    assertArrayEquals(value, t.get(new Get(row)).getValue(cf, cq));
+  }
+}
+
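
For reference, on a Hadoop line where these methods exist, the reflective
setup() in the test above corresponds to the direct calls sketched here
(illustrative only, not code from the patch):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class EcSetupSketch {
      static void enableEc(DistributedFileSystem fs) throws IOException {
        DFSTestUtil.enableAllECPolicies(fs);
        fs.getClient().setErasureCodingPolicy("/", "RS-3-2-1024k");
      }
    }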