You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by st...@apache.org on 2018/02/21 18:04:41 UTC

hbase git commit: HBASE-20037 Race when calling SequenceIdAccounting.resetHighest

Repository: hbase
Updated Branches:
  refs/heads/branch-2 2b4df5e36 -> 30c2dcd88


HBASE-20037 Race when calling SequenceIdAccounting.resetHighest


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/30c2dcd8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/30c2dcd8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/30c2dcd8

Branch: refs/heads/branch-2
Commit: 30c2dcd8832d6148fc1c4a430f90e7660b1af3c1
Parents: 2b4df5e
Author: zhangduo <zh...@apache.org>
Authored: Wed Feb 21 18:19:01 2018 +0800
Committer: Michael Stack <st...@apache.org>
Committed: Wed Feb 21 10:04:33 2018 -0800

----------------------------------------------------------------------
 .../hbase/regionserver/wal/AbstractFSWAL.java   | 37 +++++++++--------
 .../hbase/regionserver/wal/AsyncFSWAL.java      | 42 ++++++++++----------
 .../wal/AsyncProtobufLogWriter.java             |  4 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java   |  6 +--
 .../regionserver/wal/ProtobufLogWriter.java     | 11 +++--
 .../regionserver/wal/SequenceIdAccounting.java  | 40 ++++++++++---------
 6 files changed, 74 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/30c2dcd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 14fbe10..ce8dafa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -661,9 +661,26 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     }
   }
 
+  protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
+    int oldNumEntries = this.numEntries.getAndSet(0);
+    String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
+    if (oldPath != null) {
+      this.walFile2Props.put(oldPath,
+        new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
+      this.totalLogSize.addAndGet(oldFileLen);
+      LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
+        CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
+        newPathString);
+    } else {
+      LOG.info("New WAL {}", newPathString);
+    }
+  }
+
   /**
+   * <p>
    * Cleans up current writer closing it and then puts in place the passed in
    * <code>nextWriter</code>.
+   * </p>
    * <p>
    * <ul>
    * <li>In the case of creating a new WAL, oldPath will be null.</li>
@@ -672,26 +689,17 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
    * null.</li>
    * </ul>
+   * </p>
    * @param oldPath may be null
    * @param newPath may be null
    * @param nextWriter may be null
    * @return the passed in <code>newPath</code>
    * @throws IOException if there is a problem flushing or closing the underlying FS
    */
+  @VisibleForTesting
   Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
     try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) {
-      long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter);
-      int oldNumEntries = this.numEntries.getAndSet(0);
-      final String newPathString = (null == newPath ? null : CommonFSUtils.getPath(newPath));
-      if (oldPath != null) {
-        this.walFile2Props.put(oldPath,
-          new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
-        this.totalLogSize.addAndGet(oldFileLen);
-        LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
-          ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
-      } else {
-        LOG.info("New WAL " + newPathString);
-      }
+      doReplaceWriter(oldPath, newPath, nextWriter);
       return newPath;
     }
   }
@@ -1021,10 +1029,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   protected abstract W createWriterInstance(Path path)
       throws IOException, CommonFSUtils.StreamLacksCapabilityException;
 
-  /**
-   * @return old wal file size
-   */
-  protected abstract long doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
+  protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
       throws IOException;
 
   protected abstract void doShutdown() throws IOException;

http://git-wip-us.apache.org/repos/asf/hbase/blob/30c2dcd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 19d89df..d22d1ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -631,11 +631,29 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     }
   }
 
+  private long closeWriter() {
+    AsyncWriter oldWriter = this.writer;
+    if (oldWriter != null) {
+      long fileLength = oldWriter.getLength();
+      closeExecutor.execute(() -> {
+        try {
+          oldWriter.close();
+        } catch (IOException e) {
+          LOG.warn("close old writer failed", e);
+        }
+      });
+      return fileLength;
+    } else {
+      return 0L;
+    }
+  }
+
   @Override
-  protected long doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
+  protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
       throws IOException {
     waitForSafePoint();
-    final AsyncWriter oldWriter = this.writer;
+    long oldFileLen = closeWriter();
+    logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
     this.writer = nextWriter;
     if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) {
       this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
@@ -654,13 +672,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     } finally {
       consumeLock.unlock();
     }
