You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/01/03 09:09:18 UTC

[hbase] branch branch-2.2 updated: HBASE-23587 The FSYNC_WAL flag does not work on branch-2.x (#974)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new ef5bf2e  HBASE-23587 The FSYNC_WAL flag does not work on branch-2.x (#974)
ef5bf2e is described below

commit ef5bf2e79f3b1e6ea99e890dc5cfdf084667c105
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Fri Jan 3 16:57:49 2020 +0800

    HBASE-23587 The FSYNC_WAL flag does not work on branch-2.x (#974)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
    Signed-off-by: stack <st...@apache.org>
---
 .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java  | 20 ++++-
 .../hadoop/hbase/regionserver/wal/FSHLog.java      |  2 +-
 .../regionserver/wal/TestAsyncFSWALDurability.java | 43 +++++++++++
 .../regionserver/wal/TestFSHLogDurability.java     | 41 ++++++++++
 .../regionserver/wal/WALDurabilityTestBase.java    | 89 +++++++++++++---------
 5 files changed, 159 insertions(+), 36 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index c4bbe67..38051ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -341,13 +341,31 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     requestLogRoll();
   }
 
+  // Find all the sync futures between these two txids to see if we need to issue an hsync. If
+  // there are no sync futures then just use the default one.
+  private boolean isHsync(long beginTxid, long endTxid) {
+    SortedSet<SyncFuture> futures =
+      syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1));
+    if (futures.isEmpty()) {
+      return useHsync;
+    }
+    for (SyncFuture future : futures) {
+      if (future.isForceSync()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   private void sync(AsyncWriter writer) {
     fileLengthAtLastSync = writer.getLength();
     long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
+    boolean shouldUseHsync =
+      isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
     highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
     final long startTimeNs = System.nanoTime();
     final long epoch = (long) epochAndState >>> 2L;
-    addListener(writer.sync(useHsync), (result, error) -> {
+    addListener(writer.sync(shouldUseHsync), (result, error) -> {
       if (error != null) {
         syncFailed(epoch, error);
       } else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index d63af0b..407edc1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -574,7 +574,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
           Throwable lastException = null;
           try {
             TraceUtil.addTimelineAnnotation("syncing writer");
-            writer.sync(useHsync);
+            writer.sync(takeSyncFuture.isForceSync());
             TraceUtil.addTimelineAnnotation("writer synced");
             currentSequence = updateHighestSyncedSequence(currentSequence);
           } catch (IOException e) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
index 04996c0..f9dee07 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -25,6 +26,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -72,11 +74,19 @@ public class TestAsyncFSWALDurability extends WALDurabilityTestBase<CustomAsyncF
   protected Boolean getSyncFlag(CustomAsyncFSWAL wal) {
     return wal.getSyncFlag();
   }
+
+  @Override
+  protected Boolean getWriterSyncFlag(CustomAsyncFSWAL wal) {
+    return wal.getWriterSyncFlag();
+  }
 }
 
 class CustomAsyncFSWAL extends AsyncFSWAL {
+
   private Boolean syncFlag;
 
+  private Boolean writerSyncFlag;
+
   public CustomAsyncFSWAL(FileSystem fs, Path rootDir, String logDir, Configuration conf,
     EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
     throws FailedLogCloseException, IOException {
@@ -85,6 +95,34 @@ class CustomAsyncFSWAL extends AsyncFSWAL {
   }
 
   @Override
+  protected AsyncWriter createWriterInstance(Path path) throws IOException {
+    AsyncWriter writer = super.createWriterInstance(path);
+    return new AsyncWriter() {
+
+      @Override
+      public void close() throws IOException {
+        writer.close();
+      }
+
+      @Override
+      public long getLength() {
+        return writer.getLength();
+      }
+
+      @Override
+      public CompletableFuture<Long> sync(boolean forceSync) {
+        writerSyncFlag = forceSync;
+        return writer.sync(forceSync);
+      }
+
+      @Override
+      public void append(Entry entry) {
+        writer.append(entry);
+      }
+    };
+  }
+
+  @Override
   public void sync(boolean forceSync) throws IOException {
     syncFlag = forceSync;
     super.sync(forceSync);
@@ -98,9 +136,14 @@ class CustomAsyncFSWAL extends AsyncFSWAL {
 
   void resetSyncFlag() {
     this.syncFlag = null;
+    this.writerSyncFlag = null;
   }
 
   Boolean getSyncFlag() {
     return syncFlag;
   }
+
+  Boolean getWriterSyncFlag() {
+    return writerSyncFlag;
+  }
 }
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java
index e7f73d0..9c46058 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
 
@@ -51,17 +52,52 @@ public class TestFSHLogDurability extends WALDurabilityTestBase<CustomFSHLog> {
   protected Boolean getSyncFlag(CustomFSHLog wal) {
     return wal.getSyncFlag();
   }
+
+  @Override
+  protected Boolean getWriterSyncFlag(CustomFSHLog wal) {
+    return wal.getWriterSyncFlag();
+  }
 }
 
 class CustomFSHLog extends FSHLog {
   private Boolean syncFlag;
 
+  private Boolean writerSyncFlag;
+
   public CustomFSHLog(FileSystem fs, Path root, String logDir, Configuration conf)
     throws IOException {
     super(fs, root, logDir, conf);
   }
 
   @Override
+  protected Writer createWriterInstance(Path path) throws IOException {
+    Writer writer = super.createWriterInstance(path);
+    return new Writer() {
+
+      @Override
+      public void close() throws IOException {
+        writer.close();
+      }
+
+      @Override
+      public long getLength() {
+        return writer.getLength();
+      }
+
+      @Override
+      public void sync(boolean forceSync) throws IOException {
+        writerSyncFlag = forceSync;
+        writer.sync(forceSync);
+      }
+
+      @Override
+      public void append(Entry entry) throws IOException {
+        writer.append(entry);
+      }
+    };
+  }
+
+  @Override
   public void sync(boolean forceSync) throws IOException {
     syncFlag = forceSync;
     super.sync(forceSync);
@@ -75,9 +111,14 @@ class CustomFSHLog extends FSHLog {
 
   void resetSyncFlag() {
     this.syncFlag = null;
+    this.writerSyncFlag = null;
   }
 
   Boolean getSyncFlag() {
     return syncFlag;
   }
+
+  Boolean getWriterSyncFlag() {
+    return writerSyncFlag;
+  }
 }
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java
index bc0255b..f100b06 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -77,25 +76,39 @@ public abstract class WALDurabilityTestBase<T extends WAL> {
 
   protected abstract Boolean getSyncFlag(T wal);
 
+  protected abstract Boolean getWriterSyncFlag(T wal);
+
   @Test
   public void testWALDurability() throws IOException {
+    byte[] bytes = Bytes.toBytes(getName());
+    Put put = new Put(bytes);
+    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
+
     // global hbase.wal.hsync false, no override in put call - hflush
     conf.set(HRegion.WAL_HSYNC_CONF_KEY, "false");
     FileSystem fs = FileSystem.get(conf);
     Path rootDir = new Path(dir + getName());
     T wal = getWAL(fs, rootDir, getName(), conf);
     HRegion region = initHRegion(tableName, null, null, wal);
-    byte[] bytes = Bytes.toBytes(getName());
-    Put put = new Put(bytes);
-    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
-
-    resetSyncFlag(wal);
-    assertNull(getSyncFlag(wal));
-    region.put(put);
-    assertFalse(getSyncFlag(wal));
-
-    region.close();
-    wal.close();
+    try {
+      resetSyncFlag(wal);
+      assertNull(getSyncFlag(wal));
+      assertNull(getWriterSyncFlag(wal));
+      region.put(put);
+      assertFalse(getSyncFlag(wal));
+      assertFalse(getWriterSyncFlag(wal));
+
+      // global hbase.wal.hsync false, durability set in put call - fsync
+      put.setDurability(Durability.FSYNC_WAL);
+      resetSyncFlag(wal);
+      assertNull(getSyncFlag(wal));
+      assertNull(getWriterSyncFlag(wal));
+      region.put(put);
+      assertTrue(getSyncFlag(wal));
+      assertTrue(getWriterSyncFlag(wal));
+    } finally {
+      HBaseTestingUtility.closeRegionAndWAL(region);
+    }
 
     // global hbase.wal.hsync true, no override in put call
     conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true");
@@ -103,28 +116,36 @@ public abstract class WALDurabilityTestBase<T extends WAL> {
     wal = getWAL(fs, rootDir, getName(), conf);
     region = initHRegion(tableName, null, null, wal);
 
-    resetSyncFlag(wal);
-    assertNull(getSyncFlag(wal));
-    region.put(put);
-    assertEquals(getSyncFlag(wal), true);
-
-    // global hbase.wal.hsync true, durability set in put call - fsync
-    put.setDurability(Durability.FSYNC_WAL);
-    resetSyncFlag(wal);
-    assertNull(getSyncFlag(wal));
-    region.put(put);
-    assertTrue(getSyncFlag(wal));
-
-    // global hbase.wal.hsync true, durability set in put call - sync
-    put = new Put(bytes);
-    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
-    put.setDurability(Durability.SYNC_WAL);
-    resetSyncFlag(wal);
-    assertNull(getSyncFlag(wal));
-    region.put(put);
-    assertFalse(getSyncFlag(wal));
-
-    HBaseTestingUtility.closeRegionAndWAL(region);
+    try {
+      resetSyncFlag(wal);
+      assertNull(getSyncFlag(wal));
+      assertNull(getWriterSyncFlag(wal));
+      region.put(put);
+      assertTrue(getSyncFlag(wal));
+      assertTrue(getWriterSyncFlag(wal));
+
+      // global hbase.wal.hsync true, durability set in put call - fsync
+      put.setDurability(Durability.FSYNC_WAL);
+      resetSyncFlag(wal);
+      assertNull(getSyncFlag(wal));
+      assertNull(getWriterSyncFlag(wal));
+      region.put(put);
+      assertTrue(getSyncFlag(wal));
+      assertTrue(getWriterSyncFlag(wal));
+
+      // global hbase.wal.hsync true, durability set in put call - sync
+      put = new Put(bytes);
+      put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
+      put.setDurability(Durability.SYNC_WAL);
+      resetSyncFlag(wal);
+      assertNull(getSyncFlag(wal));
+      assertNull(getWriterSyncFlag(wal));
+      region.put(put);
+      assertFalse(getSyncFlag(wal));
+      assertFalse(getWriterSyncFlag(wal));
+    } finally {
+      HBaseTestingUtility.closeRegionAndWAL(region);
+    }
   }
 
   private String getName() {