You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/07/04 13:15:10 UTC

[hbase] branch branch-2 updated: HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length. (#1970)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new f834919  HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length. (#1970)
f834919 is described below

commit f8349199290a642c91908dd13037227f9eaebb35
Author: chenglei <ch...@apache.org>
AuthorDate: Sat Jul 4 21:00:35 2020 +0800

    HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length. (#1970)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hadoop/hbase/io/asyncfs/AsyncFSOutput.java     |  5 +++
 .../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java   |  5 +++
 .../hbase/io/asyncfs/WrapperAsyncFSOutput.java     | 13 +++++-
 .../hbase/regionserver/wal/AbstractFSWAL.java      |  2 +-
 .../regionserver/wal/AsyncProtobufLogWriter.java   |  5 +++
 .../hbase/regionserver/wal/ProtobufLogWriter.java  | 11 +++++
 .../org/apache/hadoop/hbase/wal/WALProvider.java   | 17 ++++++++
 .../regionserver/TestFailedAppendAndSync.java      | 49 ++++++++++++----------
 .../hadoop/hbase/regionserver/TestHRegion.java     |  5 +++
 .../hadoop/hbase/regionserver/TestWALLockup.java   | 10 +++++
 .../hbase/regionserver/wal/TestAsyncFSWAL.java     |  5 +++
 .../regionserver/wal/TestAsyncFSWALDurability.java |  5 +++
 .../regionserver/wal/TestFSHLogDurability.java     |  5 +++
 .../hbase/regionserver/wal/TestLogRolling.java     | 10 +++++
 14 files changed, 123 insertions(+), 24 deletions(-)

diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
index 3c520b8..059ca00 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
@@ -89,4 +89,9 @@ public interface AsyncFSOutput extends Closeable {
    */
   @Override
   void close() throws IOException;
+
+  /**
+   * @return the number of bytes successfully synced to the underlying filesystem.
+   */
+  long getSyncedLength();
 }
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index ed5bbf0..457b7c1 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -574,4 +574,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
   public boolean isBroken() {
     return state == State.BROKEN;
   }
+
+  @Override
+  public long getSyncedLength() {
+    return this.ackedBlockLength;
+  }
 }
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java
index bbb4e54..39f1f71 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java
@@ -45,6 +45,8 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
 
   private final ExecutorService executor;
 
+  private volatile long syncedLength = 0;
+
   public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) {
     this.out = out;
     this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
@@ -91,7 +93,11 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
           out.hflush();
         }
       }