-    return executeClose(closeExecutor, oldWriter);
   }
 
   @Override
   protected void doShutdown() throws IOException {
     waitForSafePoint();
-    executeClose(closeExecutor, writer);
+    closeWriter();
     closeExecutor.shutdown();
     try {
       if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
@@ -698,23 +715,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     }
   }
 
-  private static long executeClose(ExecutorService closeExecutor, AsyncWriter writer) {
-    long fileLength;
-    if (writer != null) {
-      fileLength = writer.getLength();
-      closeExecutor.execute(() -> {
-        try {
-          writer.close();
-        } catch (IOException e) {
-          LOG.warn("close old writer failed", e);
-        }
-      });
-    } else {
-      fileLength = 0L;
-    }
-    return fileLength;
-  }
-
   @Override
   protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
     writer.append(entry);

http://git-wip-us.apache.org/repos/asf/hbase/blob/30c2dcd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index 9688bbd..67258ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -34,13 +33,14 @@ import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/30c2dcd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index a9a3835..d927b7e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -298,7 +298,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
   }
 
   @Override
-  protected long doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException {
+  protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException {
     // Ask the ring buffer writer to pause at a safe point. Once we do this, the writer
     // thread will eventually pause. An error hereafter needs to release the writer thread
     // regardless -- hence the finally block below. Note, this method is called from the FSHLog
@@ -320,7 +320,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
       zigzagLatch = this.ringBufferEventHandler.attainSafePoint();
     }
     afterCreatingZigZagLatch();
-    long oldFileLen = 0L;
     try {
       // Wait on the safe point to be achieved. Send in a sync in case nothing has hit the
       // ring buffer between the above notification of writer that we want it to go to
@@ -343,6 +342,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
         LOG.warn(
           "Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage());
       }
+      long oldFileLen = 0L;
       // It is at the safe point. Swap out writer from under the blocked writer thread.
       // TODO: This is close is inline with critical section. Should happen in background?
       if (this.writer != null) {
@@ -363,6 +363,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
           }
         }
       }
+      logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
       this.writer = nextWriter;
       if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) {
         this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream();
@@ -397,7 +398,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
         }
       }
     }
-    return oldFileLen;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/30c2dcd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index aeb2c19..9d36429 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
@@ -25,15 +23,16 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
-import org.apache.hadoop.hbase.wal.FSHLogProvider;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
  * Writer for protobuf-based WAL.

http://git-wip-us.apache.org/repos/asf/hbase/blob/30c2dcd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
index f4cacb2..e14ce0c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
 
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -29,25 +27,28 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ImmutableByteArray;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ImmutableByteArray;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
+ * <p>
  * Accounting of sequence ids per region and then by column family. So we can our accounting
  * current, call startCacheFlush and then finishedCacheFlush or abortCacheFlush so this instance can
  * keep abreast of the state of sequence id persistence. Also call update per append.
+ * </p>
  * <p>
  * For the implementation, we assume that all the {@code encodedRegionName} passed in is gotten by
- * {@link HRegionInfo#getEncodedNameAsBytes()}. So it is safe to use it as a hash key. And for
- * family name, we use {@link ImmutableByteArray} as key. This is because hash based map is much
- * faster than RBTree or CSLM and here we are on the critical write path. See HBASE-16278 for more
- * details.
+ * {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()}. So it is safe to use
+ * it as a hash key. And for family name, we use {@link ImmutableByteArray} as key. This is because
+ * hash based map is much faster than RBTree or CSLM and here we are on the critical write path. See
+ * HBASE-16278 for more details.
+ * </p>
  */
 @InterfaceAudience.Private
 class SequenceIdAccounting {
@@ -93,14 +94,17 @@ class SequenceIdAccounting {
    */
   private final Map<byte[], Map<ImmutableByteArray, Long>> flushingSequenceIds = new HashMap<>();
 
- /**
-  * Map of region encoded names to the latest/highest region sequence id.  Updated on each
-  * call to append.
-  * <p>
-  * This map uses byte[] as the key, and uses reference equality. It works in our use case as we
-  * use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns
-  * the same array.
-  */
+  /**
+   * <p>
+   * Map of region encoded names to the latest/highest region sequence id. Updated on each call to
+   * append.
+   * </p>
+   * <p>
+   * This map uses byte[] as the key, and uses reference equality. It works in our use case as we
+   * use {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()} as keys. For a
+   * given region, it always returns the same array.
+   * </p>
+   */
   private Map<byte[], Long> highestSequenceIds = new HashMap<>();
 
   /**