You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bb...@apache.org on 2024/03/11 13:11:07 UTC

(hbase) branch master updated: HBASE-28260: Add NO_LOCAL_WRITE flag to WAL file creation (#5733)

This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 34b738d2ac0 HBASE-28260: Add NO_LOCAL_WRITE flag to WAL file creation (#5733)
34b738d2ac0 is described below

commit 34b738d2ac04f7a9acead98b90ee1e2f220afff5
Author: Charles Connell <ch...@connells.org>
AuthorDate: Mon Mar 11 09:11:01 2024 -0400

    HBASE-28260: Add NO_LOCAL_WRITE flag to WAL file creation (#5733)
    
    Signed-off-by: Bryan Beaudreault <bb...@apache.org>
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
---
 .../hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java    |  5 +++--
 .../io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java  | 16 ++++++++++------
 .../io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java    | 17 +++++++++--------
 .../asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java   |  2 +-
 .../hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java   |  2 +-
 .../asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java   |  2 +-
 .../hadoop/hbase/regionserver/wal/AbstractFSWAL.java    |  4 ++++
 .../regionserver/wal/AbstractProtobufLogWriter.java     |  8 ++++++--
 .../hbase/regionserver/wal/AsyncProtobufLogWriter.java  |  4 ++--
 .../hbase/regionserver/wal/ProtobufLogWriter.java       | 11 ++++++++---
 10 files changed, 45 insertions(+), 26 deletions(-)

diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index a530ca4a2a0..cbb0648f3af 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -48,11 +48,12 @@ public final class AsyncFSOutputHelper {
    */
   public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
     boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
-    Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
+    Class<? extends Channel> channelClass, StreamSlowMonitor monitor, boolean noLocalWrite)
     throws IOException, CommonFSUtils.StreamLacksCapabilityException {
     if (fs instanceof DistributedFileSystem) {
       return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
-        overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor);
+        overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor,
+        noLocalWrite);
     }
     final FSDataOutputStream out;
     int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 98590173ed2..d4a71a77a79 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -445,20 +445,24 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     }
   }
 
-  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
+  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite,
+    boolean noLocalWrite) {
     List<CreateFlag> flags = new ArrayList<>();
     flags.add(CreateFlag.CREATE);
     if (overwrite) {
       flags.add(CreateFlag.OVERWRITE);
     }
+    if (noLocalWrite) {
+      flags.add(CreateFlag.NO_LOCAL_WRITE);
+    }
     flags.add(CreateFlag.SHOULD_REPLICATE);
     return new EnumSetWritable<>(EnumSet.copyOf(flags));
   }
 
   private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
     boolean overwrite, boolean createParent, short replication, long blockSize,
