You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bb...@apache.org on 2024/03/11 13:11:07 UTC
(hbase) branch master updated: HBASE-28260: Add NO_WRITE_LOCAL flag to WAL file creation (#5733)
This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 34b738d2ac0 HBASE-28260: Add NO_WRITE_LOCAL flag to WAL file creation (#5733)
34b738d2ac0 is described below
commit 34b738d2ac04f7a9acead98b90ee1e2f220afff5
Author: Charles Connell <ch...@connells.org>
AuthorDate: Mon Mar 11 09:11:01 2024 -0400
HBASE-28260: Add NO_WRITE_LOCAL flag to WAL file creation (#5733)
Signed-off-by: Bryan Beaudreault <bb...@apache.org>
Signed-off-by: Duo Zhang <zh...@apache.org>
Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
---
.../hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java | 5 +++--
.../io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java | 16 ++++++++++------
.../io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java | 17 +++++++++--------
.../asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java | 2 +-
.../hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java | 2 +-
.../asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java | 2 +-
.../hadoop/hbase/regionserver/wal/AbstractFSWAL.java | 4 ++++
.../regionserver/wal/AbstractProtobufLogWriter.java | 8 ++++++--
.../hbase/regionserver/wal/AsyncProtobufLogWriter.java | 4 ++--
.../hbase/regionserver/wal/ProtobufLogWriter.java | 11 ++++++++---
10 files changed, 45 insertions(+), 26 deletions(-)
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index a530ca4a2a0..cbb0648f3af 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -48,11 +48,12 @@ public final class AsyncFSOutputHelper {
*/
public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
- Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
+ Class<? extends Channel> channelClass, StreamSlowMonitor monitor, boolean noLocalWrite)
throws IOException, CommonFSUtils.StreamLacksCapabilityException {
if (fs instanceof DistributedFileSystem) {
return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
- overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor);
+ overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor,
+ noLocalWrite);
}
final FSDataOutputStream out;
int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 98590173ed2..d4a71a77a79 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -445,20 +445,24 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
}
}
- private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
+ private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite,
+ boolean noLocalWrite) {
List<CreateFlag> flags = new ArrayList<>();
flags.add(CreateFlag.CREATE);
if (overwrite) {
flags.add(CreateFlag.OVERWRITE);
}
+ if (noLocalWrite) {
+ flags.add(CreateFlag.NO_LOCAL_WRITE);
+ }
flags.add(CreateFlag.SHOULD_REPLICATE);
return new EnumSetWritable<>(EnumSet.copyOf(flags));
}
private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
boolean overwrite, boolean createParent, short replication, long blockSize,
- EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
- throws IOException {
+ EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor,
+ boolean noLocalWrite) throws IOException {
Configuration conf = dfs.getConf();
DFSClient client = dfs.getClient();
String clientName = client.getClientName();
@@ -475,7 +479,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
try {
stat = FILE_CREATOR.create(namenode, src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
- getCreateFlags(overwrite), createParent, replication, blockSize,
+ getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize,
CryptoProtocolVersion.supported());
} catch (Exception e) {
if (e instanceof RemoteException) {
@@ -561,14 +565,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
- final StreamSlowMonitor monitor) throws IOException {
+ final StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException {
return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
@Override
public FanOutOneBlockAsyncDFSOutput doCall(Path p)
throws IOException, UnresolvedLinkException {
return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
- blockSize, eventLoopGroup, channelClass, monitor);
+ blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
}
@Override
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 68b8bfa3d9f..f0910684edd 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -141,7 +141,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
- false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+ false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
writeAndVerify(FS, f, out);
}
@@ -154,7 +154,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
- false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+ false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
byte[] b = new byte[10];
Bytes.random(b);
out.write(b, 0, b.length);
@@ -183,7 +183,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
- false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+ false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
Thread.sleep(READ_TIMEOUT_MS * 2);
// the connection to datanode should still alive.
writeAndVerify(FS, f, out);
@@ -198,7 +198,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try {
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
- FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+ FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
fail("should fail with parent does not exist");
} catch (RemoteException e) {
LOG.info("expected exception caught", e);
@@ -220,8 +220,9 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
Path f = new Path("/test");
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
- try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
- f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR)) {
+ try (FanOutOneBlockAsyncDFSOutput output =
+ FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
+ FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true)) {
// should exclude the dead dn when retry so here we only have 2 DNs in pipeline
assertEquals(2, output.getPipeline().length);
} finally {
@@ -251,7 +252,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
try (FanOutOneBlockAsyncDFSOutput output =
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
- FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor)) {
+ FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor, true)) {
// should exclude the dead dn when retry so here we only have 2 DNs in pipeline
assertEquals(2, output.getPipeline().length);
assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
@@ -266,7 +267,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
- false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR);
+ false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR, true);
byte[] b = new byte[50 * 1024 * 1024];
Bytes.random(b);
out.write(b);
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
index 77752789dbb..7f6535a93a9 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
@@ -98,7 +98,7 @@ public class TestFanOutOneBlockAsyncDFSOutputHang extends AsyncFSTestBase {
Path f = new Path("/testHang");
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 2,
- FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+ FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
}
@AfterClass
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index d1ce128b118..4171b60c5b8 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -65,7 +65,7 @@ public class TestLocalAsyncOutput {
Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
- fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR);
+ fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR, true);
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out);
}
}
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index 479b8f4e603..99048ff2bed 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -255,7 +255,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
private void test(Path file) throws IOException, InterruptedException, ExecutionException {
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
- true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+ true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index ef25068512f..a94d827e8e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -179,6 +179,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;
+ public static final String WAL_AVOID_LOCAL_WRITES_KEY =
+ "hbase.regionserver.wal.avoid-local-writes";
+ public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false;
+
/**
* file system instance
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
index 890fb4e444c..e6463c563a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_DEFAULT;
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_MAGIC;
@@ -163,8 +165,10 @@ public abstract class AbstractProtobufLogWriter {
int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
CommonFSUtils.getDefaultReplication(fs, path));
+ boolean noLocalWrite =
+ conf.getBoolean(WAL_AVOID_LOCAL_WRITES_KEY, WAL_AVOID_LOCAL_WRITES_DEFAULT);
- initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);
+ initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor, noLocalWrite);
boolean doTagCompress =
doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
@@ -253,7 +257,7 @@ public abstract class AbstractProtobufLogWriter {
}
protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
- short replication, long blockSize, StreamSlowMonitor monitor)
+ short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
throws IOException, StreamLacksCapabilityException;
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index e50a02f6f80..f10f3922272 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -178,10 +178,10 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
@Override
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
- short replication, long blockSize, StreamSlowMonitor monitor)
+ short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
throws IOException, StreamLacksCapabilityException {
this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
- blockSize, eventLoopGroup, channelClass, monitor);
+ blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
this.asyncOutputWrapper = new OutputStreamWrapper(output);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 212788c940e..52317949cc8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -100,13 +100,18 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter implements FSHL
@Override
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
- short replication, long blockSize, StreamSlowMonitor monitor)
+ short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
throws IOException, StreamLacksCapabilityException {
FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwritable)
.bufferSize(bufferSize).replication(replication).blockSize(blockSize);
if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) {
- this.output =
- ((DistributedFileSystem.HdfsDataOutputStreamBuilder) builder).replicate().build();
+ DistributedFileSystem.HdfsDataOutputStreamBuilder dfsBuilder =
+ (DistributedFileSystem.HdfsDataOutputStreamBuilder) builder;
+ dfsBuilder.replicate();
+ if (noLocalWrite) {
+ dfsBuilder.noLocalWrite();
+ }
+ this.output = dfsBuilder.build();
} else {
this.output = builder.build();
}