You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/03/17 19:03:32 UTC
hbase git commit: HBASE-15325 ResultScanner allowing partial result
will miss the rest of the row if the region is moved between two rpc requests
(Phil Yang)
Repository: hbase
Updated Branches:
refs/heads/master 2b8a7f8d7 -> fbf58f330
HBASE-15325 ResultScanner allowing partial result will miss the rest of the row if the region is moved between two rpc requests (Phil Yang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fbf58f33
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fbf58f33
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fbf58f33
Branch: refs/heads/master
Commit: fbf58f330b7affa633513fd03076954f0d90c2fc
Parents: 2b8a7f8
Author: tedyu <yu...@gmail.com>
Authored: Thu Mar 17 11:03:29 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Thu Mar 17 11:03:29 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/client/ClientScanner.java | 86 +++++++-
.../org/apache/hadoop/hbase/CellComparator.java | 2 +-
.../hbase/TestPartialResultsFromClientSide.java | 217 ++++++++++++++++++-
3 files changed, 297 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbf58f33/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 22a56e3..3b6b83a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -77,6 +78,10 @@ public abstract class ClientScanner extends AbstractClientScanner {
* via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
*/
protected byte[] partialResultsRow = null;
+ /**
+ * The last cell added to the client-side cache from a row that has not been returned in full,
+ * i.e. from a partial Result or, when {@code scan.getBatch() > 0}, from a batched Result.
+ * If the scanner has to be reset in the middle of such a row, the scan restarts at that row and
+ * cells that do not come after this one are filtered out, so they are not returned twice.
+ * Reset to {@code null} once a complete (non-partial, non-batched) Result is cached.
+ */
+ protected Cell lastCellLoadedToCache = null;
protected final int caching;
protected long lastNext;
// Keep lastResult returned successfully in case we have to reset scanner.
@@ -389,7 +394,9 @@ public abstract class ClientScanner extends AbstractClientScanner {
// We don't expect that the server will have more results for us if
// it doesn't tell us otherwise. We rely on the size or count of results
boolean serverHasMoreResults = false;
+ boolean allResultsSkipped = false;
do {
+ allResultsSkipped = false;
try {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
@@ -448,10 +455,15 @@ public abstract class ClientScanner extends AbstractClientScanner {
// Reset the startRow to the row we've seen last so that the new scanner starts at
// the correct row. Otherwise we may see previously returned rows again.
// (ScannerCallable by now has "relocated" the correct region)
- if (scan.isReversed()) {
- scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
+ if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) {
+ if (scan.isReversed()) {
+ scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
+ } else {
+ scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
+ }
} else {
- scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
+ // We need to rescan this row because only part of it was loaded before.
+ scan.setStartRow(lastResult.getRow());
}
}
if (e instanceof OutOfOrderScannerNextException) {
@@ -483,12 +495,27 @@ public abstract class ClientScanner extends AbstractClientScanner {
getResultsToAddToCache(values, callable.isHeartbeatMessage());
if (!resultsToAddToCache.isEmpty()) {
for (Result rs : resultsToAddToCache) {
+ rs = filterLoadedCell(rs);
+ if (rs == null) {
+ continue;
+ }
+
cache.add(rs);
long estimatedHeapSizeOfResult = calcEstimatedSize(rs);
countdown--;
remainingResultSize -= estimatedHeapSizeOfResult;
addEstimatedSize(estimatedHeapSizeOfResult);
this.lastResult = rs;
+ if (this.lastResult.isPartial() || scan.getBatch() > 0 ) {
+ updateLastCellLoadedToCache(this.lastResult);
+ } else {
+ this.lastCellLoadedToCache = null;
+ }
+ }
+ if (cache.isEmpty()) {
+ // All results had already been seen before; we need to scan more.
+ allResultsSkipped = true;
+ continue;
}
}
if (callable.isHeartbeatMessage()) {
@@ -519,7 +546,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// !partialResults.isEmpty() means that we are still accumulating partial Results for a
// row. We should not change scanners before we receive all the partial Results for that
// row.
- } while ((callable != null && callable.isHeartbeatMessage())
+ } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage())
|| (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
&& (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))));
}
@@ -783,4 +810,55 @@ public abstract class ClientScanner extends AbstractClientScanner {
}
return false;
}
+
+  /**
+   * Remembers the final cell of {@code result} as the most recent cell handed to the cache.
+   * A Result that carries no cells leaves the previously remembered cell untouched.
+   *
+   * @param result the Result that was just added to the cache
+   */
+  protected void updateLastCellLoadedToCache(Result result) {
+    Cell[] cells = result.rawCells();
+    if (cells.length > 0) {
+      this.lastCellLoadedToCache = cells[cells.length - 1];
+    }
+  }
+
+  /**
+   * Orders two cells the way the current scan visits them. Rows are compared with the meta
+   * comparator when the current region is a meta region, and with the default comparator
+   * otherwise. The row ordering is inverted for a reversed scan. Within the same row the
+   * column/timestamp/type ordering is never inverted, because a reversed scanner only reverses
+   * rows, not columns.
+   *
+   * @return a negative value, zero, or a positive value as {@code a} is ordered before, equal
+   *         to, or after {@code b} in scan order
+   */
+  private int compare(Cell a, Cell b) {
+    boolean onMeta = currentRegion != null && currentRegion.isMetaRegion();
+    CellComparator rowComparator =
+        onMeta ? CellComparator.META_COMPARATOR : CellComparator.COMPARATOR;
+    int rowOrder = rowComparator.compareRows(a, b);
+    if (rowOrder == 0) {
+      return CellComparator.compareWithoutRow(a, b);
+    }
+    return scan.isReversed() ? -rowOrder : rowOrder;
+  }
+
+  /**
+   * Drops the cells of {@code result} that were already added to the cache before a scanner
+   * reset, using {@link #lastCellLoadedToCache} as the high-water mark.
+   * <p>
+   * A cell is remembered only while the last cached Result was partial or batched, so the
+   * remembered cell and {@code result} normally share a row key. They can differ if, for
+   * example, some cells of a row are read, the row is deleted concurrently, the region moves,
+   * and more cells are then read from a later row.
+   *
+   * @return {@code result} itself when nothing must be removed (including when no cell is
+   *         remembered or {@code result} is empty), {@code null} when every cell was already
+   *         loaded, otherwise a new Result containing only the cells after the remembered one
+   */
+  private Result filterLoadedCell(Result result) {
+    Cell[] cells = result.rawCells();
+    if (lastCellLoadedToCache == null || cells.length == 0) {
+      return result;
+    }
+    if (compare(lastCellLoadedToCache, cells[0]) < 0) {
+      // Even the first cell comes after the remembered one: nothing here was seen before.
+      return result;
+    }
+    if (compare(lastCellLoadedToCache, cells[cells.length - 1]) >= 0) {
+      // Even the last cell does not come after the remembered one: all of it is a duplicate.
+      return null;
+    }
+    // cells[0] is already known to be a duplicate, so the search starts at index 1. The last
+    // cell is known to be new, so the loop stops at or before the final index.
+    int firstNew = 1;
+    while (firstNew < cells.length && compare(lastCellLoadedToCache, cells[firstNew]) >= 0) {
+      firstNew++;
+    }
+    Cell[] remaining = Arrays.copyOfRange(cells, firstNew, cells.length);
+    return Result.create(remaining, result.getExists(), result.isStale(), result.isPartial());
+  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbf58f33/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
index d869b3e..a5e26cf 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
@@ -378,7 +378,7 @@ public class CellComparator implements Comparator<Cell>, Serializable {
roffset, rlength);
}
- private static int compareWithoutRow(final Cell left, final Cell right) {
+ public static int compareWithoutRow(final Cell left, final Cell right) {
// If the column is not specified, the "minimum" key type appears the
// latest in the sorted order, regardless of the timestamp. This is used
// for specifying the last key/value in a given row, because there is no
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbf58f33/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
index a6f8373..c6a2525 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
import org.apache.hadoop.hbase.filter.RandomRowFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -65,7 +67,7 @@ public class TestPartialResultsFromClientSide {
private static final Log LOG = LogFactory.getLog(TestPartialResultsFromClientSide.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
+ private final static int MINICLUSTER_SIZE = 5;
private static Table TABLE = null;
/**
@@ -99,7 +101,8 @@ public class TestPartialResultsFromClientSide {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- TEST_UTIL.startMiniCluster(3);
+ TEST_UTIL.startMiniCluster(MINICLUSTER_SIZE);
+ TEST_UTIL.getAdmin().setBalancerRunning(false, true);
TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
}
@@ -430,7 +433,7 @@ public class TestPartialResultsFromClientSide {
}
/**
- * Test the method {@link Result#createCompleteResult(List, Result)}
+ * Test the method {@link Result#createCompleteResult(List)}
* @throws Exception
*/
@Test
@@ -829,4 +832,212 @@ public class TestPartialResultsFromClientSide {
testEquivalenceOfScanResults(TABLE, partialScan, oneshotScan);
}
}
+
+  /**
+   * Requests that the single region of {@code table} be moved to the region server at position
+   * {@code index} in the mini cluster. Fails if the table does not have exactly one region.
+   * NOTE(review): this helper does not itself wait for the region to come online at the
+   * destination; confirm whether Admin#move blocks until then.
+   */
+  private void moveRegion(Table table, int index) throws IOException {
+    List<Pair<HRegionInfo, ServerName>> regionLocations =
+        MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), table.getName());
+    assertEquals(1, regionLocations.size());
+    byte[] encodedRegionName = regionLocations.get(0).getFirst().getEncodedNameAsBytes();
+    ServerName destination = TEST_UTIL.getHBaseCluster().getRegionServer(index).getServerName();
+    TEST_UTIL.getAdmin().move(encodedRegionName, Bytes.toBytes(destination.getServerName()));
+  }
+
+  /**
+   * Asserts that {@code cell} has exactly the given row, column family and qualifier.
+   */
+  private void assertCell(Cell cell, byte[] row, byte[] cf, byte[] cq) {
+    byte[] actualRow = Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+    assertArrayEquals(row, actualRow);
+    byte[] actualFamily =
+        Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
+    assertArrayEquals(cf, actualFamily);
+    byte[] actualQualifier =
+        Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+    assertArrayEquals(cq, actualQualifier);
+  }
+
+  /**
+   * Scans with partial results allowed and a 1-byte max result size, so each next() returns a
+   * single cell. The region is moved before the scan, after the last cell of row 0 has been
+   * read, and after the first cell of row 1 has been read. Each subsequent next() must return
+   * exactly the following cell, so no cell is skipped or repeated across a move.
+   * The table and scanner are closed via try-with-resources.
+   */
+  @Test
+  public void testPartialResultWhenRegionMove() throws IOException {
+    try (Table table = createTestTable(TableName.valueOf("testPartialResultWhenRegionMove"),
+        ROWS, FAMILIES, QUALIFIERS, VALUE)) {
+      moveRegion(table, 1);
+
+      Scan scan = new Scan();
+      scan.setMaxResultSize(1);
+      scan.setAllowPartialResults(true);
+      try (ResultScanner scanner = table.getScanner(scan)) {
+        // Consume every cell of row 0 except the last one.
+        for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) {
+          scanner.next();
+        }
+        Result result1 = scanner.next();
+        assertEquals(1, result1.rawCells().length);
+        Cell c1 = result1.rawCells()[0];
+        assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]);
+        assertFalse(result1.isPartial());
+
+        moveRegion(table, 2);
+
+        Result result2 = scanner.next();
+        assertEquals(1, result2.rawCells().length);
+        Cell c2 = result2.rawCells()[0];
+        assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]);
+        assertTrue(result2.isPartial());
+
+        moveRegion(table, 3);
+
+        Result result3 = scanner.next();
+        assertEquals(1, result3.rawCells().length);
+        Cell c3 = result3.rawCells()[0];
+        assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]);
+        assertTrue(result3.isPartial());
+      }
+    }
+  }
+
+  /**
+   * Reversed counterpart of the partial-result region-move test. With partial results allowed
+   * and a 1-byte max result size, each next() returns a single cell, starting from the last
+   * row. The region is moved before the scan, after the last cell of row NUM_ROWS-1 has been
+   * read, and after the first cell of row NUM_ROWS-2 has been read. Each subsequent next() must
+   * return exactly the following cell. The table and scanner are closed via try-with-resources.
+   */
+  @Test
+  public void testReversedPartialResultWhenRegionMove() throws IOException {
+    try (Table table = createTestTable(
+        TableName.valueOf("testReversedPartialResultWhenRegionMove"), ROWS, FAMILIES,
+        QUALIFIERS, VALUE)) {
+      moveRegion(table, 1);
+
+      Scan scan = new Scan();
+      scan.setMaxResultSize(1);
+      scan.setAllowPartialResults(true);
+      scan.setReversed(true);
+      try (ResultScanner scanner = table.getScanner(scan)) {
+        // Consume every cell of the last row except its final one.
+        for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) {
+          scanner.next();
+        }
+        Result result1 = scanner.next();
+        assertEquals(1, result1.rawCells().length);
+        Cell c1 = result1.rawCells()[0];
+        assertCell(c1, ROWS[NUM_ROWS - 1], FAMILIES[NUM_FAMILIES - 1],
+            QUALIFIERS[NUM_QUALIFIERS - 1]);
+        assertFalse(result1.isPartial());
+
+        moveRegion(table, 2);
+
+        Result result2 = scanner.next();
+        assertEquals(1, result2.rawCells().length);
+        Cell c2 = result2.rawCells()[0];
+        assertCell(c2, ROWS[NUM_ROWS - 2], FAMILIES[0], QUALIFIERS[0]);
+        assertTrue(result2.isPartial());
+
+        moveRegion(table, 3);
+
+        Result result3 = scanner.next();
+        assertEquals(1, result3.rawCells().length);
+        Cell c3 = result3.rawCells()[0];
+        assertCell(c3, ROWS[NUM_ROWS - 2], FAMILIES[0], QUALIFIERS[1]);
+        assertTrue(result3.isPartial());
+      }
+    }
+  }
+
+  /**
+   * Scans whole rows one at a time (caching 1, 1-byte max result size, partial results not
+   * enabled). The region is moved before the scan and between each of the first three next()
+   * calls. Each call must return the next complete row, so no row is skipped or repeated.
+   * The table and scanner are closed via try-with-resources.
+   */
+  @Test
+  public void testCompleteResultWhenRegionMove() throws IOException {
+    try (Table table = createTestTable(TableName.valueOf("testCompleteResultWhenRegionMove"),
+        ROWS, FAMILIES, QUALIFIERS, VALUE)) {
+      moveRegion(table, 1);
+
+      Scan scan = new Scan();
+      scan.setMaxResultSize(1);
+      scan.setCaching(1);
+      try (ResultScanner scanner = table.getScanner(scan)) {
+        Result result1 = scanner.next();
+        assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result1.rawCells().length);
+        Cell c1 = result1.rawCells()[0];
+        assertCell(c1, ROWS[0], FAMILIES[0], QUALIFIERS[0]);
+        assertFalse(result1.isPartial());
+
+        moveRegion(table, 2);
+
+        Result result2 = scanner.next();
+        assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result2.rawCells().length);
+        Cell c2 = result2.rawCells()[0];
+        assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]);
+        assertFalse(result2.isPartial());
+
+        moveRegion(table, 3);
+
+        Result result3 = scanner.next();
+        assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result3.rawCells().length);
+        Cell c3 = result3.rawCells()[0];
+        assertCell(c3, ROWS[2], FAMILIES[0], QUALIFIERS[0]);
+        assertFalse(result3.isPartial());
+      }
+    }
+  }
+
+  /**
+   * Reversed counterpart of the complete-result region-move test. Scans whole rows one at a
+   * time, starting from the last row (caching 1, 1-byte max result size, partial results not
+   * enabled). The region is moved before the scan and between each of the first three next()
+   * calls. Each call must return the next-lower complete row. The table and scanner are closed
+   * via try-with-resources.
+   */
+  @Test
+  public void testReversedCompleteResultWhenRegionMove() throws IOException {
+    try (Table table = createTestTable(
+        TableName.valueOf("testReversedCompleteResultWhenRegionMove"), ROWS, FAMILIES,
+        QUALIFIERS, VALUE)) {
+      moveRegion(table, 1);
+
+      Scan scan = new Scan();
+      scan.setMaxResultSize(1);
+      scan.setCaching(1);
+      scan.setReversed(true);
+      try (ResultScanner scanner = table.getScanner(scan)) {
+        Result result1 = scanner.next();
+        assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result1.rawCells().length);
+        Cell c1 = result1.rawCells()[0];
+        assertCell(c1, ROWS[NUM_ROWS - 1], FAMILIES[0], QUALIFIERS[0]);
+        assertFalse(result1.isPartial());
+
+        moveRegion(table, 2);
+
+        Result result2 = scanner.next();
+        assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result2.rawCells().length);
+        Cell c2 = result2.rawCells()[0];
+        assertCell(c2, ROWS[NUM_ROWS - 2], FAMILIES[0], QUALIFIERS[0]);
+        assertFalse(result2.isPartial());
+
+        moveRegion(table, 3);
+
+        Result result3 = scanner.next();
+        assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result3.rawCells().length);
+        Cell c3 = result3.rawCells()[0];
+        assertCell(c3, ROWS[NUM_ROWS - 3], FAMILIES[0], QUALIFIERS[0]);
+        assertFalse(result3.isPartial());
+      }
+    }
+  }
+
+  /**
+   * Scans with caching 1 and batch size 1, so each next() returns a single cell. The region is
+   * moved before the scan, after the last cell of row 0 has been read, and after the first cell
+   * of row 1 has been read. Each subsequent next() must return exactly the following cell, so no
+   * cell is skipped or repeated across a move. The table and scanner are closed via
+   * try-with-resources.
+   */
+  @Test
+  public void testBatchingResultWhenRegionMove() throws IOException {
+    try (Table table = createTestTable(TableName.valueOf("testBatchingResultWhenRegionMove"),
+        ROWS, FAMILIES, QUALIFIERS, VALUE)) {
+      moveRegion(table, 1);
+
+      Scan scan = new Scan();
+      scan.setCaching(1);
+      scan.setBatch(1);
+      try (ResultScanner scanner = table.getScanner(scan)) {
+        // Consume every cell of row 0 except the last one.
+        for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) {
+          scanner.next();
+        }
+        Result result1 = scanner.next();
+        assertEquals(1, result1.rawCells().length);
+        Cell c1 = result1.rawCells()[0];
+        assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]);
+
+        moveRegion(table, 2);
+
+        Result result2 = scanner.next();
+        assertEquals(1, result2.rawCells().length);
+        Cell c2 = result2.rawCells()[0];
+        assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]);
+
+        moveRegion(table, 3);
+
+        Result result3 = scanner.next();
+        assertEquals(1, result3.rawCells().length);
+        Cell c3 = result3.rawCells()[0];
+        assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]);
+      }
+    }
+  }
+
+
}
\ No newline at end of file