-    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
-    throws IOException {
+    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor,
+    boolean noLocalWrite) throws IOException {
     Configuration conf = dfs.getConf();
     DFSClient client = dfs.getClient();
     String clientName = client.getClientName();
@@ -475,7 +479,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       try {
         stat = FILE_CREATOR.create(namenode, src,
           FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
-          getCreateFlags(overwrite), createParent, replication, blockSize,
+          getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize,
           CryptoProtocolVersion.supported());
       } catch (Exception e) {
         if (e instanceof RemoteException) {
@@ -561,14 +565,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
     boolean overwrite, boolean createParent, short replication, long blockSize,
     EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
-    final StreamSlowMonitor monitor) throws IOException {
+    final StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException {
     return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
 
       @Override
       public FanOutOneBlockAsyncDFSOutput doCall(Path p)
         throws IOException, UnresolvedLinkException {
         return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
-          blockSize, eventLoopGroup, channelClass, monitor);
+          blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
       }
 
       @Override
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 68b8bfa3d9f..f0910684edd 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -141,7 +141,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
     writeAndVerify(FS, f, out);
   }
 
@@ -154,7 +154,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
     byte[] b = new byte[10];
     Bytes.random(b);
     out.write(b, 0, b.length);
@@ -183,7 +183,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
     Thread.sleep(READ_TIMEOUT_MS * 2);
     // the connection to datanode should still alive.
     writeAndVerify(FS, f, out);
@@ -198,7 +198,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     try {
       FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
-        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
       fail("should fail with parent does not exist");
     } catch (RemoteException e) {
       LOG.info("expected exception caught", e);
@@ -220,8 +220,9 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
     Path f = new Path("/test");
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
-      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR)) {
+    try (FanOutOneBlockAsyncDFSOutput output =
+      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
+        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true)) {
       // should exclude the dead dn when retry so here we only have 2 DNs in pipeline
       assertEquals(2, output.getPipeline().length);
     } finally {
@@ -251,7 +252,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
     try (FanOutOneBlockAsyncDFSOutput output =
       FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
-        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor)) {
+        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor, true)) {
       // should exclude the dead dn when retry so here we only have 2 DNs in pipeline
       assertEquals(2, output.getPipeline().length);
       assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
@@ -266,7 +267,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR);
+      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR, true);
     byte[] b = new byte[50 * 1024 * 1024];
     Bytes.random(b);
     out.write(b);
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
index 77752789dbb..7f6535a93a9 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
@@ -98,7 +98,7 @@ public class TestFanOutOneBlockAsyncDFSOutputHang extends AsyncFSTestBase {
     Path f = new Path("/testHang");
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 2,
-      FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+      FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
   }
 
   @AfterClass
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index d1ce128b118..4171b60c5b8 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -65,7 +65,7 @@ public class TestLocalAsyncOutput {
     Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
     FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
     AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
-      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR);
+      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR, true);
     TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out);
   }
 }
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index 479b8f4e603..99048ff2bed 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -255,7 +255,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
   private void test(Path file) throws IOException, InterruptedException, ExecutionException {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
-      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
     TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index ef25068512f..a94d827e8e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -179,6 +179,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
   public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;
 
+  public static final String WAL_AVOID_LOCAL_WRITES_KEY =
+    "hbase.regionserver.wal.avoid-local-writes";
+  public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false;
+
   /**
    * file system instance
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
index 890fb4e444c..e6463c563a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_DEFAULT;
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY;
 import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
 import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC;
 import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_MAGIC;
@@ -163,8 +165,10 @@ public abstract class AbstractProtobufLogWriter {
       int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
       short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
         CommonFSUtils.getDefaultReplication(fs, path));
+      boolean noLocalWrite =
+        conf.getBoolean(WAL_AVOID_LOCAL_WRITES_KEY, WAL_AVOID_LOCAL_WRITES_DEFAULT);
 
-      initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);
+      initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor, noLocalWrite);
 
       boolean doTagCompress =
         doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
@@ -253,7 +257,7 @@ public abstract class AbstractProtobufLogWriter {
   }
 
   protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-    short replication, long blockSize, StreamSlowMonitor monitor)
+    short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
     throws IOException, StreamLacksCapabilityException;
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index e50a02f6f80..f10f3922272 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -178,10 +178,10 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
 
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-    short replication, long blockSize, StreamSlowMonitor monitor)
+    short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
     throws IOException, StreamLacksCapabilityException {
     this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
-      blockSize, eventLoopGroup, channelClass, monitor);
+      blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
     this.asyncOutputWrapper = new OutputStreamWrapper(output);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 212788c940e..52317949cc8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -100,13 +100,18 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter implements FSHL
 
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-    short replication, long blockSize, StreamSlowMonitor monitor)
+    short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
     throws IOException, StreamLacksCapabilityException {
     FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwritable)
       .bufferSize(bufferSize).replication(replication).blockSize(blockSize);
     if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) {
-      this.output =
-        ((DistributedFileSystem.HdfsDataOutputStreamBuilder) builder).replicate().build();
+      DistributedFileSystem.HdfsDataOutputStreamBuilder dfsBuilder =
+        (DistributedFileSystem.HdfsDataOutputStreamBuilder) builder;
+      dfsBuilder.replicate();
+      if (noLocalWrite) {
+        dfsBuilder.noLocalWrite();
+      }
+      this.output = dfsBuilder.build();
     } else {
       this.output = builder.build();
     }