You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/27 20:17:55 UTC

[03/29] hbase git commit: HBASE-18042 Client Compatibility breaks between versions 1.2 and 1.3

HBASE-18042 Client Compatibility breaks between versions 1.2 and 1.3


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6846b039
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6846b039
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6846b039

Branch: refs/heads/HBASE-14614
Commit: 6846b03944d7e72301b825d4d118732c0ca65577
Parents: efc7edc
Author: zhangduo <zh...@apache.org>
Authored: Thu May 25 11:02:09 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat May 27 17:55:49 2017 +0800

----------------------------------------------------------------------
 .../hbase/regionserver/RSRpcServices.java       |  58 +++++---
 .../hbase/client/TestAlwaysSetScannerId.java    |   5 +-
 .../hadoop/hbase/client/TestLeaseRenewal.java   |   3 +-
 .../client/TestScanWithoutFetchingData.java     | 131 +++++++++++++++++++
 4 files changed, 175 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6846b039/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index b3ca94d..1f3fede 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -27,10 +29,20 @@ import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
@@ -194,7 +206,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMet
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -265,7 +276,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
   private final AtomicLong scannerIdGen = new AtomicLong(0L);
   private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
-
+  // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
+  // which may send next or close request to a region scanner which has already been exhausted. The
+  // entries will be removed automatically after scannerLeaseTimeoutPeriod.
+  private final Cache<String, String> closedScanners;
   /**
    * The lease timeout period for client scanners (milliseconds).
    */
@@ -1168,6 +1182,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
     rpcServer.setErrorHandler(this);
     rs.setName(name);
+
+    closedScanners = CacheBuilder.newBuilder()
+        .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
   }
 
   @Override
@@ -2790,18 +2807,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     String scannerName = Long.toString(request.getScannerId());
     RegionScannerHolder rsh = scanners.get(scannerName);
     if (rsh == null) {
-      // just ignore the close request if scanner does not exists.
-      if (request.hasCloseScanner() && request.getCloseScanner()) {
+      // just ignore the next or close request if scanner does not exists.
+      if (closedScanners.getIfPresent(scannerName) != null) {
         throw SCANNER_ALREADY_CLOSED;
       } else {
         LOG.warn("Client tried to access missing scanner " + scannerName);
         throw new UnknownScannerException(
-            "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
-                + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
-                + "long wait between consecutive client checkins, c) Server may be closing down, "
-                + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
-                + "possible fix would be increasing the value of"
-                + "'hbase.client.scanner.timeout.period' configuration.");
+            "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " +
+                "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " +
+                "long wait between consecutive client checkins, c) Server may be closing down, " +
+                "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " +
+                "possible fix would be increasing the value of" +
+                "'hbase.client.scanner.timeout.period' configuration.");
       }
     }
     HRegionInfo hri = rsh.s.getRegionInfo();
@@ -3061,13 +3078,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           }
           values.clear();
         }
-        if (limitReached || moreRows) {
-          // We stopped prematurely
-          builder.setMoreResultsInRegion(true);
-        } else {
-          // We didn't get a single batch
-          builder.setMoreResultsInRegion(false);
-        }
+        builder.setMoreResultsInRegion(moreRows);
         // Check to see if the client requested that we track metrics server side. If the
         // client requested metrics, retrieve the metrics from the scanner context.
         if (trackMetrics) {
@@ -3238,7 +3249,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         if (!done) {
           scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
             results, builder, lastBlock, context);
+        } else {
+          builder.setMoreResultsInRegion(!results.isEmpty());
         }
+      } else {
+        // This is a open scanner call with numberOfRow = 0, so set more results in region to true.
+        builder.setMoreResultsInRegion(true);
       }
 
       quota.addScanResult(results);
@@ -3252,6 +3268,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         // is false. Can remove the isEmpty check after we get rid of the old implementation.
         builder.setMoreResults(false);
       }
+      // Later we may close the scanner depending on this flag so here we need to make sure that we
+      // have already set this flag.
+      assert builder.hasMoreResultsInRegion();
       // we only set moreResults to false in the above code, so set it to true if we haven't set it
       // yet.
       if (!builder.hasMoreResults()) {
@@ -3276,7 +3295,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         closeScanner(region, scanner, scannerName, context);
       }
       return builder.build();
