You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/07/11 13:20:56 UTC

[hbase] branch branch-2.2 updated: HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length.(#2034)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new 4b11ad4  HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length.(#2034)
4b11ad4 is described below

commit 4b11ad44546398dc71039de4bc07150e170efc9d
Author: chenglei <ch...@apache.org>
AuthorDate: Sat Jul 11 16:35:10 2020 +0800

    HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length.(#2034)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: Nick Dimiduk <nd...@apache.org>
---
 .../hadoop/hbase/io/asyncfs/AsyncFSOutput.java     |  5 +++
 .../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java   |  5 +++
 .../hbase/io/asyncfs/WrapperAsyncFSOutput.java     | 13 +++++-
 .../hbase/regionserver/wal/AbstractFSWAL.java      |  2 +-
 .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java  | 14 +++----
 .../regionserver/wal/AsyncProtobufLogWriter.java   |  5 +++
 .../hbase/regionserver/wal/ProtobufLogWriter.java  | 11 +++++
 .../org/apache/hadoop/hbase/wal/WALProvider.java   | 17 ++++++++
 .../regionserver/TestFailedAppendAndSync.java      | 49 ++++++++++++----------
 .../hadoop/hbase/regionserver/TestHRegion.java     |  5 +++
 .../hadoop/hbase/regionserver/TestWALLockup.java   | 10 +++++
 .../hbase/regionserver/wal/TestAsyncFSWAL.java     |  5 +++
 .../regionserver/wal/TestAsyncFSWALDurability.java |  5 +++
 .../regionserver/wal/TestFSHLogDurability.java     |  5 +++
 .../regionserver/TestWALEntryStream.java           |  4 +-
 15 files changed, 121 insertions(+), 34 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
index 3c520b8..059ca00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
@@ -89,4 +89,9 @@ public interface AsyncFSOutput extends Closeable {
    */
   @Override
   void close() throws IOException;
+
+  /**
+   * @return byteSize successfully synced to underlying filesystem.
+   */
+  long getSyncedLength();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index ed9da5f..5f70cfb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -577,4 +577,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
   public boolean isBroken() {
     return state == State.BROKEN;
   }
+
+  @Override
+  public long getSyncedLength() {
+    return this.ackedBlockLength;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java
index bbb4e54..39f1f71 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java
@@ -45,6 +45,8 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
 
   private final ExecutorService executor;
 
+  private volatile long syncedLength = 0;
+
   public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) {
     this.out = out;
     this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
@@ -91,7 +93,11 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
           out.hflush();
         }
       }
-      future.complete(out.getPos());
+      long pos = out.getPos();
+      if(pos > this.syncedLength) {
+        this.syncedLength = pos;
+      }
+      future.complete(pos);
     } catch (IOException e) {
       future.completeExceptionally(e);
       return;
@@ -124,4 +130,9 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
   public boolean isBroken() {
     return false;
   }
+
+  @Override
+  public long getSyncedLength() {
+    return this.syncedLength;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 2a7ebff..5621a19 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -1014,7 +1014,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       Path currentPath = getOldPath();
       if (path.equals(currentPath)) {
         W writer = this.writer;
-        return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
+        return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
       } else {
         return OptionalLong.empty();
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 38051ff..ef6870b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -663,13 +663,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     }
   }
 
-  private long closeWriter() {
-    AsyncWriter oldWriter = this.writer;
-    if (oldWriter != null) {
-      long fileLength = oldWriter.getLength();
+  protected final long closeWriter(AsyncWriter writer) {
+    if (writer != null) {
+      long fileLength = writer.getLength();
       closeExecutor.execute(() -> {
         try {
-          oldWriter.close();
+          writer.close();
         } catch (IOException e) {
           LOG.warn("close old writer failed", e);
         }
@@ -685,7 +684,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       throws IOException {
     Preconditions.checkNotNull(nextWriter);
     waitForSafePoint();
-    long oldFileLen = closeWriter();
+    long oldFileLen = closeWriter(this.writer);
     logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
     this.writer = nextWriter;
     if (nextWriter instanceof AsyncProtobufLogWriter) {
@@ -711,7 +710,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   @Override
   protected void doShutdown() throws IOException {
     waitForSafePoint();
-    closeWriter();
+    closeWriter(this.writer);
+    this.writer = null;
     closeExecutor.shutdown();
     try {
       if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index e731611..8c944b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -231,4 +231,9 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
   protected OutputStream getOutputStreamForCellEncoder() {
     return asyncOutputWrapper;
   }
+
+  @Override
+  public long getSyncedLength() {
+    return this.output.getSyncedLength();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 5c8e0d2..4d8548c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.util.AtomicUtils;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
@@ -45,6 +48,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
 
   protected FSDataOutputStream output;
 
+  private final AtomicLong syncedLength = new AtomicLong(0);
+
   @Override
   public void append(Entry entry) throws IOException {
     entry.getKey().getBuilder(compressor).
@@ -84,6 +89,12 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
     } else {
       fsdos.hflush();
     }
+    AtomicUtils.updateMax(this.syncedLength, fsdos.getPos());
+  }
+
+  @Override
+  public long getSyncedLength() {
+    return this.syncedLength.get();
   }
 
   public FSDataOutputStream getStream() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index 6f0b983..c3bd149 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -25,6 +25,7 @@ import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -74,6 +75,22 @@ public interface WALProvider {
 
   interface WriterBase extends Closeable {
     long getLength();
+    /**
+     * NOTE: We add this method for {@link WALFileLengthProvider} used for replication,
+     * considering the case if we use {@link AsyncFSWAL}, we write to 3 DNs concurrently,
+     * according to the visibility guarantee of HDFS, the data will be available immediately
+     * when arriving at DN since all the DNs will be considered as the last one in pipeline.
+     * This means replication may read uncommitted data and replicate it to the remote cluster
+     * and cause data inconsistency.
+     * The method {@link WriterBase#getLength} may return a length that is only in the HDFS client
+     * buffer and not successfully synced to HDFS, so we use this method to return the length
+     * successfully synced to HDFS; the replication thread can read the WAL file being written only
+     * up to this length.
+     * see also HBASE-14004 and this document for more details:
+     * https://docs.google.com/document/d/11AyWtGhItQs6vsLRIx32PwTxmBY3libXwGXI25obVEY/edit#
+     * @return byteSize successfully synced to underlying filesystem.
+     */
+    long getSyncedLength();
   }
 
   // Writers are used internally. Users outside of the WAL should be relying on the
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index 4e6a1fe..2013778 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -130,35 +130,40 @@ public class TestFailedAppendAndSync {
       @Override
       protected Writer createWriterInstance(Path path) throws IOException {
         final Writer w = super.createWriterInstance(path);
-          return new Writer() {
-            @Override
-            public void close() throws IOException {
-              w.close();
-            }
+        return new Writer() {
+          @Override
+          public void close() throws IOException {
+            w.close();
+          }
 
-            @Override
-            public void sync(boolean forceSync) throws IOException {
-              if (throwSyncException) {
-                throw new IOException("FAKE! Failed to replace a bad datanode...");
-              }
-              w.sync(forceSync);
+          @Override
+          public void sync(boolean forceSync) throws IOException {
+            if (throwSyncException) {
+              throw new IOException("FAKE! Failed to replace a bad datanode...");
             }
+            w.sync(forceSync);
+          }
 
-            @Override
-            public void append(Entry entry) throws IOException {
-              if (throwAppendException) {
-                throw new IOException("FAKE! Failed to replace a bad datanode...");
-              }
-              w.append(entry);
+          @Override
+          public void append(Entry entry) throws IOException {
+            if (throwAppendException) {
+              throw new IOException("FAKE! Failed to replace a bad datanode...");
             }
+            w.append(entry);
+          }
 
-            @Override
-            public long getLength() {
-              return w.getLength();
-              }
-            };
+          @Override
+          public long getLength() {
+            return w.getLength();
           }
+
+          @Override
+          public long getSyncedLength() {
+            return w.getSyncedLength();
+          }
+        };
       }
+    }
 
     // Make up mocked server and services.
     RegionServerServices services = mock(RegionServerServices.class);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 66edbd7..a7fd62f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -1250,6 +1250,11 @@ public class TestHRegion {
           public long getLength() {
             return w.getLength();
           }
+
+          @Override
+          public long getSyncedLength() {
+            return w.getSyncedLength();
+          }
         };
       }
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index a50ef78..21f1774 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -190,6 +190,11 @@ public class TestWALLockup {
         public long getLength() {
           return w.getLength();
         }
+
+        @Override
+        public long getSyncedLength() {
+          return w.getSyncedLength();
+        }
       };
     }
   }
@@ -374,6 +379,11 @@ public class TestWALLockup {
           public long getLength() {
             return w.getLength();
           }
+
+          @Override
+          public long getSyncedLength() {
+            return w.getSyncedLength();
+          }
         };
       }
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
index 704cdfa..f31a908 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
@@ -156,6 +156,11 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
               }
 
               @Override
+              public long getSyncedLength() {
+                return writer.getSyncedLength();
+              }
+
+              @Override
               public CompletableFuture<Long> sync(boolean forceSync) {
                 CompletableFuture<Long> result = writer.sync(forceSync);
                 if (failedCount.incrementAndGet() < 1000) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
index f9dee07..2b1da95 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
@@ -110,6 +110,11 @@ class CustomAsyncFSWAL extends AsyncFSWAL {
       }
 
       @Override
+      public long getSyncedLength() {
+        return writer.getSyncedLength();
+      }
+
+      @Override
       public CompletableFuture<Long> sync(boolean forceSync) {
         writerSyncFlag = forceSync;
         return writer.sync(forceSync);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java
index 9c46058..3c25044 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java
@@ -85,6 +85,11 @@ class CustomFSHLog extends FSHLog {
       }
 
       @Override
+      public long getSyncedLength() {
+        return writer.getSyncedLength();
+      }
+
+      @Override
       public void sync(boolean forceSync) throws IOException {
         writerSyncFlag = forceSync;
         writer.sync(forceSync);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index c412eca..b4af38b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -413,9 +413,7 @@ public class TestWALEntryStream {
     batch = reader.take();
     assertEquals(walPath, batch.getLastWalPath());
     assertEquals(5, batch.getNbEntries());
-    // Actually this should be true but we haven't handled this yet since for a normal queue the
-    // last one is always open... Not a big deal for now.
-    assertFalse(batch.isEndOfFile());
+    assertTrue(batch.isEndOfFile());
 
     assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
   }