You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/27 20:17:55 UTC
[03/29] hbase git commit: HBASE-18042 Client Compatibility breaks
between versions 1.2 and 1.3
HBASE-18042 Client Compatibility breaks between versions 1.2 and 1.3
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6846b039
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6846b039
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6846b039
Branch: refs/heads/HBASE-14614
Commit: 6846b03944d7e72301b825d4d118732c0ca65577
Parents: efc7edc
Author: zhangduo <zh...@apache.org>
Authored: Thu May 25 11:02:09 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat May 27 17:55:49 2017 +0800
----------------------------------------------------------------------
.../hbase/regionserver/RSRpcServices.java | 58 +++++---
.../hbase/client/TestAlwaysSetScannerId.java | 5 +-
.../hadoop/hbase/client/TestLeaseRenewal.java | 3 +-
.../client/TestScanWithoutFetchingData.java | 131 +++++++++++++++++++
4 files changed, 175 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6846b039/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index b3ca94d..1f3fede 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -27,10 +29,20 @@ import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
@@ -194,7 +206,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.ScanMet
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -265,7 +276,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private final AtomicLong scannerIdGen = new AtomicLong(0L);
private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
-
+ // Hold the name of a closed scanner for a while. This is used to stay compatible with old
+ // clients, which may send next or close requests to a region scanner that has already been
+ // exhausted. The entries will be removed automatically after scannerLeaseTimeoutPeriod.
+ private final Cache<String, String> closedScanners;
/**
* The lease timeout period for client scanners (milliseconds).
*/
@@ -1168,6 +1182,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
rpcServer.setErrorHandler(this);
rs.setName(name);
+
+ closedScanners = CacheBuilder.newBuilder()
+ .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
}
@Override
@@ -2790,18 +2807,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
String scannerName = Long.toString(request.getScannerId());
RegionScannerHolder rsh = scanners.get(scannerName);
if (rsh == null) {
- // just ignore the close request if scanner does not exists.
- if (request.hasCloseScanner() && request.getCloseScanner()) {
+ // just ignore the next or close request if the scanner does not exist.
+ if (closedScanners.getIfPresent(scannerName) != null) {
throw SCANNER_ALREADY_CLOSED;
} else {
LOG.warn("Client tried to access missing scanner " + scannerName);
throw new UnknownScannerException(
- "Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
- + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
- + "long wait between consecutive client checkins, c) Server may be closing down, "
- + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
- + "possible fix would be increasing the value of"
- + "'hbase.client.scanner.timeout.period' configuration.");
+ "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " +
+ "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " +
+ "long wait between consecutive client checkins, c) Server may be closing down, " +
+ "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " +
+ "possible fix would be increasing the value of" +
+ "'hbase.client.scanner.timeout.period' configuration.");
}
}
HRegionInfo hri = rsh.s.getRegionInfo();
@@ -3061,13 +3078,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
values.clear();
}
- if (limitReached || moreRows) {
- // We stopped prematurely
- builder.setMoreResultsInRegion(true);
- } else {
- // We didn't get a single batch
- builder.setMoreResultsInRegion(false);
- }
+ builder.setMoreResultsInRegion(moreRows);
// Check to see if the client requested that we track metrics server side. If the
// client requested metrics, retrieve the metrics from the scanner context.
if (trackMetrics) {
@@ -3238,7 +3249,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (!done) {
scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
results, builder, lastBlock, context);
+ } else {
+ builder.setMoreResultsInRegion(!results.isEmpty());
}
+ } else {
+ // This is an open scanner call with numberOfRows = 0, so set more results in region to true.
+ builder.setMoreResultsInRegion(true);
}
quota.addScanResult(results);
@@ -3252,6 +3268,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// is false. Can remove the isEmpty check after we get rid of the old implementation.
builder.setMoreResults(false);
}
+ // Later we may close the scanner depending on this flag so here we need to make sure that we
+ // have already set this flag.
+ assert builder.hasMoreResultsInRegion();
// we only set moreResults to false in the above code, so set it to true if we haven't set it
// yet.
if (!builder.hasMoreResults()) {
@@ -3276,7 +3295,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
closeScanner(region, scanner, scannerName, context);
}
return builder.build();
- } catch (Exception e) {
+ } catch (IOException e) {
try {
// scanner is closed here
scannerClosed = true;
@@ -3353,6 +3372,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(scanner);
}
+ closedScanners.put(scannerName, scannerName);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6846b039/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java
index d9f226f..9f5f621 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java
@@ -17,7 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -84,7 +85,7 @@ public class TestAlwaysSetScannerId {
long scannerId = resp.getScannerId();
int nextCallSeq = 0;
// test next
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < COUNT / 2; i++) {
req = RequestConverter.buildScanRequest(scannerId, 1, false, nextCallSeq++, false, false, -1);
resp = STUB.scan(null, req);
assertTrue(resp.hasScannerId());
http://git-wip-us.apache.org/repos/asf/hbase/blob/6846b039/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java
index 5dba207..87d8a6e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java
@@ -123,7 +123,8 @@ public class TestLeaseRenewal {
assertTrue(rs.renewLease());
// make sure we haven't advanced the scanner
assertTrue(Arrays.equals(rs.next().getRow(), ROW_BYTES));
- assertTrue(rs.renewLease());
+ // renewLease should return false now as we have read all the data already
+ assertFalse(rs.renewLease());
// make sure scanner is exhausted now
assertNull(rs.next());
// renewLease should return false now
http://git-wip-us.apache.org/repos/asf/hbase/blob/6846b039/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanWithoutFetchingData.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanWithoutFetchingData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanWithoutFetchingData.java
new file mode 100644
index 0000000..9edcf20
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanWithoutFetchingData.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Testcase to make sure that we do not close scanners if ScanRequest.numberOfRows is zero. See
+ * HBASE-18042 for more details.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestScanWithoutFetchingData {
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static final TableName TABLE_NAME = TableName.valueOf("test");
+
+ private static final byte[] CF = Bytes.toBytes("cf");
+
+ private static final byte[] CQ = Bytes.toBytes("cq");
+
+ private static final int COUNT = 10;
+
+ private static HRegionInfo HRI;
+
+ private static ClientProtos.ClientService.BlockingInterface STUB;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UTIL.startMiniCluster(1);
+ try (Table table = UTIL.createTable(TABLE_NAME, CF)) {
+ for (int i = 0; i < COUNT; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ HRI = UTIL.getAdmin().getTableRegions(TABLE_NAME).get(0);
+ STUB = ((ConnectionImplementation) UTIL.getConnection())
+ .getClient(UTIL.getHBaseCluster().getRegionServer(0).getServerName());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ private void assertResult(int row, Result result) {
+ assertEquals(row, Bytes.toInt(result.getRow()));
+ assertEquals(row, Bytes.toInt(result.getValue(CF, CQ)));
+ }
+
+ @Test
+ public void test() throws ServiceException, IOException {
+ Scan scan = new Scan();
+ ScanRequest req = RequestConverter.buildScanRequest(HRI.getRegionName(), scan, 0, false);
+ HBaseRpcController hrc = new HBaseRpcControllerImpl();
+ ScanResponse resp = STUB.scan(hrc, req);
+ assertTrue(resp.getMoreResults());
+ assertTrue(resp.getMoreResultsInRegion());
+ assertEquals(0, ResponseConverter.getResults(hrc.cellScanner(), resp).length);
+ long scannerId = resp.getScannerId();
+ int nextCallSeq = 0;
+ // test normal next
+ for (int i = 0; i < COUNT / 2; i++) {
+ req = RequestConverter.buildScanRequest(scannerId, 1, false, nextCallSeq++, false, false, -1);
+ hrc.reset();
+ resp = STUB.scan(hrc, req);
+ assertTrue(resp.getMoreResults());
+ assertTrue(resp.getMoreResultsInRegion());
+ Result[] results = ResponseConverter.getResults(hrc.cellScanner(), resp);
+ assertEquals(1, results.length);
+ assertResult(i, results[0]);
+ }
+ // test zero next
+ req = RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq++, false, false, -1);
+ hrc.reset();
+ resp = STUB.scan(hrc, req);
+ assertTrue(resp.getMoreResults());
+ assertTrue(resp.getMoreResultsInRegion());
+ assertEquals(0, ResponseConverter.getResults(hrc.cellScanner(), resp).length);
+ for (int i = COUNT / 2; i < COUNT; i++) {
+ req = RequestConverter.buildScanRequest(scannerId, 1, false, nextCallSeq++, false, false, -1);
+ hrc.reset();
+ resp = STUB.scan(hrc, req);
+ assertTrue(resp.getMoreResults());
+ assertEquals(i != COUNT - 1, resp.getMoreResultsInRegion());
+ Result[] results = ResponseConverter.getResults(hrc.cellScanner(), resp);
+ assertEquals(1, results.length);
+ assertResult(i, results[0]);
+ }
+ // close
+ req = RequestConverter.buildScanRequest(scannerId, 0, true, false);
+ resp = STUB.scan(null, req);
+ }
+}