-    } catch (Exception e) {
+    } catch (IOException e) {
       try {
         // scanner is closed here
         scannerClosed = true;
@@ -3353,6 +3372,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       if (region.getCoprocessorHost() != null) {
         region.getCoprocessorHost().postScannerClose(scanner);
       }
+      closedScanners.put(scannerName, scannerName);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/6846b039/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java
index d9f226f..9f5f621 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java
@@ -17,7 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 
@@ -84,7 +85,7 @@ public class TestAlwaysSetScannerId {
     long scannerId = resp.getScannerId();
     int nextCallSeq = 0;
     // test next
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < COUNT / 2; i++) {
       req = RequestConverter.buildScanRequest(scannerId, 1, false, nextCallSeq++, false, false, -1);
       resp = STUB.scan(null, req);
       assertTrue(resp.hasScannerId());

http://git-wip-us.apache.org/repos/asf/hbase/blob/6846b039/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java
index 5dba207..87d8a6e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java
@@ -123,7 +123,8 @@ public class TestLeaseRenewal {
     assertTrue(rs.renewLease());
     // make sure we haven't advanced the scanner
     assertTrue(Arrays.equals(rs.next().getRow(), ROW_BYTES));
-    assertTrue(rs.renewLease());
+    // renewLease should return false now as we have read all the data already
+    assertFalse(rs.renewLease());
     // make sure scanner is exhausted now
     assertNull(rs.next());
     // renewLease should return false now

http://git-wip-us.apache.org/repos/asf/hbase/blob/6846b039/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanWithoutFetchingData.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanWithoutFetchingData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanWithoutFetchingData.java
new file mode 100644
index 0000000..9edcf20
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanWithoutFetchingData.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Testcase to make sure that we do not close scanners if ScanRequest.numberOfRows is zero. See
+ * HBASE-18042 for more details.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestScanWithoutFetchingData {
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("test");
+
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  private static final byte[] CQ = Bytes.toBytes("cq");
+
+  private static final int COUNT = 10;
+
+  private static HRegionInfo HRI;
+
+  private static ClientProtos.ClientService.BlockingInterface STUB;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(1);
+    try (Table table = UTIL.createTable(TABLE_NAME, CF)) {
+      for (int i = 0; i < COUNT; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    HRI = UTIL.getAdmin().getTableRegions(TABLE_NAME).get(0);
+    STUB = ((ConnectionImplementation) UTIL.getConnection())
+        .getClient(UTIL.getHBaseCluster().getRegionServer(0).getServerName());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  private void assertResult(int row, Result result) {
+    assertEquals(row, Bytes.toInt(result.getRow()));
+    assertEquals(row, Bytes.toInt(result.getValue(CF, CQ)));
+  }
+
+  @Test
+  public void test() throws ServiceException, IOException {
+    Scan scan = new Scan();
+    ScanRequest req = RequestConverter.buildScanRequest(HRI.getRegionName(), scan, 0, false);
+    HBaseRpcController hrc = new HBaseRpcControllerImpl();
+    ScanResponse resp = STUB.scan(hrc, req);
+    assertTrue(resp.getMoreResults());
+    assertTrue(resp.getMoreResultsInRegion());
+    assertEquals(0, ResponseConverter.getResults(hrc.cellScanner(), resp).length);
+    long scannerId = resp.getScannerId();
+    int nextCallSeq = 0;
+    // test normal next
+    for (int i = 0; i < COUNT / 2; i++) {
+      req = RequestConverter.buildScanRequest(scannerId, 1, false, nextCallSeq++, false, false, -1);
+      hrc.reset();
+      resp = STUB.scan(hrc, req);
+      assertTrue(resp.getMoreResults());
+      assertTrue(resp.getMoreResultsInRegion());
+      Result[] results = ResponseConverter.getResults(hrc.cellScanner(), resp);
+      assertEquals(1, results.length);
+      assertResult(i, results[0]);
+    }
+    // test zero next
+    req = RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq++, false, false, -1);
+    hrc.reset();
+    resp = STUB.scan(hrc, req);
+    assertTrue(resp.getMoreResults());
+    assertTrue(resp.getMoreResultsInRegion());
+    assertEquals(0, ResponseConverter.getResults(hrc.cellScanner(), resp).length);
+    for (int i = COUNT / 2; i < COUNT; i++) {
+      req = RequestConverter.buildScanRequest(scannerId, 1, false, nextCallSeq++, false, false, -1);
+      hrc.reset();
+      resp = STUB.scan(hrc, req);
+      assertTrue(resp.getMoreResults());
+      assertEquals(i != COUNT - 1, resp.getMoreResultsInRegion());
+      Result[] results = ResponseConverter.getResults(hrc.cellScanner(), resp);
+      assertEquals(1, results.length);
+      assertResult(i, results[0]);
+    }
+    // close
+    req = RequestConverter.buildScanRequest(scannerId, 0, true, false);
+    resp = STUB.scan(null, req);
+  }
+}