You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2021/02/08 22:41:05 UTC

[hbase] branch master updated: HBASE-25542 Add client detail to scan name so when lease expires, we … (#2930)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new ae063f9  HBASE-25542 Add client detail to scan name so when lease expires, we … (#2930)
ae063f9 is described below

commit ae063f953eaf68492fa858c45b5b02670cb7ae31
Author: Michael Stack <sa...@users.noreply.github.com>
AuthorDate: Mon Feb 8 14:33:00 2021 -0800

    HBASE-25542 Add client detail to scan name so when lease expires, we … (#2930)
    
    * HBASE-25542 Add client detail to scan name so when lease expires, we have clue on who was scanning
    
    When we create a scanner lease, record client ip and port (removed
    unnecessary store of scannerName).
    
    Signed-off-by: Clara Xiong <cl...@gmail.com>
---
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 173 +++++++++++++--------
 .../hbase/regionserver/TestRSRpcServices.java      |  65 ++++++++
 2 files changed, 171 insertions(+), 67 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 587919d..54395c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -23,6 +23,7 @@ import java.io.UncheckedIOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.BindException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -94,6 +95,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
 import org.apache.hadoop.hbase.ipc.QosPriority;
+import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.hadoop.hbase.ipc.RpcCallContext;
 import org.apache.hadoop.hbase.ipc.RpcCallback;
 import org.apache.hadoop.hbase.ipc.RpcScheduler;
@@ -399,7 +401,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * An Rpc callback for doing shipped() call on a RegionScanner.
    */
   private class RegionScannerShippedCallBack implements RpcCallback {
-
     private final String scannerName;
     private final Shipper shipper;
     private final Lease lease;
@@ -449,10 +450,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   /**
    * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
    */
-  private static final class RegionScannerHolder {
-
+  static final class RegionScannerHolder {
     private final AtomicLong nextCallSeq = new AtomicLong(0);
-    private final String scannerName;
     private final RegionScanner s;
     private final HRegion r;
     private final RpcCallback closeCallBack;
@@ -460,32 +459,39 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     private byte[] rowOfLastPartialResult;
     private boolean needCursor;
     private boolean fullRegionScan;
+    private final String clientIPAndPort;
 
-    public RegionScannerHolder(String scannerName, RegionScanner s, HRegion r,
+    RegionScannerHolder(RegionScanner s, HRegion r,
         RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor,
-        boolean fullRegionScan) {
-      this.scannerName = scannerName;
+        boolean fullRegionScan, String clientIPAndPort) {
       this.s = s;
       this.r = r;
       this.closeCallBack = closeCallBack;
       this.shippedCallback = shippedCallback;
       this.needCursor = needCursor;
       this.fullRegionScan = fullRegionScan;
+      this.clientIPAndPort = clientIPAndPort;
     }
 
-    public long getNextCallSeq() {
+    long getNextCallSeq() {
       return nextCallSeq.get();
     }
 
-    public boolean incNextCallSeq(long currentSeq) {
+    boolean incNextCallSeq(long currentSeq) {
       // Use CAS to prevent multiple scan request running on the same scanner.
       return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
     }
+
+    // Should be called only when we need to print lease-expired messages; otherwise,
+    // cache the String once made.
+    @Override
+    public String toString() {
+      return this.clientIPAndPort + ", " + this.r.getRegionInfo().getRegionNameAsString();
+    }
   }
 
   /**
-   * Instantiated as a scanner lease. If the lease times out, the scanner is
-   * closed
+   * Instantiated as a scanner lease. If the lease times out, the scanner is closed
    */
   private class ScannerListener implements LeaseListener {
     private final String scannerName;
@@ -497,31 +503,32 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     @Override
     public void leaseExpired() {
       RegionScannerHolder rsh = scanners.remove(this.scannerName);
-      if (rsh != null) {
-        RegionScanner s = rsh.s;
-        LOG.info("Scanner " + this.scannerName + " lease expired on region "
-          + s.getRegionInfo().getRegionNameAsString());
-        HRegion region = null;
+      if (rsh == null) {
+        LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
+        return;
+      }
+      LOG.info("Scanner lease {} expired {}, user={}", this.scannerName, rsh,
+        RpcServer.getRequestUserName().orElse(null));
+      RegionScanner s = rsh.s;
+      HRegion region = null;
+      try {
+        region = regionServer.getRegion(s.getRegionInfo().getRegionName());
+        if (region != null && region.getCoprocessorHost() != null) {
+          region.getCoprocessorHost().preScannerClose(s);
+        }
+      } catch (IOException e) {
+        LOG.error("Closing scanner {} {}, user={}", this.scannerName, rsh, e,
+          RpcServer.getRequestUserName().orElse(null));
+      } finally {
         try {
-          region = regionServer.getRegion(s.getRegionInfo().getRegionName());
+          s.close();
           if (region != null && region.getCoprocessorHost() != null) {
-            region.getCoprocessorHost().preScannerClose(s);
+            region.getCoprocessorHost().postScannerClose(s);
           }
         } catch (IOException e) {
-          LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
-        } finally {
-          try {
-            s.close();
-            if (region != null && region.getCoprocessorHost() != null) {
-              region.getCoprocessorHost().postScannerClose(s);
-            }
-          } catch (IOException e) {
-            LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
-          }
+          LOG.error("Closing scanner {} {}, user={}", this.scannerName, rsh, e,
+            RpcServer.getRequestUserName().orElse(null));
         }
-      } else {
-        LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
-          " scanner found, hence no chance to close that related scanner!");
       }
     }
   }
@@ -1305,14 +1312,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return scanners.size();
   }
 
-  public
+  /**
+   * @return The outstanding RegionScanner for <code>scannerId</code> or null if none found.
+   */
   RegionScanner getScanner(long scannerId) {
-    String scannerIdString = Long.toString(scannerId);
-    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
-    if (scannerHolder != null) {
-      return scannerHolder.s;
-    }
-    return null;
+    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
+    return rsh == null? null: rsh.s;
+  }
+
+  /**
+   * @return The associated RegionScannerHolder for <code>scannerId</code> or null.
+   */
+  private RegionScannerHolder getRegionScannerHolder(long scannerId) {
+    return scanners.get(toScannerName(scannerId));
   }
 
   public String getScanDetailsWithId(long scannerId) {
@@ -1346,12 +1358,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * Currently the vtime is the number of "next" calls.
    */
   long getScannerVirtualTime(long scannerId) {
-    String scannerIdString = Long.toString(scannerId);
-    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
-    if (scannerHolder != null) {
-      return scannerHolder.getNextCallSeq();
-    }
-    return 0L;
+    RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
+    return rsh == null? 0L: rsh.getNextCallSeq();
   }
 
   /**
@@ -1395,24 +1403,36 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return lastBlock;
   }
 
+  /**
+   * @return Remote client's ip and port, else the empty string if it can't be determined.
+   */
+  static String getRemoteClientIpAndPort() {
+    RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
+    if (rpcCall == null) {
+      return HConstants.EMPTY_STRING;
+    }
+    InetAddress address = rpcCall.getRemoteAddress();
+    if (address == null) {
+      return HConstants.EMPTY_STRING;
+    }
+    // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
+    // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
+    // scanning than a hostname anyways.
+    return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
+  }
+
   private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
       HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
     Lease lease = regionServer.getLeaseManager().createLease(
         scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName));
     RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
-    RpcCallback closeCallback;
-    if (s instanceof RpcCallback) {
-      closeCallback = (RpcCallback) s;
-    } else {
-      closeCallback = new RegionScannerCloseCallBack(s);
-    }
-
-    RegionScannerHolder rsh =
-        new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback,
-          needCursor, fullRegionScan);
+    RpcCallback closeCallback = s instanceof RpcCallback?
+      (RpcCallback)s: new RegionScannerCloseCallBack(s);
+    RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
+      needCursor, fullRegionScan, getRemoteClientIpAndPort());
     RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
     assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! " +
-      scannerName;
+      scannerName + ", " + existing;
     return rsh;
   }
 
@@ -3162,8 +3182,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   };
 
   private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
-    String scannerName = Long.toString(request.getScannerId());
-    RegionScannerHolder rsh = scanners.get(scannerName);
+    String scannerName = toScannerName(request.getScannerId());
+    RegionScannerHolder rsh = this.scanners.get(scannerName);
     if (rsh == null) {
       // just ignore the next or close request if scanner does not exists.
       if (closedScanners.getIfPresent(scannerName) != null) {
@@ -3203,8 +3223,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return rsh;
   }
 
-  private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder)
-      throws IOException {
+  /**
+   * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
+   *    value.
+   */
+  private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request,
+      ScanResponse.Builder builder) throws IOException {
     HRegion region = getRegion(request.getRegion());
     rejectIfInStandByState(region);
     ClientProtos.Scan protoScan = request.getScan();
@@ -3236,13 +3260,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     builder.setScannerId(scannerId);
     builder.setMvccReadPoint(scanner.getMvccReadPoint());
     builder.setTtl(scannerLeaseTimeoutPeriod);
-    String scannerName = String.valueOf(scannerId);
+    String scannerName = toScannerName(scannerId);
 
     boolean fullRegionScan = !region.getRegionInfo().getTable().isSystemTable() &&
       isFullRegionScan(scan, region);
 
-    return addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(),
-      fullRegionScan);
+    return new Pair<String, RegionScannerHolder>(scannerName,
+      addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(),
+      fullRegionScan));
+  }
+
+  /**
+   * The returned String is used as the key when looking up outstanding scanners in this server's
+   * this.scanners, the Map of outstanding scanners and their current state.
+   * @param scannerId A scanner long id.
+   * @return The long id as a String.
+   */
+  private static String toScannerName(long scannerId) {
+    return Long.toString(scannerId);
   }
 
   private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
