You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by re...@apache.org on 2018/08/04 02:25:39 UTC
hbase git commit: HBASE-20896 Port HBASE-20866 to branch-1 and
branch-1.4
Repository: hbase
Updated Branches:
refs/heads/branch-1 0298c06b4 -> 3bc4bc48f
HBASE-20896 Port HBASE-20866 to branch-1 and branch-1.4
Signed-off-by: Andrew Purtell <ap...@apache.org>
Signed-off-by: Reid Chan <re...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3bc4bc48
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3bc4bc48
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3bc4bc48
Branch: refs/heads/branch-1
Commit: 3bc4bc48f6c5fbf01a9d3e20def78d1f77cb641b
Parents: 0298c06
Author: Vikas Vishwakarma <vv...@salesforce.com>
Authored: Fri Aug 3 10:17:23 2018 +0530
Committer: Reid Chan <re...@apache.org>
Committed: Sat Aug 4 10:24:18 2018 +0800
----------------------------------------------------------------------
.../client/AllowPartialScanResultCache.java | 36 ++++----
.../hbase/client/BatchScanResultCache.java | 37 ++++----
.../hadoop/hbase/client/ClientScanner.java | 22 ++---
.../hbase/client/CompleteScanResultCache.java | 63 +++++++-------
.../hadoop/hbase/client/ConnectionUtils.java | 9 +-
.../hadoop/hbase/client/ScanResultCache.java | 83 ++++++++++++++++--
.../client/TestAllowPartialScanResultCache.java | 26 +++---
.../hbase/client/TestBatchScanResultCache.java | 31 ++++---
.../TestCompleteResultScanResultCache.java | 88 ++++++++++++++------
9 files changed, 255 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/3bc4bc48/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
index 5b6c411..13e4687 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
* that skips the cells that have already been returned.
*/
@InterfaceAudience.Private
-class AllowPartialScanResultCache implements ScanResultCache {
+class AllowPartialScanResultCache extends ScanResultCache {
// used to filter out the cells that already returned to user as we always start from the
// beginning of a row when retry.
@@ -41,7 +41,9 @@ class AllowPartialScanResultCache implements ScanResultCache {
private boolean lastResultPartial;
- private int numberOfCompleteRows;
+ public AllowPartialScanResultCache(List<Result> cache) {
+ super(cache);
+ }
private void recordLastResult(Result result) {
lastCell = result.rawCells()[result.rawCells().length - 1];
@@ -49,14 +51,14 @@ class AllowPartialScanResultCache implements ScanResultCache {
}
@Override
- public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
+ public void loadResultsToCache(Result[] results, boolean isHeartbeatMessage) throws IOException {
if (results.length == 0) {
if (!isHeartbeatMessage && lastResultPartial) {
// An empty non heartbeat result indicates that there must be a row change. So if the
// lastResultPartial is true then we need to increase numberOfCompleteRows.
numberOfCompleteRows++;
}
- return EMPTY_RESULT_ARRAY;
+ return;
}
int i;
for (i = 0; i < results.length; i++) {
@@ -67,31 +69,27 @@ class AllowPartialScanResultCache implements ScanResultCache {
}
}
if (i == results.length) {
- return EMPTY_RESULT_ARRAY;
+ return;
}
if (lastResultPartial && !CellUtil.matchingRow(lastCell, results[0].getRow())) {
// there is a row change, so increase numberOfCompleteRows
numberOfCompleteRows++;
}
recordLastResult(results[results.length - 1]);
- if (i > 0) {
- results = Arrays.copyOfRange(results, i, results.length);
- }
- for (Result result : results) {
- if (!result.mayHaveMoreCellsInRow()) {
- numberOfCompleteRows++;
- }
- }
- return results;
+ addResultArrayToCache(results, i, results.length);
}
@Override
- public void clear() {
- // we do not cache anything
+ protected void checkUpdateNumberOfCompleteRowsAndCache(Result rs) {
+ if (!rs.mayHaveMoreCellsInRow()) {
+ numberOfCompleteRows++;
+ }
+ addResultToCache(rs);
}
@Override
- public int numberOfCompleteRows() {
- return numberOfCompleteRows;
+ public void clear() {
+ // we do not cache anything
+ super.clear();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3bc4bc48/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
index 293f411..e9ba9b6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
@@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
import java.io.IOException;
import java.util.ArrayDeque;
-import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
@@ -38,7 +37,7 @@ import org.apache.hadoop.hbase.util.Bytes;
* doesn't mean setAllowPartialResult(true).
*/
@InterfaceAudience.Private
-public class BatchScanResultCache implements ScanResultCache {
+public class BatchScanResultCache extends ScanResultCache {
private final int batch;
@@ -52,9 +51,8 @@ public class BatchScanResultCache implements ScanResultCache {
private int numCellsOfPartialResults;
- private int numberOfCompleteRows;
-
- public BatchScanResultCache(int batch) {
+ public BatchScanResultCache(List<Result> cache, int batch) {
+ super(cache);
this.batch = batch;
}
@@ -109,11 +107,12 @@ public class BatchScanResultCache implements ScanResultCache {
}
@Override
- public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
+ public void loadResultsToCache(Result[] results, boolean isHeartbeatMessage) throws IOException {
if (results.length == 0) {
if (!isHeartbeatMessage) {
if (!partialResults.isEmpty()) {
- return new Result[] { createCompletedResult() };
+ checkUpdateNumberOfCompleteRowsAndCache(createCompletedResult());
+ return;
}
if (lastResultPartial) {
// An empty non heartbeat result indicates that there must be a row change. So if the
@@ -121,9 +120,8 @@ public class BatchScanResultCache implements ScanResultCache {
numberOfCompleteRows++;
}
}
- return EMPTY_RESULT_ARRAY;
+ return;
}
- List<Result> regroupedResults = new ArrayList<>();
for (Result result : results) {
result = filterCells(result, lastCell);
if (result == null) {
@@ -132,7 +130,7 @@ public class BatchScanResultCache implements ScanResultCache {
if (!partialResults.isEmpty()) {
if (!Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
// there is a row change
- regroupedResults.add(createCompletedResult());
+ checkUpdateNumberOfCompleteRowsAndCache(createCompletedResult());
}
} else if (lastResultPartial && !CellUtil.matchingRow(lastCell, result.getRow())) {
// As for batched scan we may return partial results to user if we reach the batch limit, so
@@ -143,33 +141,34 @@ public class BatchScanResultCache implements ScanResultCache {
// check if we have a row change
if (!partialResults.isEmpty() &&
!Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
- regroupedResults.add(createCompletedResult());
+ checkUpdateNumberOfCompleteRowsAndCache(createCompletedResult());
}
Result regroupedResult = regroupResults(result);
if (regroupedResult != null) {
if (!regroupedResult.mayHaveMoreCellsInRow()) {
numberOfCompleteRows++;
}
- regroupedResults.add(regroupedResult);
+ checkUpdateNumberOfCompleteRowsAndCache(regroupedResult);
// only update last cell when we actually return it to user.
recordLastResult(regroupedResult);
}
if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) {
// We are done for this row
- regroupedResults.add(createCompletedResult());
+ checkUpdateNumberOfCompleteRowsAndCache(createCompletedResult());
}
}
- return regroupedResults.toArray(new Result[0]);
}
@Override
- public void clear() {
- partialResults.clear();
- numCellsOfPartialResults = 0;
+ protected void checkUpdateNumberOfCompleteRowsAndCache(Result rs) {
+ // Number of Complete rows already updated
+ addResultToCache(rs);
}
@Override
- public int numberOfCompleteRows() {
- return numberOfCompleteRows;
+ public void clear() {
+ partialResults.clear();
+ numCellsOfPartialResults = 0;
+ super.clear();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3bc4bc48/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index d548901..8665eea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -30,8 +30,6 @@ import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -136,7 +134,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
this.rpcControllerFactory = controllerFactory;
this.conf = conf;
- this.scanResultCache = createScanResultCache(scan);
+ this.scanResultCache = createScanResultCache(scan, cache);
}
protected ClusterConnection getConnection() {
@@ -464,19 +462,15 @@ public abstract class ClientScanner extends AbstractClientScanner {
// Results to the scanner's cache. If partial results are not allowed to be seen by the
// caller, all book keeping will be performed within this method.
int numberOfCompleteRowsBefore = scanResultCache.numberOfCompleteRows();
- Result[] resultsToAddToCache =
- scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
+ scanResultCache.loadResultsToCache(values, callable.isHeartbeatMessage());
int numberOfCompleteRows =
scanResultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
- if (resultsToAddToCache.length > 0) {
- for (Result rs : resultsToAddToCache) {
- cache.add(rs);
- for (Cell cell : rs.rawCells()) {
- remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
- }
- countdown--;
- this.lastResult = rs;
- }
+ if (scanResultCache.getCount() > 0) {
+ remainingResultSize -= scanResultCache.getResultSize();
+ scanResultCache.resetResultSize();
+ countdown -= scanResultCache.getCount();
+ scanResultCache.resetCount();
+ this.lastResult = scanResultCache.getLastResult();
}
if (scan.getLimit() > 0) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/3bc4bc48/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
index a132642..ff28f61 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -29,45 +28,38 @@ import org.apache.hadoop.hbase.util.Bytes;
* A scan result cache that only returns complete result.
*/
@InterfaceAudience.Private
-class CompleteScanResultCache implements ScanResultCache {
-
- private int numberOfCompleteRows;
+class CompleteScanResultCache extends ScanResultCache {
private final List<Result> partialResults = new ArrayList<>();
+ public CompleteScanResultCache(List<Result> cache) {
+ super(cache);
+ }
+
private Result combine() throws IOException {
Result result = Result.createCompleteResult(partialResults);
partialResults.clear();
return result;
}
- private Result[] prependCombined(Result[] results, int length) throws IOException {
+ private void prependCombinedAndCache(Result[] results, int length) throws IOException {
if (length == 0) {
- return new Result[] { combine() };
+ checkUpdateNumberOfCompleteRowsAndCache(combine());
+ return;
}
// the last part of a partial result may not be marked as partial so here we need to check if
// there is a row change.
- int start;
+ int start = 0;
if (Bytes.equals(partialResults.get(0).getRow(), results[0].getRow())) {
partialResults.add(results[0]);
start = 1;
- length--;
- } else {
- start = 0;
}
- Result[] prependResults = new Result[length + 1];
- prependResults[0] = combine();
- System.arraycopy(results, start, prependResults, 1, length);
- return prependResults;
- }
-
- private Result[] updateNumberOfCompleteResultsAndReturn(Result... results) {
- numberOfCompleteRows += results.length;
- return results;
+ checkUpdateNumberOfCompleteRowsAndCache(combine());
+ addResultArrayToCache(results, start, length);
}
@Override
- public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
+ public void loadResultsToCache(Result[] results, boolean isHeartbeatMessage) throws IOException {
// If no results were returned it indicates that either we have all the partial results
// necessary to construct the complete result or the server had to send a heartbeat message
// to the client to keep the client-server connection alive
@@ -76,9 +68,9 @@ class CompleteScanResultCache implements ScanResultCache {
// and thus there may be more partials server side that still need to be added to the partial
// list before we form the complete Result
if (!partialResults.isEmpty() && !isHeartbeatMessage) {
- return updateNumberOfCompleteResultsAndReturn(combine());
+ checkUpdateNumberOfCompleteRowsAndCache(combine());
}
- return EMPTY_RESULT_ARRAY;
+ return;
}
// In every RPC response there should be at most a single partial result. Furthermore, if
// there is a partial result, it is guaranteed to be in the last position of the array.
@@ -86,37 +78,42 @@ class CompleteScanResultCache implements ScanResultCache {
if (last.mayHaveMoreCellsInRow()) {
if (partialResults.isEmpty()) {
partialResults.add(last);
- return updateNumberOfCompleteResultsAndReturn(Arrays.copyOf(results, results.length - 1));
+ addResultArrayToCache(results, 0, results.length - 1);
+ return;
}
// We have only one result and it is partial
if (results.length == 1) {
// check if there is a row change
if (Bytes.equals(partialResults.get(0).getRow(), last.getRow())) {
partialResults.add(last);
- return EMPTY_RESULT_ARRAY;
+ return;
}
Result completeResult = combine();
partialResults.add(last);
- return updateNumberOfCompleteResultsAndReturn(completeResult);
+ checkUpdateNumberOfCompleteRowsAndCache(completeResult);
+ return;
}
// We have some complete results
- Result[] resultsToReturn = prependCombined(results, results.length - 1);
+ prependCombinedAndCache(results, results.length - 1);
partialResults.add(last);
- return updateNumberOfCompleteResultsAndReturn(resultsToReturn);
+ return;
}
if (!partialResults.isEmpty()) {
- return updateNumberOfCompleteResultsAndReturn(prependCombined(results, results.length));
+ prependCombinedAndCache(results, results.length);
+ return;
}
- return updateNumberOfCompleteResultsAndReturn(results);
+ addResultArrayToCache(results, 0, results.length);
}
@Override
- public void clear() {
- partialResults.clear();
+ protected void checkUpdateNumberOfCompleteRowsAndCache(Result rs) {
+ numberOfCompleteRows++;
+ addResultToCache(rs);
}
@Override
- public int numberOfCompleteRows() {
- return numberOfCompleteRows;
+ public void clear() {
+ partialResults.clear();
+ super.clear();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3bc4bc48/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 96e7788..ba3e8d9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
@@ -288,13 +289,13 @@ public class ConnectionUtils {
return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0;
}
- public static ScanResultCache createScanResultCache(Scan scan) {
+ public static ScanResultCache createScanResultCache(Scan scan, List<Result> cache) {
if (scan.getAllowPartialResults()) {
- return new AllowPartialScanResultCache();
+ return new AllowPartialScanResultCache(cache);
} else if (scan.getBatch() > 0) {
- return new BatchScanResultCache(scan.getBatch());
+ return new BatchScanResultCache(cache, scan.getBatch());
} else {
- return new CompleteScanResultCache();
+ return new CompleteScanResultCache(cache);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3bc4bc48/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
index 2d28e1a..85b85c9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
@@ -18,7 +18,10 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
@@ -33,26 +36,94 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
* </ol>
*/
@InterfaceAudience.Private
-interface ScanResultCache {
+public abstract class ScanResultCache {
static final Result[] EMPTY_RESULT_ARRAY = new Result[0];
+ int numberOfCompleteRows;
+ long resultSize = 0;
+ int count = 0;
+ Result lastResult = null;
+ List<Result> cache;
+
+ ScanResultCache(List<Result> cache) {
+ this.cache = cache;
+ }
/**
- * Add the given results to cache and get valid results back.
+ * Process the results from the server and load it to cache.
* @param results the results of a scan next. Must not be null.
* @param isHeartbeatMessage indicate whether the results are gotten from a heartbeat response.
- * @return valid results, never null.
*/
- Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException;
+ abstract void loadResultsToCache(Result[] results, boolean isHeartbeatMessage)
+ throws IOException;
/**
* Clear the cached result if any. Called when scan error and we will start from a start of a row
* again.
*/
- void clear();
+ void clear() {
+ resetCount();
+ resetResultSize();
+ lastResult = null;
+ }
/**
* Return the number of complete rows. Used to implement limited scan.
*/
- int numberOfCompleteRows();
+ int numberOfCompleteRows() {
+ return numberOfCompleteRows;
+ }
+
+ /**
+ * Add result array received from server to cache
+ * @param resultsToAddToCache The array of Results returned from the server
+ * @param start start index to cache from Results array
+ * @param end last index to cache from Results array
+ */
+ void addResultArrayToCache(Result[] resultsToAddToCache, int start, int end) {
+ if (resultsToAddToCache != null) {
+ for (int r = start; r < end; r++) {
+ checkUpdateNumberOfCompleteRowsAndCache(resultsToAddToCache[r]);
+ }
+ }
+ }
+
+ /**
+ * Check and update number of complete rows and add result to cache
+ * @param rs Result to cache from Results array or constructed from partial results
+ */
+ abstract void checkUpdateNumberOfCompleteRowsAndCache(Result rs);
+
+ /**
+ * Add the result received from server or result constructed from partials to cache
+ * @param rs Result to cache from Results array or constructed from partial results
+ */
+ void addResultToCache(Result rs) {
+ cache.add(rs);
+ for (Cell cell : rs.rawCells()) {
+ resultSize += CellUtil.estimatedHeapSizeOf(cell);
+ }
+ count++;
+ lastResult = rs;
+ }
+
+ long getResultSize() {
+ return resultSize;
+ }
+
+ int getCount() {
+ return count;
+ }
+
+ void resetResultSize() {
+ resultSize = 0;
+ }
+
+ void resetCount() {
+ count = 0;
+ }
+
+ Result getLastResult() {
+ return lastResult;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3bc4bc48/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
index 3fe43a5..6938983 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
@@ -19,10 +19,10 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.TestBatchScanResultCache.createCells;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
import java.io.IOException;
import java.util.Arrays;
+import java.util.LinkedList;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -39,10 +39,11 @@ public class TestAllowPartialScanResultCache {
private static byte[] CF = Bytes.toBytes("cf");
private AllowPartialScanResultCache resultCache;
+ private static final LinkedList<Result> cache = new LinkedList<Result>();
@Before
public void setUp() {
- resultCache = new AllowPartialScanResultCache();
+ resultCache = new AllowPartialScanResultCache(cache);
}
@After
@@ -53,16 +54,17 @@ public class TestAllowPartialScanResultCache {
@Test
public void test() throws IOException {
- assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
- resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
- assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
- resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
+ resultCache.loadResultsToCache(ScanResultCache.EMPTY_RESULT_ARRAY, false);
+ assertEquals(0, cache.size());
+ resultCache.loadResultsToCache(ScanResultCache.EMPTY_RESULT_ARRAY, true);
+ assertEquals(0, cache.size());
Cell[] cells1 = createCells(CF, 1, 10);
Cell[] cells2 = createCells(CF, 2, 10);
- Result[] results1 = resultCache.addAndGet(
+ resultCache.loadResultsToCache(
new Result[] { Result.create(Arrays.copyOf(cells1, 5), null, false, true) }, false);
+ Result[] results1 = cache.toArray(new Result[0]);
assertEquals(1, results1.length);
assertEquals(1, Bytes.toInt(results1[0].getRow()));
assertEquals(5, results1[0].rawCells().length);
@@ -70,8 +72,10 @@ public class TestAllowPartialScanResultCache {
assertEquals(1, Bytes.toInt(results1[0].getValue(CF, Bytes.toBytes("cq" + i))));
}
- Result[] results2 = resultCache.addAndGet(
+ cache.clear();
+ resultCache.loadResultsToCache(
new Result[] { Result.create(Arrays.copyOfRange(cells1, 1, 10), null, false, true) }, false);
+ Result[] results2 = cache.toArray(new Result[0]);
assertEquals(1, results2.length);
assertEquals(1, Bytes.toInt(results2[0].getRow()));
assertEquals(5, results2[0].rawCells().length);
@@ -79,8 +83,10 @@ public class TestAllowPartialScanResultCache {
assertEquals(1, Bytes.toInt(results2[0].getValue(CF, Bytes.toBytes("cq" + i))));
}
- Result[] results3 =
- resultCache.addAndGet(new Result[] { Result.create(cells1), Result.create(cells2) }, false);
+ cache.clear();
+ resultCache.loadResultsToCache(
+ new Result[] { Result.create(cells1), Result.create(cells2) }, false);
+ Result[] results3 = cache.toArray(new Result[0]);
assertEquals(1, results3.length);
assertEquals(2, Bytes.toInt(results3[0].getRow()));
assertEquals(10, results3[0].rawCells().length);
http://git-wip-us.apache.org/repos/asf/hbase/blob/3bc4bc48/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
index 31a4594..fdeca31 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
@@ -18,10 +18,10 @@
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
import java.io.IOException;
import java.util.Arrays;
+import java.util.LinkedList;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
@@ -40,9 +40,11 @@ public class TestBatchScanResultCache {
private BatchScanResultCache resultCache;
+ private final LinkedList<Result> cache = new LinkedList<Result>();
+
@Before
public void setUp() {
- resultCache = new BatchScanResultCache(4);
+ resultCache = new BatchScanResultCache(cache, 4);
}
@After
@@ -73,28 +75,34 @@ public class TestBatchScanResultCache {
@Test
public void test() throws IOException {
- assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
- resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
- assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
- resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
+ resultCache.loadResultsToCache(ScanResultCache.EMPTY_RESULT_ARRAY, false);
+ assertEquals(0, cache.size());
+ resultCache.loadResultsToCache(ScanResultCache.EMPTY_RESULT_ARRAY, true);
+ assertEquals(0, cache.size());
Cell[] cells1 = createCells(CF, 1, 10);
Cell[] cells2 = createCells(CF, 2, 10);
Cell[] cells3 = createCells(CF, 3, 10);
- assertEquals(0, resultCache.addAndGet(
- new Result[] { Result.create(Arrays.copyOf(cells1, 3), null, false, true) }, false).length);
- Result[] results = resultCache.addAndGet(
+ resultCache.loadResultsToCache(
+ new Result[] { Result.create(Arrays.copyOf(cells1, 3), null, false, true) }, false);
+ assertEquals(0, cache.size());
+ resultCache.loadResultsToCache(
new Result[] { Result.create(Arrays.copyOfRange(cells1, 3, 7), null, false, true),
Result.create(Arrays.copyOfRange(cells1, 7, 10), null, false, true) },
false);
+ Result[] results = cache.toArray(new Result[0]);
assertEquals(2, results.length);
assertResultEquals(results[0], 1, 0, 4);
assertResultEquals(results[1], 1, 4, 8);
- results = resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false);
+
+ cache.clear();
+ resultCache.loadResultsToCache(ScanResultCache.EMPTY_RESULT_ARRAY, false);
+ results = cache.toArray(new Result[0]);
assertEquals(1, results.length);
assertResultEquals(results[0], 1, 8, 10);
- results = resultCache.addAndGet(
+ cache.clear();
+ resultCache.loadResultsToCache(
new Result[] { Result.create(Arrays.copyOfRange(cells2, 0, 4), null, false, true),
Result.create(Arrays.copyOfRange(cells2, 4, 8), null, false, true),
Result.create(Arrays.copyOfRange(cells2, 8, 10), null, false, true),
@@ -102,6 +110,7 @@ public class TestBatchScanResultCache {
Result.create(Arrays.copyOfRange(cells3, 4, 8), null, false, true),
Result.create(Arrays.copyOfRange(cells3, 8, 10), null, false, false) },
false);
+ results = cache.toArray(new Result[0]);
assertEquals(6, results.length);
assertResultEquals(results[0], 2, 0, 4);
assertResultEquals(results[1], 2, 4, 8);
http://git-wip-us.apache.org/repos/asf/hbase/blob/3bc4bc48/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
index 8759593..e2bd92c 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertSame;
import java.io.IOException;
import java.util.Arrays;
+import java.util.LinkedList;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
@@ -46,9 +47,11 @@ public class TestCompleteResultScanResultCache {
private CompleteScanResultCache resultCache;
+ private final LinkedList<Result> cache = new LinkedList<Result>();
+
@Before
public void setUp() {
- resultCache = new CompleteScanResultCache();
+ resultCache = new CompleteScanResultCache(cache);
}
@After
@@ -63,33 +66,50 @@ public class TestCompleteResultScanResultCache {
@Test
public void testNoPartial() throws IOException {
- assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
- resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
- assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
- resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
+ cache.clear();
+
+ resultCache.loadResultsToCache(ScanResultCache.EMPTY_RESULT_ARRAY, false);
+ assertEquals(0, cache.size());
+ resultCache.loadResultsToCache(ScanResultCache.EMPTY_RESULT_ARRAY, true);
+ assertEquals(0, cache.size());
+
int count = 10;
Result[] results = new Result[count];
for (int i = 0; i < count; i++) {
results[i] = Result.create(Arrays.asList(createCell(i, CQ1)));
}
- assertSame(results, resultCache.addAndGet(results, false));
+ resultCache.loadResultsToCache(results, false);
+ results = cache.toArray(new Result[0]);
+ assertEquals(count, results.length);
+ for (int i = 0; i < count; i++) {
+ assertEquals(i, Bytes.toInt(results[i].getRow()));
+ assertEquals(i, Bytes.toInt(results[i].getValue(CF, CQ1)));
+ }
}
@Test
public void testCombine1() throws IOException {
+ cache.clear();
+
Result previousResult = Result.create(Arrays.asList(createCell(0, CQ1)), null, false, true);
Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true);
Result result3 = Result.create(Arrays.asList(createCell(1, CQ3)), null, false, true);
- Result[] results = resultCache.addAndGet(new Result[] { previousResult, result1 }, false);
+ resultCache.loadResultsToCache(new Result[] { previousResult, result1 }, false);
+ Result[] results = cache.toArray(new Result[0]);
assertEquals(1, results.length);
assertSame(previousResult, results[0]);
- assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length);
- assertEquals(0, resultCache.addAndGet(new Result[] { result3 }, false).length);
- assertEquals(0, resultCache.addAndGet(new Result[0], true).length);
+ cache.clear();
+ resultCache.loadResultsToCache(new Result[] { result2 }, false);
+ assertEquals(0, cache.size());
+ resultCache.loadResultsToCache(new Result[] { result3 }, false);
+ assertEquals(0, cache.size());
+ resultCache.loadResultsToCache(new Result[0], true);
+ assertEquals(0, cache.size());
- results = resultCache.addAndGet(new Result[0], false);
+ resultCache.loadResultsToCache(new Result[0], false);
+ results = cache.toArray(new Result[0]);
assertEquals(1, results.length);
assertEquals(1, Bytes.toInt(results[0].getRow()));
assertEquals(3, results[0].rawCells().length);
@@ -100,17 +120,23 @@ public class TestCompleteResultScanResultCache {
@Test
public void testCombine2() throws IOException {
+ cache.clear();
+
Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true);
Result result3 = Result.create(Arrays.asList(createCell(1, CQ3)), null, false, true);
Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true);
Result nextToNextResult1 = Result.create(Arrays.asList(createCell(3, CQ2)), null, false, false);
- assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length);
- assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length);
- assertEquals(0, resultCache.addAndGet(new Result[] { result3 }, false).length);
+ resultCache.loadResultsToCache(new Result[] { result1 }, false);
+ assertEquals(0, cache.size());
+ resultCache.loadResultsToCache(new Result[] { result2 }, false);
+ assertEquals(0, cache.size());
+ resultCache.loadResultsToCache(new Result[] { result3 }, false);
+ assertEquals(0, cache.size());
- Result[] results = resultCache.addAndGet(new Result[] { nextResult1 }, false);
+ resultCache.loadResultsToCache(new Result[] { nextResult1 }, false);
+ Result[] results = cache.toArray(new Result[0]);
assertEquals(1, results.length);
assertEquals(1, Bytes.toInt(results[0].getRow()));
assertEquals(3, results[0].rawCells().length);
@@ -118,7 +144,9 @@ public class TestCompleteResultScanResultCache {
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ3)));
- results = resultCache.addAndGet(new Result[] { nextToNextResult1 }, false);
+ cache.clear();
+ resultCache.loadResultsToCache(new Result[] { nextToNextResult1 }, false);
+ results = cache.toArray(new Result[0]);
assertEquals(2, results.length);
assertEquals(2, Bytes.toInt(results[0].getRow()));
assertEquals(1, results[0].rawCells().length);
@@ -130,16 +158,20 @@ public class TestCompleteResultScanResultCache {
@Test
public void testCombine3() throws IOException {
+ cache.clear();
+
Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true);
Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, false);
Result nextToNextResult1 = Result.create(Arrays.asList(createCell(3, CQ1)), null, false, true);
- assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length);
- assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length);
+ resultCache.loadResultsToCache(new Result[] { result1 }, false);
+ assertEquals(0, cache.size());
+ resultCache.loadResultsToCache(new Result[] { result2 }, false);
+ assertEquals(0, cache.size());
- Result[] results =
- resultCache.addAndGet(new Result[] { nextResult1, nextToNextResult1 }, false);
+ resultCache.loadResultsToCache(new Result[] { nextResult1, nextToNextResult1 }, false);
+ Result[] results = cache.toArray(new Result[0]);
assertEquals(2, results.length);
assertEquals(1, Bytes.toInt(results[0].getRow()));
assertEquals(2, results[0].rawCells().length);
@@ -149,7 +181,9 @@ public class TestCompleteResultScanResultCache {
assertEquals(1, results[1].rawCells().length);
assertEquals(2, Bytes.toInt(results[1].getValue(CF, CQ1)));
- results = resultCache.addAndGet(new Result[0], false);
+ cache.clear();
+ resultCache.loadResultsToCache(new Result[0], false);
+ results = cache.toArray(new Result[0]);
assertEquals(1, results.length);
assertEquals(3, Bytes.toInt(results[0].getRow()));
assertEquals(1, results[0].rawCells().length);
@@ -158,21 +192,27 @@ public class TestCompleteResultScanResultCache {
@Test
public void testCombine4() throws IOException {
+ cache.clear();
+
Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, false);
Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true);
Result nextResult2 = Result.create(Arrays.asList(createCell(2, CQ2)), null, false, false);
- assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length);
+ resultCache.loadResultsToCache(new Result[] { result1 }, false);
+ assertEquals(0, cache.size());
- Result[] results = resultCache.addAndGet(new Result[] { result2, nextResult1 }, false);
+ resultCache.loadResultsToCache(new Result[] { result2, nextResult1 }, false);
+ Result[] results = cache.toArray(new Result[0]);
assertEquals(1, results.length);
assertEquals(1, Bytes.toInt(results[0].getRow()));
assertEquals(2, results[0].rawCells().length);
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
- results = resultCache.addAndGet(new Result[] { nextResult2 }, false);
+ cache.clear();
+ resultCache.loadResultsToCache(new Result[] { nextResult2 }, false);
+ results = cache.toArray(new Result[0]);
assertEquals(1, results.length);
assertEquals(2, Bytes.toInt(results[0].getRow()));
assertEquals(2, results[0].rawCells().length);