You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ha...@apache.org on 2023/03/02 03:24:32 UTC

[hbase] branch branch-2.5 updated: HBASE-27458 Use ReadWriteLock for region scanner readpoint map (#5069)

This is an automated email from the ASF dual-hosted git repository.

haxiaolin pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 79a7bf770a6 HBASE-27458 Use ReadWriteLock for region scanner readpoint map (#5069)
79a7bf770a6 is described below

commit 79a7bf770a69eb68b12201aaf81e788d845c7586
Author: Ruanhui <32...@users.noreply.github.com>
AuthorDate: Thu Mar 2 11:24:20 2023 +0800

    HBASE-27458 Use ReadWriteLock for region scanner readpoint map (#5069)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 17 ++--
 .../regionserver/ReadPointCalculationLock.java     | 90 ++++++++++++++++++++++
 .../hbase/regionserver/RegionScannerImpl.java      |  5 +-
 3 files changed, 103 insertions(+), 9 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 614f60b0135..2d0ed75f664 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -397,6 +397,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
 
   final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
+  final ReadPointCalculationLock smallestReadPointCalcLock;
 
   /**
    * The sequence ID that was enLongAddered when this region was opened.
@@ -435,19 +436,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    *         this readPoint, are included in every read operation.
    */
   public long getSmallestReadPoint() {
-    long minimumReadPoint;
     // We need to ensure that while we are calculating the smallestReadPoint
     // no new RegionScanners can grab a readPoint that we are unaware of.
-    // We achieve this by synchronizing on the scannerReadPoints object.
-    synchronized (scannerReadPoints) {
-      minimumReadPoint = mvcc.getReadPoint();
+    smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.CALCULATION_LOCK);
+    try {
+      long minimumReadPoint = mvcc.getReadPoint();
       for (Long readPoint : this.scannerReadPoints.values()) {
-        if (readPoint < minimumReadPoint) {
-          minimumReadPoint = readPoint;
-        }
+        minimumReadPoint = Math.min(minimumReadPoint, readPoint);
       }
+      return minimumReadPoint;
+    } finally {
+      smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.CALCULATION_LOCK);
     }
-    return minimumReadPoint;
   }
 
   /*
@@ -812,6 +812,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     setHTableSpecificConf();
     this.scannerReadPoints = new ConcurrentHashMap<>();
+    this.smallestReadPointCalcLock = new ReadPointCalculationLock(conf);
 
     this.busyWaitDuration = conf.getLong("hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
     this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReadPointCalculationLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReadPointCalculationLock.java
new file mode 100644
index 00000000000..380b82de93a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReadPointCalculationLock.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Lock to manage concurrency between {@link RegionScanner} and
+ * {@link HRegion#getSmallestReadPoint()}. We need to ensure that while we are calculating the
+ * smallest read point, no new scanners can modify the scannerReadPoints Map. We used to achieve
+ * this by synchronizing on the scannerReadPoints object, but that can block read threads and
+ * reduce read performance. Because the scannerReadPoints object is a thread-safe
+ * {@link java.util.concurrent.ConcurrentHashMap}, {@link RegionScanner}s can record their
+ * read points concurrently; each of them only needs to acquire a shared lock. Calculating
+ * the smallest read point, by contrast, requires acquiring an exclusive lock. This improves
+ * read performance in most scenarios, but not when there are a lot of delta operations such as
+ * {@link org.apache.hadoop.hbase.client.Append} or
+ * {@link org.apache.hadoop.hbase.client.Increment}, so a configuration flag
+ * ({@code hbase.region.readpoints.read.write.lock.enable}) enables or disables this
+ * feature.
+@InterfaceAudience.Private
+public class ReadPointCalculationLock {
+
+  public enum LockType {
+    CALCULATION_LOCK,
+    RECORDING_LOCK
+  }
+
+  private final boolean useReadWriteLockForReadPoints;
+  private Lock lock;
+  private ReadWriteLock readWriteLock;
+
+  ReadPointCalculationLock(Configuration conf) {
+    this.useReadWriteLockForReadPoints =
+      conf.getBoolean("hbase.region.readpoints.read.write.lock.enable", false);
+    if (useReadWriteLockForReadPoints) {
+      readWriteLock = new ReentrantReadWriteLock();
+    } else {
+      lock = new ReentrantLock();
+    }
+  }
+
+  void lock(LockType lockType) {
+    if (useReadWriteLockForReadPoints) {
+      assert lock == null;
+      if (lockType == LockType.CALCULATION_LOCK) {
+        readWriteLock.writeLock().lock();
+      } else {
+        readWriteLock.readLock().lock();
+      }
+    } else {
+      assert readWriteLock == null;
+      lock.lock();
+    }
+  }
+
+  void unlock(LockType lockType) {
+    if (useReadWriteLockForReadPoints) {
+      assert lock == null;
+      if (lockType == LockType.CALCULATION_LOCK) {
+        readWriteLock.writeLock().unlock();
+      } else {
+        readWriteLock.readLock().unlock();
+      }
+    } else {
+      assert readWriteLock == null;
+      lock.unlock();
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
index d010d71f6cf..11d4c20f581 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
@@ -130,7 +130,8 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
     long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
     this.scannerReadPoints = region.scannerReadPoints;
     this.rsServices = region.getRegionServerServices();
-    synchronized (scannerReadPoints) {
+    region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
+    try {
       if (mvccReadPoint > 0) {
         this.readPt = mvccReadPoint;
       } else if (hasNonce(region, nonce)) {
@@ -139,6 +140,8 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
         this.readPt = region.getReadPoint(isolationLevel);
       }
       scannerReadPoints.put(this, this.readPt);
+    } finally {
+      region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
     }
     initializeScanners(scan, additionalScanners);
   }