@@ -3516,7 +3551,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       checkOpen();
     } catch (IOException e) {
       if (request.hasScannerId()) {
-        String scannerName = Long.toString(request.getScannerId());
+        String scannerName = toScannerName(request.getScannerId());
         if (LOG.isDebugEnabled()) {
           LOG.debug(
             "Server shutting down and client tried to access missing scanner " + scannerName);
@@ -3539,14 +3574,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     rpcScanRequestCount.increment();
     RegionScannerHolder rsh;
     ScanResponse.Builder builder = ScanResponse.newBuilder();
+    String scannerName;
     try {
       if (request.hasScannerId()) {
         // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
         // for more details.
-        builder.setScannerId(request.getScannerId());
+        long scannerId = request.getScannerId();
+        builder.setScannerId(scannerId);
+        scannerName = toScannerName(scannerId);
         rsh = getRegionScanner(request);
       } else {
-        rsh = newRegionScanner(request, builder);
+        Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
+        scannerName = scannerNameAndRSH.getFirst();
+        rsh = scannerNameAndRSH.getSecond();
       }
     } catch (IOException e) {
       if (e == SCANNER_ALREADY_CLOSED) {
@@ -3560,11 +3600,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       rpcFullScanRequestCount.increment();
     }
     HRegion region = rsh.r;
-    String scannerName = rsh.scannerName;
     LeaseManager.Lease lease;
     try {
       // Remove lease while its being processed in server; protects against case
-      // where processing of request takes > lease expiration time.
+      // where processing of request takes > lease expiration time. or null if none found.
       lease = regionServer.getLeaseManager().removeLease(scannerName);
     } catch (LeaseException e) {
       throw new ServiceException(e);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSRpcServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSRpcServices.java
new file mode 100644
index 0000000..f242ddb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSRpcServices.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test parts of {@link RSRpcServices}
+ */
+@Category({ RegionServerTests.class, MediumTests.class})
+public class TestRSRpcServices {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRSRpcServices.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestRSRpcServices.class);
+
+  /**
+   * Simple test that toString on RegionScannerHolder works.
+   * Just creates one and calls #toString on it.
+   */
+  @Test
+  public void testRegionScannerHolderToString() throws UnknownHostException {
+    RpcCall call = Mockito.mock(RpcCall.class);
+    int port = 1234;
+    Mockito.when(call.getRemotePort()).thenReturn(port);
+    InetAddress address = InetAddress.getLocalHost();
+    Mockito.when(call.getRemoteAddress()).thenReturn(address);
+    RpcServer.setCurrentCall(call);
+    String clientIpAndPort = RSRpcServices.getRemoteClientIpAndPort();
+    HRegion region = Mockito.mock(HRegion.class);
+    Mockito.when(region.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
+    RSRpcServices.RegionScannerHolder rsh = new RSRpcServices.RegionScannerHolder(null, region,
+      null, null, false, false, clientIpAndPort);
+    LOG.info("rsh={}", rsh);
+  }
+}