-      future.complete(out.getPos());
+      long pos = out.getPos();
+      if(pos > this.syncedLength) {
+        this.syncedLength = pos;
+      }
+      future.complete(pos);
     } catch (IOException e) {
       future.completeExceptionally(e);
       return;
@@ -124,4 +130,9 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
   public boolean isBroken() {
     return false;
   }
+
+  @Override
+  public long getSyncedLength() {
+    return this.syncedLength;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index bf53352..a978dbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -1061,7 +1061,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       Path currentPath = getOldPath();
       if (path.equals(currentPath)) {
         W writer = this.writer;
-        return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
+        return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
       } else {
         return OptionalLong.empty();
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index e731611..8c944b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -231,4 +231,9 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
   protected OutputStream getOutputStreamForCellEncoder() {
     return asyncOutputWrapper;
   }
+
+  @Override
+  public long getSyncedLength() {
+    return this.output.getSyncedLength();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index ff08da8..4bbc13d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -19,11 +19,14 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.util.AtomicUtils;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
@@ -46,6 +49,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
 
   protected FSDataOutputStream output;
 
+  private final AtomicLong syncedLength = new AtomicLong(0);
+
   @Override
   public void append(Entry entry) throws IOException {
     entry.getKey().getBuilder(compressor).
@@ -85,6 +90,12 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
     } else {
       fsdos.hflush();
     }
+    AtomicUtils.updateMax(this.syncedLength, fsdos.getPos());
+  }
+
+  @Override
+  public long getSyncedLength() {
+    return this.syncedLength.get();
   }
 
   public FSDataOutputStream getStream() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index 6f0b983..c3bd149 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -25,6 +25,7 @@ import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -74,6 +75,22 @@ public interface WALProvider {
 
   interface WriterBase extends Closeable {
     long getLength();
+    /**
+     * NOTE: This method exists for the {@link WALFileLengthProvider} used by replication.
+     * With {@link AsyncFSWAL} we write to 3 DNs concurrently, and according to the visibility
+     * guarantee of HDFS the data becomes available as soon as it arrives at a DN, since every
+     * DN is considered to be the last one in the pipeline.
+     * This means replication may read uncommitted data and replicate it to the remote cluster,
+     * causing data inconsistency.
+     * The method {@link WriterBase#getLength} may return a length that includes data still in
+     * the HDFS client buffer and not yet successfully synced to HDFS, so this method returns
+     * the length successfully synced to HDFS, and the replication thread may read the WAL file
+     * that is being written only up to this length.
+     * see also HBASE-14004 and this document for more details:
+     * https://docs.google.com/document/d/11AyWtGhItQs6vsLRIx32PwTxmBY3libXwGXI25obVEY/edit#
+     * @return byteSize successfully synced to underlying filesystem.
+     */
+    long getSyncedLength();
   }
 
   // Writers are used internally. Users outside of the WAL should be relying on the
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index 25ea112..198e64b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -130,35 +130,40 @@ public class TestFailedAppendAndSync {
       @Override
       protected Writer createWriterInstance(Path path) throws IOException {
         final Writer w = super.createWriterInstance(path);
-          return new Writer() {
-            @Override
-            public void close() throws IOException {
-              w.close();
-            }
+        return new Writer() {
+          @Override
+          public void close() throws IOException {
+            w.close();
+          }
 
-            @Override
-            public void sync(boolean forceSync) throws IOException {
-              if (throwSyncException) {
-                throw new IOException("FAKE! Failed to replace a bad datanode...");
-              }
-              w.sync(forceSync);
+          @Override
+          public void sync(boolean forceSync) throws IOException {
+            if (throwSyncException) {
+              throw new IOException("FAKE! Failed to replace a bad datanode...");
             }
+            w.sync(forceSync);
+          }
 
-            @Override
-            public void append(Entry entry) throws IOException {
-              if (throwAppendException) {
-                throw new IOException("FAKE! Failed to replace a bad datanode...");
-              }
-              w.append(entry);
+          @Override
+          public void append(Entry entry) throws IOException {
+            if (throwAppendException) {
+              throw new IOException("FAKE! Failed to replace a bad datanode...");
             }
+            w.append(entry);
+          }
 
-            @Override
-            public long getLength() {
-              return w.getLength();
-              }
-            };
+          @Override
+          public long getLength() {
+            return w.getLength();
           }
+
+          @Override
+          public long getSyncedLength() {
+            return w.getSyncedLength();
+          }
+        };
       }
+    }
 
     // Make up mocked server and services.
     RegionServerServices services = mock(RegionServerServices.class);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 3448eb7..41ca8a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -1251,6 +1251,11 @@ public class TestHRegion {
           public long getLength() {
             return w.getLength();
           }
+
+          @Override
+          public long getSyncedLength() {
+            return w.getSyncedLength();
+          }
         };
       }
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index a50ef78..21f1774 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -190,6 +190,11 @@ public class TestWALLockup {
         public long getLength() {
           return w.getLength();
         }
+
+        @Override
+        public long getSyncedLength() {
+          return w.getSyncedLength();
+        }
       };
     }
   }
@@ -374,6 +379,11 @@ public class TestWALLockup {
           public long getLength() {
             return w.getLength();
           }
+
+          @Override
+          public long getSyncedLength() {
+            return w.getSyncedLength();
+          }
         };
       }
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
index 704cdfa..f31a908 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
@@ -156,6 +156,11 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
               }
 
               @Override
+              public long getSyncedLength() {
+                return writer.getSyncedLength();
+              }
+
+              @Override
               public CompletableFuture<Long> sync(boolean forceSync) {
                 CompletableFuture<Long> result = writer.sync(forceSync);
                 if (failedCount.incrementAndGet() < 1000) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
index 353f549..a482d93 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
@@ -110,6 +110,11 @@ class CustomAsyncFSWAL extends AsyncFSWAL {
       }
 
       @Override
+      public long getSyncedLength() {
+        return writer.getSyncedLength();
+      }
+
+      @Override
       public CompletableFuture<Long> sync(boolean forceSync) {
         writerSyncFlag = forceSync;
         return writer.sync(forceSync);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java
index 9c46058..3c25044 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java
@@ -85,6 +85,11 @@ class CustomFSHLog extends FSHLog {
       }
 
       @Override
+      public long getSyncedLength() {
+        return writer.getSyncedLength();
+      }
+
+      @Override
       public void sync(boolean forceSync) throws IOException {
         writerSyncFlag = forceSync;
         writer.sync(forceSync);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index 691250a..0712b59 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -174,6 +174,11 @@ public class TestLogRolling extends AbstractTestLogRolling {
         public long getLength() {
           return oldWriter1.getLength();
         }
+
+        @Override
+        public long getSyncedLength() {
+          return oldWriter1.getSyncedLength();
+        }
       };
       log.setWriter(newWriter1);
 
@@ -231,6 +236,11 @@ public class TestLogRolling extends AbstractTestLogRolling {
         public long getLength() {
           return oldWriter2.getLength();
         }
+
+        @Override
+        public long getSyncedLength() {
+          return oldWriter2.getSyncedLength();
+        }
       };
       log.setWriter(newWriter2);