You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/01/03 09:09:18 UTC
[hbase] branch branch-2.2 updated: HBASE-23587 The FSYNC_WAL flag
does not work on branch-2.x (#974)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.2 by this push:
new ef5bf2e HBASE-23587 The FSYNC_WAL flag does not work on branch-2.x (#974)
ef5bf2e is described below
commit ef5bf2e79f3b1e6ea99e890dc5cfdf084667c105
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Fri Jan 3 16:57:49 2020 +0800
HBASE-23587 The FSYNC_WAL flag does not work on branch-2.x (#974)
Signed-off-by: Guanghao Zhang <zg...@apache.org>
Signed-off-by: stack <st...@apache.org>
---
.../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 20 ++++-
.../hadoop/hbase/regionserver/wal/FSHLog.java | 2 +-
.../regionserver/wal/TestAsyncFSWALDurability.java | 43 +++++++++++
.../regionserver/wal/TestFSHLogDurability.java | 41 ++++++++++
.../regionserver/wal/WALDurabilityTestBase.java | 89 +++++++++++++---------
5 files changed, 159 insertions(+), 36 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index c4bbe67..38051ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -341,13 +341,31 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
requestLogRoll();
}
+ // find all the sync futures between these two txids to see if we need to issue a hsync, if no
+ // sync futures then just use the default one.
+ private boolean isHsync(long beginTxid, long endTxid) {
+ SortedSet<SyncFuture> futures =
+ syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1));
+ if (futures.isEmpty()) {
+ return useHsync;
+ }
+ for (SyncFuture future : futures) {
+ if (future.isForceSync()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private void sync(AsyncWriter writer) {
fileLengthAtLastSync = writer.getLength();
long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
+ boolean shouldUseHsync =
+ isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
final long startTimeNs = System.nanoTime();
final long epoch = (long) epochAndState >>> 2L;
- addListener(writer.sync(useHsync), (result, error) -> {
+ addListener(writer.sync(shouldUseHsync), (result, error) -> {
if (error != null) {
syncFailed(epoch, error);
} else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index d63af0b..407edc1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -574,7 +574,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
Throwable lastException = null;
try {
TraceUtil.addTimelineAnnotation("syncing writer");
- writer.sync(useHsync);
+ writer.sync(takeSyncFuture.isForceSync());
TraceUtil.addTimelineAnnotation("writer synced");
currentSequence = updateHighestSyncedSequence(currentSequence);
} catch (IOException e) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
index 04996c0..f9dee07 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -25,6 +26,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -72,11 +74,19 @@ public class TestAsyncFSWALDurability extends WALDurabilityTestBase<CustomAsyncF
protected Boolean getSyncFlag(CustomAsyncFSWAL wal) {
return wal.getSyncFlag();
}
+
+ @Override
+ protected Boolean getWriterSyncFlag(CustomAsyncFSWAL wal) {
+ return wal.getWriterSyncFlag();
+ }
}
class CustomAsyncFSWAL extends AsyncFSWAL {
+
private Boolean syncFlag;
+ private Boolean writerSyncFlag;
+
public CustomAsyncFSWAL(FileSystem fs, Path rootDir, String logDir, Configuration conf,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
throws FailedLogCloseException, IOException {
@@ -85,6 +95,34 @@ class CustomAsyncFSWAL extends AsyncFSWAL {
}
@Override
+ protected AsyncWriter createWriterInstance(Path path) throws IOException {
+ AsyncWriter writer = super.createWriterInstance(path);
+ return new AsyncWriter() {
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+
+ @Override
+ public long getLength() {
+ return writer.getLength();
+ }
+
+ @Override
+ public CompletableFuture<Long> sync(boolean forceSync) {
+ writerSyncFlag = forceSync;
+ return writer.sync(forceSync);
+ }
+
+ @Override
+ public void append(Entry entry) {
+ writer.append(entry);
+ }
+ };
+ }
+
+ @Override
public void sync(boolean forceSync) throws IOException {
syncFlag = forceSync;
super.sync(forceSync);
@@ -98,9 +136,14 @@ class CustomAsyncFSWAL extends AsyncFSWAL {
void resetSyncFlag() {
this.syncFlag = null;
+ this.writerSyncFlag = null;
}
Boolean getSyncFlag() {
return syncFlag;
}
+
+ Boolean getWriterSyncFlag() {
+ return writerSyncFlag;
+ }
}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java
index e7f73d0..9c46058 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@@ -51,17 +52,52 @@ public class TestFSHLogDurability extends WALDurabilityTestBase<CustomFSHLog> {
protected Boolean getSyncFlag(CustomFSHLog wal) {
return wal.getSyncFlag();
}
+
+ @Override
+ protected Boolean getWriterSyncFlag(CustomFSHLog wal) {
+ return wal.getWriterSyncFlag();
+ }
}
class CustomFSHLog extends FSHLog {
private Boolean syncFlag;
+ private Boolean writerSyncFlag;
+
public CustomFSHLog(FileSystem fs, Path root, String logDir, Configuration conf)
throws IOException {
super(fs, root, logDir, conf);
}
@Override
+ protected Writer createWriterInstance(Path path) throws IOException {
+ Writer writer = super.createWriterInstance(path);
+ return new Writer() {
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+
+ @Override
+ public long getLength() {
+ return writer.getLength();
+ }
+
+ @Override
+ public void sync(boolean forceSync) throws IOException {
+ writerSyncFlag = forceSync;
+ writer.sync(forceSync);
+ }
+
+ @Override
+ public void append(Entry entry) throws IOException {
+ writer.append(entry);
+ }
+ };
+ }
+
+ @Override
public void sync(boolean forceSync) throws IOException {
syncFlag = forceSync;
super.sync(forceSync);
@@ -75,9 +111,14 @@ class CustomFSHLog extends FSHLog {
void resetSyncFlag() {
this.syncFlag = null;
+ this.writerSyncFlag = null;
}
Boolean getSyncFlag() {
return syncFlag;
}
+
+ Boolean getWriterSyncFlag() {
+ return writerSyncFlag;
+ }
}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java
index bc0255b..f100b06 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -77,25 +76,39 @@ public abstract class WALDurabilityTestBase<T extends WAL> {
protected abstract Boolean getSyncFlag(T wal);
+ protected abstract Boolean getWriterSyncFlag(T wal);
+
@Test
public void testWALDurability() throws IOException {
+ byte[] bytes = Bytes.toBytes(getName());
+ Put put = new Put(bytes);
+ put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
+
// global hbase.wal.hsync false, no override in put call - hflush
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "false");
FileSystem fs = FileSystem.get(conf);
Path rootDir = new Path(dir + getName());
T wal = getWAL(fs, rootDir, getName(), conf);
HRegion region = initHRegion(tableName, null, null, wal);
- byte[] bytes = Bytes.toBytes(getName());
- Put put = new Put(bytes);
- put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
-
- resetSyncFlag(wal);
- assertNull(getSyncFlag(wal));
- region.put(put);
- assertFalse(getSyncFlag(wal));
-
- region.close();
- wal.close();
+ try {
+ resetSyncFlag(wal);
+ assertNull(getSyncFlag(wal));
+ assertNull(getWriterSyncFlag(wal));
+ region.put(put);
+ assertFalse(getSyncFlag(wal));
+ assertFalse(getWriterSyncFlag(wal));
+
+ // global hbase.wal.hsync false, durability set in put call - fsync
+ put.setDurability(Durability.FSYNC_WAL);
+ resetSyncFlag(wal);
+ assertNull(getSyncFlag(wal));
+ assertNull(getWriterSyncFlag(wal));
+ region.put(put);
+ assertTrue(getSyncFlag(wal));
+ assertTrue(getWriterSyncFlag(wal));
+ } finally {
+ HBaseTestingUtility.closeRegionAndWAL(region);
+ }
// global hbase.wal.hsync true, no override in put call
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true");
@@ -103,28 +116,36 @@ public abstract class WALDurabilityTestBase<T extends WAL> {
wal = getWAL(fs, rootDir, getName(), conf);
region = initHRegion(tableName, null, null, wal);
- resetSyncFlag(wal);
- assertNull(getSyncFlag(wal));
- region.put(put);
- assertEquals(getSyncFlag(wal), true);
-
- // global hbase.wal.hsync true, durability set in put call - fsync
- put.setDurability(Durability.FSYNC_WAL);
- resetSyncFlag(wal);
- assertNull(getSyncFlag(wal));
- region.put(put);
- assertTrue(getSyncFlag(wal));
-
- // global hbase.wal.hsync true, durability set in put call - sync
- put = new Put(bytes);
- put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
- put.setDurability(Durability.SYNC_WAL);
- resetSyncFlag(wal);
- assertNull(getSyncFlag(wal));
- region.put(put);
- assertFalse(getSyncFlag(wal));
-
- HBaseTestingUtility.closeRegionAndWAL(region);
+ try {
+ resetSyncFlag(wal);
+ assertNull(getSyncFlag(wal));
+ assertNull(getWriterSyncFlag(wal));
+ region.put(put);
+ assertTrue(getSyncFlag(wal));
+ assertTrue(getWriterSyncFlag(wal));
+
+ // global hbase.wal.hsync true, durability set in put call - fsync
+ put.setDurability(Durability.FSYNC_WAL);
+ resetSyncFlag(wal);
+ assertNull(getSyncFlag(wal));
+ assertNull(getWriterSyncFlag(wal));
+ region.put(put);
+ assertTrue(getSyncFlag(wal));
+ assertTrue(getWriterSyncFlag(wal));
+
+ // global hbase.wal.hsync true, durability set in put call - sync
+ put = new Put(bytes);
+ put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
+ put.setDurability(Durability.SYNC_WAL);
+ resetSyncFlag(wal);
+ assertNull(getSyncFlag(wal));
+ assertNull(getWriterSyncFlag(wal));
+ region.put(put);
+ assertFalse(getSyncFlag(wal));
+ assertFalse(getWriterSyncFlag(wal));
+ } finally {
+ HBaseTestingUtility.closeRegionAndWAL(region);
+ }
}
private String getName() {