You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/03/17 01:33:22 UTC
hbase git commit: HBASE-17793 Backport ScanResultCache related code
to branch-1
Repository: hbase
Updated Branches:
refs/heads/branch-1 d542b446b -> 6be8d2041
HBASE-17793 Backport ScanResultCache related code to branch-1
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6be8d204
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6be8d204
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6be8d204
Branch: refs/heads/branch-1
Commit: 6be8d2041b9dfb99c77d824776fb39da8984bea1
Parents: d542b44
Author: zhangduo <zh...@apache.org>
Authored: Thu Mar 16 15:25:56 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Mar 17 09:32:44 2017 +0800
----------------------------------------------------------------------
.../client/AllowPartialScanResultCache.java | 73 ++++++
.../hbase/client/BatchScanResultCache.java | 142 +++++++++++
.../hadoop/hbase/client/ClientScanner.java | 254 +------------------
.../hbase/client/CompleteScanResultCache.java | 110 ++++++++
.../hadoop/hbase/client/ConnectionUtils.java | 48 ++++
.../org/apache/hadoop/hbase/client/Result.java | 69 +++--
.../hadoop/hbase/client/ScanResultCache.java | 53 ++++
.../client/TestAllowPartialScanResultCache.java | 91 +++++++
.../hbase/client/TestBatchScanResultCache.java | 113 +++++++++
.../TestCompleteResultScanResultCache.java | 182 +++++++++++++
10 files changed, 858 insertions(+), 277 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
new file mode 100644
index 0000000..82f1ea0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A ScanResultCache that may return partial results.
+ * <p>
+ * After an error the scan can only be restarted from the beginning of a row, so this class also
+ * skips the cells that have already been returned to the user.
+ */
+@InterfaceAudience.Private
+class AllowPartialScanResultCache implements ScanResultCache {
+
+  // The last cell handed to the user. Used to filter out cells that were already returned, as a
+  // retry always restarts from the beginning of a row.
+  private Cell lastCell;
+
+  /**
+   * Remembers the last cell of the last non-empty result in {@code results[from..]}. Empty results
+   * are skipped (as the scanner logic this class replaced did) so we never index into an empty or
+   * missing cell array; if every candidate is empty, {@code lastCell} is left unchanged.
+   */
+  private void updateLastCell(Result[] results, int from) {
+    for (int j = results.length - 1; j >= from; j--) {
+      Result r = results[j];
+      if (!r.isEmpty()) {
+        Cell[] cells = r.rawCells();
+        lastCell = cells[cells.length - 1];
+        return;
+      }
+    }
+  }
+
+  /**
+   * Drops leading cells that were already returned before a retry and passes everything else
+   * through unchanged.
+   * @param results results from one RPC response; entries may be replaced in place
+   * @param isHeartbeatMessage unused here, since nothing is buffered
+   * @return the results to hand to the user, possibly empty but never null
+   */
+  @Override
+  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
+    if (results.length == 0) {
+      return EMPTY_RESULT_ARRAY;
+    }
+    // Skip/trim leading results until one still carries cells the user has not seen yet.
+    int i;
+    for (i = 0; i < results.length; i++) {
+      Result r = filterCells(results[i], lastCell);
+      if (r != null) {
+        results[i] = r;
+        break;
+      }
+    }
+    if (i == results.length) {
+      return EMPTY_RESULT_ARRAY;
+    }
+    updateLastCell(results, i);
+    if (i > 0) {
+      return Arrays.copyOfRange(results, i, results.length);
+    } else {
+      return results;
+    }
+  }
+
+  @Override
+  public void clear() {
+    // we do not cache anything
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
new file mode 100644
index 0000000..9ab959b
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Result cache for batched scans, i.e. when
+ * {@code scan.getBatch() > 0 && !scan.getAllowPartialResults()}.
+ * <p>
+ * Cells of a row are regrouped into Results of {@code batch} cells. For example, with setBatch(5)
+ * and RPCs returning 3+5+5+5+3 cells for one row, the user receives 5+5+5+5+1. Note that setBatch
+ * does not imply setAllowPartialResult(true).
+ */
+@InterfaceAudience.Private
+public class BatchScanResultCache implements ScanResultCache {
+
+  private final int batch;
+
+  // Last cell handed to the user. A retry restarts at the beginning of a row, so re-sent cells up
+  // to and including this one must be dropped.
+  private Cell lastCell;
+
+  // Pieces of the current row that have not been returned to the user yet.
+  private final Deque<Result> partialResults = new ArrayDeque<>();
+
+  // Total number of cells currently held in partialResults.
+  private int numCellsOfPartialResults;
+
+  public BatchScanResultCache(int batch) {
+    this.batch = batch;
+  }
+
+  private void updateLastCell(Result result) {
+    Cell[] cells = result.rawCells();
+    lastCell = cells[cells.length - 1];
+  }
+
+  /** Merges everything buffered for the current row into one Result and resets the buffer. */
+  private Result createCompletedResult() throws IOException {
+    Result completed = Result.createCompleteResult(partialResults);
+    partialResults.clear();
+    numCellsOfPartialResults = 0;
+    return completed;
+  }
+
+  /**
+   * Buffers {@code result}. Once at least {@code batch} cells are buffered, exactly {@code batch}
+   * cells are cut off the front of the buffer and returned as one Result; otherwise returns null.
+   * Because the region server also honours scan.getBatch(), at most one batch becomes available
+   * per call.
+   */
+  private Result regroupResults(Result result) {
+    partialResults.addLast(result);
+    numCellsOfPartialResults += result.size();
+    if (numCellsOfPartialResults < batch) {
+      return null;
+    }
+    Cell[] batchCells = new Cell[batch];
+    int filled = 0;
+    boolean stale = false;
+    while (filled < batch) {
+      Result head = partialResults.pollFirst();
+      stale = stale || head.isStale();
+      int room = batch - filled;
+      if (head.size() <= room) {
+        System.arraycopy(head.rawCells(), 0, batchCells, filled, head.size());
+        filled += head.size();
+      } else {
+        // Only part of this piece fits: take what is needed and push the rest back to the front.
+        System.arraycopy(head.rawCells(), 0, batchCells, filled, room);
+        int leftover = head.size() - room;
+        Cell[] remaining = new Cell[leftover];
+        System.arraycopy(head.rawCells(), room, remaining, 0, leftover);
+        partialResults.addFirst(
+          Result.create(remaining, head.getExists(), head.isStale(), head.mayHaveMoreCellsInRow()));
+        filled = batch;
+      }
+    }
+    numCellsOfPartialResults -= batch;
+    boolean moreInRow = result.mayHaveMoreCellsInRow() || !partialResults.isEmpty();
+    return Result.create(batchCells, null, stale, moreInRow);
+  }
+
+  @Override
+  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
+    if (results.length == 0) {
+      // A non-heartbeat empty response means the buffered row is finished, so flush it.
+      boolean flush = !partialResults.isEmpty() && !isHeartbeatMessage;
+      return flush ? new Result[] { createCompletedResult() } : EMPTY_RESULT_ARRAY;
+    }
+    List<Result> output = new ArrayList<>();
+    for (Result raw : results) {
+      Result result = filterCells(raw, lastCell);
+      if (result == null) {
+        // all of its cells were already returned to the user
+        continue;
+      }
+      boolean rowChanged = !partialResults.isEmpty()
+          && !Bytes.equals(partialResults.peek().getRow(), result.getRow());
+      if (rowChanged) {
+        output.add(createCompletedResult());
+      }
+      Result batched = regroupResults(result);
+      if (batched != null) {
+        output.add(batched);
+        // lastCell only tracks cells that have actually been handed to the user
+        updateLastCell(batched);
+      }
+      if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) {
+        // the row ended: flush whatever did not fill a whole batch
+        output.add(createCompletedResult());
+      }
+    }
+    return output.toArray(new Result[0]);
+  }
+
+  @Override
+  public void clear() {
+    partialResults.clear();
+    numCellsOfPartialResults = 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 01ea993..57586d8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -17,16 +17,15 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
-import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang.mutable.MutableBoolean;
@@ -34,13 +33,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue.MetaComparator;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
@@ -69,23 +66,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
protected HRegionInfo currentRegion = null;
protected ScannerCallableWithReplicas callable = null;
protected final LinkedList<Result> cache = new LinkedList<Result>();
- /**
- * A list of partial results that have been returned from the server. This list should only
- * contain results if this scanner does not have enough partial results to form the complete
- * result.
- */
- protected final LinkedList<Result> partialResults = new LinkedList<Result>();
- protected int partialResultsCellSizes = 0;
- /**
- * The row for which we are accumulating partial Results (i.e. the row of the Results stored
- * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via
- * the methods {@link #regroupResults(Result)} and {@link #clearPartialResults()}
- */
- protected byte[] partialResultsRow = null;
- /**
- * The last cell from a not full Row which is added to cache
- */
- protected Cell lastCellLoadedToCache = null;
+ private final ScanResultCache scanResultCache;
protected final int caching;
protected long lastNext;
// Keep lastResult returned successfully in case we have to reset scanner.
@@ -106,7 +87,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
protected final int primaryOperationTimeout;
private int retries;
protected final ExecutorService pool;
- private static MetaComparator metaComparator = new MetaComparator();
/**
* Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
@@ -159,6 +139,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
this.rpcControllerFactory = controllerFactory;
this.conf = conf;
+ this.scanResultCache = createScanResultCache(scan);
}
protected ClusterConnection getConnection() {
@@ -361,14 +342,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
private void closeScannerIfExhausted(boolean exhausted) throws IOException {
if (exhausted) {
- if (!partialResults.isEmpty()) {
- // XXX: continue if there are partial results. But in fact server should not set
- // hasMoreResults to false if there are partial results.
- LOG.warn("Server tells us there is no more results for this region but we still have" +
- " partialResults, this should not happen, retry on the current scanner anyway");
- } else {
- closeScanner();
- }
+ closeScanner();
}
}
@@ -376,7 +350,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException {
// An exception was thrown which makes any partial results that we were collecting
// invalid. The scanner will need to be reset to the beginning of a row.
- clearPartialResults();
+ scanResultCache.clear();
// Unfortunately, DNRIOE is used in two different semantics.
// (1) The first is to close the client scanner and bubble up the exception all the way
@@ -470,7 +444,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
if (callable.switchedToADifferentReplica()) {
// Any accumulated partial results are no longer valid since the callable will
// openScanner with the correct startkey and we must pick up from there
- clearPartialResults();
+ scanResultCache.clear();
this.currentRegion = callable.getHRegionInfo();
}
retryAfterOutOfOrderException.setValue(true);
@@ -490,28 +464,20 @@ public abstract class ClientScanner extends AbstractClientScanner {
// Groom the array of Results that we received back from the server before adding that
// Results to the scanner's cache. If partial results are not allowed to be seen by the
// caller, all book keeping will be performed within this method.
- List<Result> resultsToAddToCache =
- getResultsToAddToCache(values, callable.isHeartbeatMessage());
- if (!resultsToAddToCache.isEmpty()) {
+ Result[] resultsToAddToCache =
+ scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
+ if (resultsToAddToCache.length > 0) {
for (Result rs : resultsToAddToCache) {
- rs = filterLoadedCell(rs);
- if (rs == null) {
- continue;
- }
cache.add(rs);
for (Cell cell : rs.rawCells()) {
remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
}
countdown--;
this.lastResult = rs;
- if (this.lastResult.mayHaveMoreCellsInRow()) {
- updateLastCellLoadedToCache(this.lastResult);
- } else {
- this.lastCellLoadedToCache = null;
- }
}
- if (scan.getLimit() > 0 && !resultsToAddToCache.isEmpty()) {
- int newLimit = scan.getLimit() - numberOfIndividualRows(resultsToAddToCache);
+ if (scan.getLimit() > 0) {
+ int newLimit =
+ scan.getLimit() - numberOfIndividualRows(Arrays.asList(resultsToAddToCache));
assert newLimit >= 0;
scan.setLimit(newLimit);
}
@@ -554,13 +520,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
}
// we are done with the current region
if (regionExhausted) {
- if (!partialResults.isEmpty()) {
- // XXX: continue if there are partial results. But in fact server should not set
- // hasMoreResults to false if there are partial results.
- LOG.warn("Server tells us there is no more results for this region but we still have" +
- " partialResults, this should not happen, retry on the current scanner anyway");
- continue;
- }
if (!moveToNextRegion()) {
break;
}
@@ -568,141 +527,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
}
}
- /**
- * This method ensures all of our book keeping regarding partial results is kept up to date. This
- * method should be called once we know that the results we received back from the RPC request do
- * not contain errors. We return a list of results that should be added to the cache. In general,
- * this list will contain all NON-partial results from the input array (unless the client has
- * specified that they are okay with receiving partial results)
- * @param resultsFromServer The array of {@link Result}s returned from the server
- * @param heartbeatMessage Flag indicating whether or not the response received from the server
- * represented a complete response, or a heartbeat message that was sent to keep the
- * client-server connection alive
- * @return the list of results that should be added to the cache.
- * @throws IOException
- */
- protected List<Result> getResultsToAddToCache(Result[] resultsFromServer,
- boolean heartbeatMessage) throws IOException {
- int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
- List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
-
- // If the caller has indicated in their scan that they are okay with seeing partial results,
- // then simply add all results to the list. Note allowPartial and setBatch are not same, we can
- // return here if allow partials and we will handle batching later.
- if (scan.getAllowPartialResults()) {
- addResultsToList(resultsToAddToCache, resultsFromServer, 0,
- (null == resultsFromServer ? 0 : resultsFromServer.length));
- return resultsToAddToCache;
- }
-
- // If no results were returned it indicates that either we have the all the partial results
- // necessary to construct the complete result or the server had to send a heartbeat message
- // to the client to keep the client-server connection alive
- if (resultsFromServer == null || resultsFromServer.length == 0) {
- // If this response was an empty heartbeat message, then we have not exhausted the region
- // and thus there may be more partials server side that still need to be added to the partial
- // list before we form the complete Result
- if (!partialResults.isEmpty() && !heartbeatMessage) {
- resultsToAddToCache.add(Result.createCompleteResult(partialResults));
- clearPartialResults();
- }
-
- return resultsToAddToCache;
- }
-
- for(Result result : resultsFromServer) {
- if (partialResultsRow != null && Bytes.compareTo(result.getRow(), partialResultsRow) != 0) {
- // We have a new row, complete the previous row.
- resultsToAddToCache.add(Result.createCompleteResult(partialResults));
- clearPartialResults();
- }
- Result res = regroupResults(result);
- if (res != null) {
- resultsToAddToCache.add(res);
- }
- if (!result.mayHaveMoreCellsInRow()) {
- // We are done for this row
- if (partialResultsCellSizes > 0) {
- resultsToAddToCache.add(Result.createCompleteResult(partialResults));
- }
- clearPartialResults();
- }
- }
-
- return resultsToAddToCache;
- }
-
- /**
- * Add new result to the partial list and return a batched Result if caching size exceed
- * batching limit.
- * If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to user.
- * setBatch doesn't mean setAllowPartialResult(true)
- * @param result The result that we want to add to our list of partial Results
- * @return the result if we have batch limit and there is one Result can be returned to user, or
- * null if we have not.
- * @throws IOException
- */
- private Result regroupResults(final Result result) throws IOException {
- partialResultsRow = result.getRow();
- partialResults.add(result);
- partialResultsCellSizes += result.size();
- if (scan.getBatch() > 0 && partialResultsCellSizes >= scan.getBatch()) {
- Cell[] cells = new Cell[scan.getBatch()];
- int count = 0;
- boolean stale = false;
- while (count < scan.getBatch()) {
- Result res = partialResults.poll();
- stale = stale || res.isStale();
- if (res.size() + count <= scan.getBatch()) {
- System.arraycopy(res.rawCells(), 0, cells, count, res.size());
- count += res.size();
- } else {
- int len = scan.getBatch() - count;
- System.arraycopy(res.rawCells(), 0, cells, count, len);
- Cell[] remainingCells = new Cell[res.size() - len];
- System.arraycopy(res.rawCells(), len, remainingCells, 0, res.size() - len);
- Result remainingRes = Result.create(remainingCells, res.getExists(), res.isStale(),
- res.mayHaveMoreCellsInRow());
- partialResults.addFirst(remainingRes);
- count = scan.getBatch();
- }
- }
- partialResultsCellSizes -= scan.getBatch();
- if (partialResultsCellSizes == 0) {
- // We have nothing in partialResults, clear the flags to prevent returning empty Result
- // when next result belongs to the next row.
- clearPartialResults();
- }
- return Result.create(cells, null, stale,
- partialResultsCellSizes > 0 || result.mayHaveMoreCellsInRow());
- }
- return null;
- }
-
- /**
- * Convenience method for clearing the list of partials and resetting the partialResultsRow.
- */
- private void clearPartialResults() {
- partialResults.clear();
- partialResultsCellSizes = 0;
- partialResultsRow = null;
- }
-
- /**
- * Helper method for adding results between the indices [start, end) to the outputList
- * @param outputList the list that results will be added to
- * @param inputArray the array that results are taken from
- * @param start beginning index (inclusive)
- * @param end ending index (exclusive)
- */
- private void addResultsToList(List<Result> outputList, Result[] inputArray, int start, int end) {
- if (inputArray == null || start < 0 || end > inputArray.length) return;
-
- for (int i = start; i < end; i++) {
- outputList.add(inputArray[i]);
- }
- }
-
@Override
public void close() {
if (!scanMetricsPublished) writeScanMetrics();
@@ -739,58 +563,4 @@ public abstract class ClientScanner extends AbstractClientScanner {
}
return false;
}
-
- protected void updateLastCellLoadedToCache(Result result) {
- if (result.rawCells().length == 0) {
- return;
- }
- this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1];
- }
-
- /**
- * Compare two Cells considering reversed scanner. ReversedScanner only reverses rows, not
- * columns.
- */
- private int compare(Cell a, Cell b) {
- int r = 0;
- if (currentRegion != null && currentRegion.isMetaRegion()) {
- r = metaComparator.compareRows(a, b);
- } else {
- r = CellComparator.compareRows(a, b);
- }
- if (r != 0) {
- return this.scan.isReversed() ? -r : r;
- }
- return CellComparator.compareWithoutRow(a, b);
- }
-
- private Result filterLoadedCell(Result result) {
- // we only filter result when last result is partial
- // so lastCellLoadedToCache and result should have same row key.
- // However, if 1) read some cells; 1.1) delete this row at the same time 2) move region;
- // 3) read more cell. lastCellLoadedToCache and result will be not at same row.
- if (lastCellLoadedToCache == null || result.rawCells().length == 0) {
- return result;
- }
- if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) {
- // The first cell of this result is larger than the last cell of loadcache.
- // If user do not allow partial result, it must be true.
- return result;
- }
- if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) {
- // The last cell of this result is smaller than the last cell of loadcache, skip all.
- return null;
- }
-
- // The first one must not in filtered result, we start at the second.
- int index = 1;
- while (index < result.rawCells().length) {
- if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) {
- break;
- }
- index++;
- }
- Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
- return Result.create(list, result.getExists(), result.isStale(), result.mayHaveMoreCellsInRow());
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
new file mode 100644
index 0000000..e09ddfb
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A scan result cache that only hands complete rows to the user: partial results are held back
+ * and merged until the rest of their row has arrived.
+ */
+@InterfaceAudience.Private
+class CompleteScanResultCache implements ScanResultCache {
+
+  // Pieces of the row currently being assembled, in arrival order.
+  private final List<Result> partialResults = new ArrayList<>();
+
+  /** Merges the buffered pieces into a single Result and empties the buffer. */
+  private Result combine() throws IOException {
+    Result merged = Result.createCompleteResult(partialResults);
+    partialResults.clear();
+    return merged;
+  }
+
+  /** Whether {@code r} belongs to the row being buffered; requires a non-empty buffer. */
+  private boolean sameRowAsBuffered(Result r) {
+    return Bytes.equals(partialResults.get(0).getRow(), r.getRow());
+  }
+
+  /**
+   * Returns the combined buffered row followed by the first {@code length} entries of
+   * {@code results}. The first entry is folded into the buffered row when it has the same row,
+   * because the final piece of a row may arrive without the partial flag.
+   */
+  private Result[] prependCombined(Result[] results, int length) throws IOException {
+    if (length == 0) {
+      return new Result[] { combine() };
+    }
+    int offset = 0;
+    if (sameRowAsBuffered(results[0])) {
+      partialResults.add(results[0]);
+      offset = 1;
+    }
+    int copied = length - offset;
+    Result[] out = new Result[copied + 1];
+    out[0] = combine();
+    System.arraycopy(results, offset, out, 1, copied);
+    return out;
+  }
+
+  @Override
+  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
+    if (results.length == 0) {
+      // An empty response is either a heartbeat (the row may continue on the server side) or a
+      // sign that the buffered pieces already form the complete row.
+      if (partialResults.isEmpty() || isHeartbeatMessage) {
+        return EMPTY_RESULT_ARRAY;
+      }
+      return new Result[] { combine() };
+    }
+    // A response carries at most one partial result, and if present it is the last element.
+    Result last = results[results.length - 1];
+    if (!last.mayHaveMoreCellsInRow()) {
+      return partialResults.isEmpty() ? results : prependCombined(results, results.length);
+    }
+    // The tail of this response is partial and has to be buffered.
+    if (partialResults.isEmpty()) {
+      partialResults.add(last);
+      return Arrays.copyOf(results, results.length - 1);
+    }
+    if (results.length > 1) {
+      // Complete results precede the partial tail.
+      Result[] ready = prependCombined(results, results.length - 1);
+      partialResults.add(last);
+      return ready;
+    }
+    // The whole response is a single partial result.
+    if (sameRowAsBuffered(last)) {
+      partialResults.add(last);
+      return EMPTY_RESULT_ARRAY;
+    }
+    Result finished = combine();
+    partialResults.add(last);
+    return new Result[] { finished };
+  }
+
+  @Override
+  public void clear() {
+    partialResults.clear();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 97f71e7..1bdc5fe 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -24,12 +24,16 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
@@ -239,6 +243,40 @@ public class ConnectionUtils {
return Bytes.equals(row, EMPTY_END_ROW);
}
+  // Orders cells while ignoring their row; delegates to CellComparator.compareWithoutRow.
+  private static final Comparator<Cell> COMPARE_WITHOUT_ROW = new Comparator<Cell>() {
+
+    @Override
+    public int compare(Cell o1, Cell o2) {
+      return CellComparator.compareWithoutRow(o1, o2);
+    }
+  };
+
+  /**
+   * Drops the cells of {@code result} that are not after {@code keepCellsAfter}. Used to skip
+   * cells already returned to the user, since a retried scan restarts at the beginning of a row.
+   * <p>
+   * Filtering only applies when {@code result} is for the same row as {@code keepCellsAfter};
+   * otherwise, or when there is nothing to filter, {@code result} is returned unchanged.
+   * @param result the result to filter
+   * @param keepCellsAfter the last cell already returned to the user, or null
+   * @return {@code result} itself if no cell had to be dropped, a new Result with only the
+   *         remaining cells, or null if every cell was dropped
+   */
+  static Result filterCells(Result result, Cell keepCellsAfter) {
+    if (keepCellsAfter == null) {
+      // do not need to filter
+      return result;
+    }
+    // An empty result has nothing to filter and may have no row to compare against; pass it
+    // through instead of dereferencing its row below.
+    if (result.isEmpty()) {
+      return result;
+    }
+    byte[] row = result.getRow();
+    if (!CellUtil.matchingRow(keepCellsAfter, row, 0, row.length)) {
+      // not the same row
+      return result;
+    }
+    Cell[] rawCells = result.rawCells();
+    // binarySearch relies on the row's cells being sorted consistently with COMPARE_WITHOUT_ROW.
+    int index = Arrays.binarySearch(rawCells, keepCellsAfter, COMPARE_WITHOUT_ROW);
+    // A miss yields (-(insertion point) - 1); a hit means keepCellsAfter itself is present and
+    // has to be dropped as well.
+    index = index < 0 ? -index - 1 : index + 1;
+    if (index == 0) {
+      return result;
+    }
+    if (index == rawCells.length) {
+      return null;
+    }
+    return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length),
+      result.getExists(), result.isStale(), result.mayHaveMoreCellsInRow());
+  }
+
static boolean noMoreResultsForScan(Scan scan, HRegionInfo info) {
if (isEmptyStopRow(info.getEndKey())) {
return true;
@@ -287,4 +325,14 @@ public class ConnectionUtils {
}
return count;
}
+
+ public static ScanResultCache createScanResultCache(Scan scan) {
+ if (scan.getAllowPartialResults()) { // allowPartial takes precedence over any batch setting
+ return new AllowPartialScanResultCache();
+ }
+ if (scan.getBatch() <= 0) { // neither partial results nor batching requested
+ return new CompleteScanResultCache();
+ }
+ return new BatchScanResultCache(scan.getBatch());
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index 3483d26..4c67c50 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -24,7 +24,9 @@ import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -34,7 +36,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -857,44 +858,42 @@ public class Result implements CellScannable, CellScanner {
* @throws IOException A complete result cannot be formed because the results in the partial list
* come from different rows
*/
- public static Result createCompleteResult(List<Result> partialResults)
+ public static Result createCompleteResult(Iterable<Result> partialResults)
throws IOException {
- List<Cell> cells = new ArrayList<Cell>();
+ if (partialResults == null) {
+ return Result.create(Collections.<Cell> emptyList(), null, false);
+ }
+ List<Cell> cells = new ArrayList<>();
boolean stale = false;
byte[] prevRow = null;
byte[] currentRow = null;
-
- if (partialResults != null && !partialResults.isEmpty()) {
- for (int i = 0; i < partialResults.size(); i++) {
- Result r = partialResults.get(i);
- currentRow = r.getRow();
- if (prevRow != null && !Bytes.equals(prevRow, currentRow)) {
- throw new IOException(
- "Cannot form complete result. Rows of partial results do not match." +
- " Partial Results: " + partialResults);
- }
-
- // Ensure that all Results except the last one are marked as partials. The last result
- // may not be marked as a partial because Results are only marked as partials when
- // the scan on the server side must be stopped due to reaching the maxResultSize.
- // Visualizing it makes it easier to understand:
- // maxResultSize: 2 cells
- // (-x-) represents cell number x in a row
- // Example: row1: -1- -2- -3- -4- -5- (5 cells total)
- // How row1 will be returned by the server as partial Results:
- // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
- // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
- // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
- if (i != (partialResults.size() - 1) && !r.mayHaveMoreCellsInRow()) {
- throw new IOException(
- "Cannot form complete result. Result is missing partial flag. " +
- "Partial Results: " + partialResults);
- }
- prevRow = currentRow;
- stale = stale || r.isStale();
- for (Cell c : r.rawCells()) {
- cells.add(c);
- }
+ for (Iterator<Result> iter = partialResults.iterator(); iter.hasNext();) {
+ Result r = iter.next();
+ currentRow = r.getRow();
+ if (prevRow != null && !Bytes.equals(prevRow, currentRow)) {
+ throw new IOException(
+ "Cannot form complete result. Rows of partial results do not match." +
+ " Partial Results: " + partialResults);
+ }
+ // Ensure that all Results except the last one are marked as partials. The last result
+ // may not be marked as a partial because Results are only marked as partials when
+ // the scan on the server side must be stopped due to reaching the maxResultSize.
+ // Visualizing it makes it easier to understand:
+ // maxResultSize: 2 cells
+ // (-x-) represents cell number x in a row
+ // Example: row1: -1- -2- -3- -4- -5- (5 cells total)
+ // How row1 will be returned by the server as partial Results:
+ // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
+ // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
+ // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
+ if (iter.hasNext() && !r.mayHaveMoreCellsInRow()) {
+ throw new IOException("Cannot form complete result. Result is missing partial flag. " +
+ "Partial Results: " + partialResults);
+ }
+ prevRow = currentRow;
+ stale = stale || r.isStale();
+ for (Cell c : r.rawCells()) {
+ cells.add(c);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
new file mode 100644
index 0000000..2366b57
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Used to separate the row constructing logic.
+ * <p>
+ * Since heartbeat support was added to scans, the RS may return partial results even if
+ * allowPartial is false and batch is 0. With this interface, the implementation now looks like:
+ * <ol>
+ * <li>Get results from ScanResponse proto.</li>
+ * <li>Pass them to ScanResultCache and get something back.</li>
+ * <li>If we actually get something back, then pass it to ScanObserver.</li>
+ * </ol>
+ */
+@InterfaceAudience.Private
+interface ScanResultCache {
+ /** Shared zero-length result array; interface fields are implicitly public static final. */
+ Result[] EMPTY_RESULT_ARRAY = new Result[0];
+
+ /**
+ * Adds the given results to the cache and gets the valid results back.
+ * @param results the results of a scan next call. Must not be null.
+ * @param isHeartbeatMessage whether the results come from a heartbeat response.
+ * @return the valid results, never null.
+ */
+ Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException;
+
+ /**
+ * Clears the cached results, if any. Called when the scan hits an error and will restart from
+ * the start of a row.
+ */
+ void clear();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
new file mode 100644
index 0000000..3fe43a5
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.TestBatchScanResultCache.createCells;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class, ClientTests.class })
+public class TestAllowPartialScanResultCache {
+ // Partial Results are handed back right away, minus any cells that were already returned.
+ private static final byte[] CF = Bytes.toBytes("cf");
+
+ private AllowPartialScanResultCache resultCache;
+
+ @Before
+ public void setUp() {
+ resultCache = new AllowPartialScanResultCache();
+ }
+
+ @After
+ public void tearDown() {
+ resultCache.clear();
+ resultCache = null;
+ }
+
+ @Test
+ public void test() throws IOException {
+ assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+ resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
+ assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+ resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
+
+ Cell[] cells1 = createCells(CF, 1, 10);
+ Cell[] cells2 = createCells(CF, 2, 10);
+
+ Result[] results1 = resultCache.addAndGet(
+ new Result[] { Result.create(Arrays.copyOf(cells1, 5), null, false, true) }, false);
+ assertEquals(1, results1.length);
+ assertEquals(1, Bytes.toInt(results1[0].getRow()));
+ assertEquals(5, results1[0].rawCells().length);
+ for (int i = 0; i < 5; i++) {
+ assertEquals(1, Bytes.toInt(results1[0].getValue(CF, Bytes.toBytes("cq" + i))));
+ }
+ // Resend cq1..cq9 of row 1; cq1..cq4 were already returned, so only cq5..cq9 come back.
+ Result[] results2 = resultCache.addAndGet(
+ new Result[] { Result.create(Arrays.copyOfRange(cells1, 1, 10), null, false, true) }, false);
+ assertEquals(1, results2.length);
+ assertEquals(1, Bytes.toInt(results2[0].getRow()));
+ assertEquals(5, results2[0].rawCells().length);
+ for (int i = 5; i < 10; i++) {
+ assertEquals(1, Bytes.toInt(results2[0].getValue(CF, Bytes.toBytes("cq" + i))));
+ }
+ // Every cell of row 1 was already returned, so only row 2 comes back.
+ Result[] results3 =
+ resultCache.addAndGet(new Result[] { Result.create(cells1), Result.create(cells2) }, false);
+ assertEquals(1, results3.length);
+ assertEquals(2, Bytes.toInt(results3[0].getRow()));
+ assertEquals(10, results3[0].rawCells().length);
+ for (int i = 0; i < 10; i++) {
+ assertEquals(2, Bytes.toInt(results3[0].getValue(CF, Bytes.toBytes("cq" + i))));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
new file mode 100644
index 0000000..31a4594
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class, ClientTests.class })
+public class TestBatchScanResultCache {
+ // Batch size is 4: each returned Result is expected to carry at most 4 cells of a single row.
+ private static final byte[] CF = Bytes.toBytes("cf");
+
+ private BatchScanResultCache resultCache;
+
+ @Before
+ public void setUp() {
+ resultCache = new BatchScanResultCache(4);
+ }
+
+ @After
+ public void tearDown() {
+ resultCache.clear();
+ resultCache = null;
+ }
+ // Qualifiers are "cq" + cq; as bytes they only sort in numeric order while cq < 10.
+ static Cell createCell(byte[] cf, int key, int cq) {
+ return new KeyValue(Bytes.toBytes(key), cf, Bytes.toBytes("cq" + cq), Bytes.toBytes(key));
+ }
+
+ static Cell[] createCells(byte[] cf, int key, int numCqs) {
+ Cell[] cells = new Cell[numCqs];
+ for (int i = 0; i < numCqs; i++) {
+ cells[i] = createCell(cf, key, i);
+ }
+ return cells;
+ }
+ // A full batch (4 cells, matching setUp) must be flagged mayHaveMoreCellsInRow; a shorter tail not.
+ private void assertResultEquals(Result result, int key, int start, int to) {
+ assertEquals(to - start, result.size());
+ for (int i = start; i < to; i++) {
+ assertEquals(key, Bytes.toInt(result.getValue(CF, Bytes.toBytes("cq" + i))));
+ }
+ assertEquals(to - start == 4, result.mayHaveMoreCellsInRow());
+ }
+
+ @Test
+ public void test() throws IOException {
+ assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+ resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
+ assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+ resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
+
+ Cell[] cells1 = createCells(CF, 1, 10);
+ Cell[] cells2 = createCells(CF, 2, 10);
+ Cell[] cells3 = createCells(CF, 3, 10);
+ assertEquals(0, resultCache.addAndGet(
+ new Result[] { Result.create(Arrays.copyOf(cells1, 3), null, false, true) }, false).length);
+ Result[] results = resultCache.addAndGet(
+ new Result[] { Result.create(Arrays.copyOfRange(cells1, 3, 7), null, false, true),
+ Result.create(Arrays.copyOfRange(cells1, 7, 10), null, false, true) },
+ false);
+ assertEquals(2, results.length);
+ assertResultEquals(results[0], 1, 0, 4);
+ assertResultEquals(results[1], 1, 4, 8);
+ results = resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false);
+ assertEquals(1, results.length);
+ assertResultEquals(results[0], 1, 8, 10);
+ // Rows 2 and 3 arrive in a single call; each row's cells come back in batches of up to 4.
+ results = resultCache.addAndGet(
+ new Result[] { Result.create(Arrays.copyOfRange(cells2, 0, 4), null, false, true),
+ Result.create(Arrays.copyOfRange(cells2, 4, 8), null, false, true),
+ Result.create(Arrays.copyOfRange(cells2, 8, 10), null, false, true),
+ Result.create(Arrays.copyOfRange(cells3, 0, 4), null, false, true),
+ Result.create(Arrays.copyOfRange(cells3, 4, 8), null, false, true),
+ Result.create(Arrays.copyOfRange(cells3, 8, 10), null, false, false) },
+ false);
+ assertEquals(6, results.length);
+ assertResultEquals(results[0], 2, 0, 4);
+ assertResultEquals(results[1], 2, 4, 8);
+ assertResultEquals(results[2], 2, 8, 10);
+ assertResultEquals(results[3], 3, 0, 4);
+ assertResultEquals(results[4], 3, 4, 8);
+ assertResultEquals(results[5], 3, 8, 10);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6be8d204/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
new file mode 100644
index 0000000..8759593
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class, ClientTests.class })
+public class TestCompleteResultScanResultCache {
+ // The cache must only hand back whole rows, merging the partial Results of a row together.
+ private static final byte[] CF = Bytes.toBytes("cf");
+
+ private static final byte[] CQ1 = Bytes.toBytes("cq1");
+
+ private static final byte[] CQ2 = Bytes.toBytes("cq2");
+
+ private static final byte[] CQ3 = Bytes.toBytes("cq3");
+
+ private CompleteScanResultCache resultCache;
+
+ @Before
+ public void setUp() {
+ resultCache = new CompleteScanResultCache();
+ }
+
+ @After
+ public void tearDown() {
+ resultCache.clear();
+ resultCache = null;
+ }
+
+ private static Cell createCell(int key, byte[] cq) {
+ return new KeyValue(Bytes.toBytes(key), CF, cq, Bytes.toBytes(key));
+ }
+
+ @Test
+ public void testNoPartial() throws IOException {
+ assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+ resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
+ assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+ resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
+ int count = 10;
+ Result[] results = new Result[count];
+ for (int i = 0; i < count; i++) {
+ results[i] = Result.create(Arrays.asList(createCell(i, CQ1)));
+ }
+ assertSame(results, resultCache.addAndGet(results, false));
+ }
+
+ @Test
+ public void testCombine1() throws IOException {
+ Result previousResult = Result.create(Arrays.asList(createCell(0, CQ1)), null, false, true);
+ Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
+ Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true);
+ Result result3 = Result.create(Arrays.asList(createCell(1, CQ3)), null, false, true);
+ Result[] results = resultCache.addAndGet(new Result[] { previousResult, result1 }, false);
+ assertEquals(1, results.length);
+ assertSame(previousResult, results[0]);
+ // Further partials and a heartbeat for row 1 are buffered; nothing is returned yet.
+ assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length);
+ assertEquals(0, resultCache.addAndGet(new Result[] { result3 }, false).length);
+ assertEquals(0, resultCache.addAndGet(new Result[0], true).length);
+ // An empty non-heartbeat response releases the merged row 1.
+ results = resultCache.addAndGet(new Result[0], false);
+ assertEquals(1, results.length);
+ assertEquals(1, Bytes.toInt(results[0].getRow()));
+ assertEquals(3, results[0].rawCells().length);
+ assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
+ assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
+ assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ3)));
+ }
+
+ @Test
+ public void testCombine2() throws IOException {
+ Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
+ Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true);
+ Result result3 = Result.create(Arrays.asList(createCell(1, CQ3)), null, false, true);
+ Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true);
+ Result nextToNextResult1 = Result.create(Arrays.asList(createCell(3, CQ2)), null, false, false);
+
+ assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length);
+ assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length);
+ assertEquals(0, resultCache.addAndGet(new Result[] { result3 }, false).length);
+
+ Result[] results = resultCache.addAndGet(new Result[] { nextResult1 }, false);
+ assertEquals(1, results.length);
+ assertEquals(1, Bytes.toInt(results[0].getRow()));
+ assertEquals(3, results[0].rawCells().length);
+ assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
+ assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
+ assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ3)));
+
+ results = resultCache.addAndGet(new Result[] { nextToNextResult1 }, false);
+ assertEquals(2, results.length);
+ assertEquals(2, Bytes.toInt(results[0].getRow()));
+ assertEquals(1, results[0].rawCells().length);
+ assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ1)));
+ assertEquals(3, Bytes.toInt(results[1].getRow()));
+ assertEquals(1, results[1].rawCells().length);
+ assertEquals(3, Bytes.toInt(results[1].getValue(CF, CQ2)));
+ }
+
+ @Test
+ public void testCombine3() throws IOException {
+ Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
+ Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true);
+ Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, false);
+ Result nextToNextResult1 = Result.create(Arrays.asList(createCell(3, CQ1)), null, false, true);
+
+ assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length);
+ assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length);
+
+ Result[] results =
+ resultCache.addAndGet(new Result[] { nextResult1, nextToNextResult1 }, false);
+ assertEquals(2, results.length);
+ assertEquals(1, Bytes.toInt(results[0].getRow()));
+ assertEquals(2, results[0].rawCells().length);
+ assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
+ assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
+ assertEquals(2, Bytes.toInt(results[1].getRow()));
+ assertEquals(1, results[1].rawCells().length);
+ assertEquals(2, Bytes.toInt(results[1].getValue(CF, CQ1)));
+
+ results = resultCache.addAndGet(new Result[0], false);
+ assertEquals(1, results.length);
+ assertEquals(3, Bytes.toInt(results[0].getRow()));
+ assertEquals(1, results[0].rawCells().length);
+ assertEquals(3, Bytes.toInt(results[0].getValue(CF, CQ1)));
+ }
+
+ @Test
+ public void testCombine4() throws IOException {
+ Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
+ Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, false);
+ Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true);
+ Result nextResult2 = Result.create(Arrays.asList(createCell(2, CQ2)), null, false, false);
+
+ assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length);
+
+ Result[] results = resultCache.addAndGet(new Result[] { result2, nextResult1 }, false);
+ assertEquals(1, results.length);
+ assertEquals(1, Bytes.toInt(results[0].getRow()));
+ assertEquals(2, results[0].rawCells().length);
+ assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
+ assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
+
+ results = resultCache.addAndGet(new Result[] { nextResult2 }, false);
+ assertEquals(1, results.length);
+ assertEquals(2, Bytes.toInt(results[0].getRow()));
+ assertEquals(2, results[0].rawCells().length);
+ assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ1)));
+ assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ2)));
+ }
+}