You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mi...@apache.org on 2016/03/10 18:23:13 UTC
[30/51] [partial] hbase-site git commit: Published site at
f6945c4631e7697976fd8c2272f8152905c6f875.
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/5eb82203/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatch.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatch.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatch.html
index 57b645b..8c624dc 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatch.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatch.html
@@ -5305,2629 +5305,2632 @@
<span class="sourceLineNo">5297</span> Preconditions.checkNotNull(familyPaths);<a name="line.5297"></a>
<span class="sourceLineNo">5298</span> // we need writeLock for multi-family bulk load<a name="line.5298"></a>
<span class="sourceLineNo">5299</span> startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));<a name="line.5299"></a>
-<span class="sourceLineNo">5300</span> try {<a name="line.5300"></a>
-<span class="sourceLineNo">5301</span> this.writeRequestsCount.increment();<a name="line.5301"></a>
-<span class="sourceLineNo">5302</span><a name="line.5302"></a>
-<span class="sourceLineNo">5303</span> // There possibly was a split that happened between when the split keys<a name="line.5303"></a>
-<span class="sourceLineNo">5304</span> // were gathered and before the HRegion's write lock was taken. We need<a name="line.5304"></a>
-<span class="sourceLineNo">5305</span> // to validate the HFile region before attempting to bulk load all of them<a name="line.5305"></a>
-<span class="sourceLineNo">5306</span> List<IOException> ioes = new ArrayList<IOException>();<a name="line.5306"></a>
-<span class="sourceLineNo">5307</span> List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();<a name="line.5307"></a>
-<span class="sourceLineNo">5308</span> for (Pair<byte[], String> p : familyPaths) {<a name="line.5308"></a>
-<span class="sourceLineNo">5309</span> byte[] familyName = p.getFirst();<a name="line.5309"></a>
-<span class="sourceLineNo">5310</span> String path = p.getSecond();<a name="line.5310"></a>
-<span class="sourceLineNo">5311</span><a name="line.5311"></a>
-<span class="sourceLineNo">5312</span> Store store = getStore(familyName);<a name="line.5312"></a>
-<span class="sourceLineNo">5313</span> if (store == null) {<a name="line.5313"></a>
-<span class="sourceLineNo">5314</span> IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(<a name="line.5314"></a>
-<span class="sourceLineNo">5315</span> "No such column family " + Bytes.toStringBinary(familyName));<a name="line.5315"></a>
-<span class="sourceLineNo">5316</span> ioes.add(ioe);<a name="line.5316"></a>
-<span class="sourceLineNo">5317</span> } else {<a name="line.5317"></a>
-<span class="sourceLineNo">5318</span> try {<a name="line.5318"></a>
-<span class="sourceLineNo">5319</span> store.assertBulkLoadHFileOk(new Path(path));<a name="line.5319"></a>
-<span class="sourceLineNo">5320</span> } catch (WrongRegionException wre) {<a name="line.5320"></a>
-<span class="sourceLineNo">5321</span> // recoverable (file doesn't fit in region)<a name="line.5321"></a>
-<span class="sourceLineNo">5322</span> failures.add(p);<a name="line.5322"></a>
-<span class="sourceLineNo">5323</span> } catch (IOException ioe) {<a name="line.5323"></a>
-<span class="sourceLineNo">5324</span> // unrecoverable (hdfs problem)<a name="line.5324"></a>
-<span class="sourceLineNo">5325</span> ioes.add(ioe);<a name="line.5325"></a>
-<span class="sourceLineNo">5326</span> }<a name="line.5326"></a>
-<span class="sourceLineNo">5327</span> }<a name="line.5327"></a>
-<span class="sourceLineNo">5328</span> }<a name="line.5328"></a>
-<span class="sourceLineNo">5329</span><a name="line.5329"></a>
-<span class="sourceLineNo">5330</span> // validation failed because of some sort of IO problem.<a name="line.5330"></a>
-<span class="sourceLineNo">5331</span> if (ioes.size() != 0) {<a name="line.5331"></a>
-<span class="sourceLineNo">5332</span> IOException e = MultipleIOException.createIOException(ioes);<a name="line.5332"></a>
-<span class="sourceLineNo">5333</span> LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);<a name="line.5333"></a>
-<span class="sourceLineNo">5334</span> throw e;<a name="line.5334"></a>
-<span class="sourceLineNo">5335</span> }<a name="line.5335"></a>
-<span class="sourceLineNo">5336</span><a name="line.5336"></a>
-<span class="sourceLineNo">5337</span> // validation failed, bail out before doing anything permanent.<a name="line.5337"></a>
-<span class="sourceLineNo">5338</span> if (failures.size() != 0) {<a name="line.5338"></a>
-<span class="sourceLineNo">5339</span> StringBuilder list = new StringBuilder();<a name="line.5339"></a>
-<span class="sourceLineNo">5340</span> for (Pair<byte[], String> p : failures) {<a name="line.5340"></a>
-<span class="sourceLineNo">5341</span> list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")<a name="line.5341"></a>
-<span class="sourceLineNo">5342</span> .append(p.getSecond());<a name="line.5342"></a>
-<span class="sourceLineNo">5343</span> }<a name="line.5343"></a>
-<span class="sourceLineNo">5344</span> // problem when validating<a name="line.5344"></a>
-<span class="sourceLineNo">5345</span> LOG.warn("There was a recoverable bulk load failure likely due to a" +<a name="line.5345"></a>
-<span class="sourceLineNo">5346</span> " split. These (family, HFile) pairs were not loaded: " + list);<a name="line.5346"></a>
-<span class="sourceLineNo">5347</span> return false;<a name="line.5347"></a>
-<span class="sourceLineNo">5348</span> }<a name="line.5348"></a>
-<span class="sourceLineNo">5349</span><a name="line.5349"></a>
-<span class="sourceLineNo">5350</span> // We need to assign a sequential ID that's in between two memstores in order to preserve<a name="line.5350"></a>
-<span class="sourceLineNo">5351</span> // the guarantee that all the edits lower than the highest sequential ID from all the<a name="line.5351"></a>
-<span class="sourceLineNo">5352</span> // HFiles are flushed on disk. See HBASE-10958. The sequence id returned when we flush is<a name="line.5352"></a>
-<span class="sourceLineNo">5353</span> // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is<a name="line.5353"></a>
-<span class="sourceLineNo">5354</span> // a sequence id that we can be sure is beyond the last hfile written).<a name="line.5354"></a>
-<span class="sourceLineNo">5355</span> if (assignSeqId) {<a name="line.5355"></a>
-<span class="sourceLineNo">5356</span> FlushResult fs = flushcache(true, false);<a name="line.5356"></a>
-<span class="sourceLineNo">5357</span> if (fs.isFlushSucceeded()) {<a name="line.5357"></a>
-<span class="sourceLineNo">5358</span> seqId = ((FlushResultImpl)fs).flushSequenceId;<a name="line.5358"></a>
-<span class="sourceLineNo">5359</span> } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {<a name="line.5359"></a>
-<span class="sourceLineNo">5360</span> seqId = ((FlushResultImpl)fs).flushSequenceId;<a name="line.5360"></a>
-<span class="sourceLineNo">5361</span> } else {<a name="line.5361"></a>
-<span class="sourceLineNo">5362</span> throw new IOException("Could not bulk load with an assigned sequential ID because the "+<a name="line.5362"></a>
-<span class="sourceLineNo">5363</span> "flush didn't run. Reason for not flushing: " + ((FlushResultImpl)fs).failureReason);<a name="line.5363"></a>
-<span class="sourceLineNo">5364</span> }<a name="line.5364"></a>
-<span class="sourceLineNo">5365</span> }<a name="line.5365"></a>
-<span class="sourceLineNo">5366</span><a name="line.5366"></a>
-<span class="sourceLineNo">5367</span> for (Pair<byte[], String> p : familyPaths) {<a name="line.5367"></a>
-<span class="sourceLineNo">5368</span> byte[] familyName = p.getFirst();<a name="line.5368"></a>
-<span class="sourceLineNo">5369</span> String path = p.getSecond();<a name="line.5369"></a>
-<span class="sourceLineNo">5370</span> Store store = getStore(familyName);<a name="line.5370"></a>
-<span class="sourceLineNo">5371</span> try {<a name="line.5371"></a>
-<span class="sourceLineNo">5372</span> String finalPath = path;<a name="line.5372"></a>
-<span class="sourceLineNo">5373</span> if (bulkLoadListener != null) {<a name="line.5373"></a>
-<span class="sourceLineNo">5374</span> finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);<a name="line.5374"></a>
-<span class="sourceLineNo">5375</span> }<a name="line.5375"></a>
-<span class="sourceLineNo">5376</span> Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);<a name="line.5376"></a>
-<span class="sourceLineNo">5377</span><a name="line.5377"></a>
-<span class="sourceLineNo">5378</span> if(storeFiles.containsKey(familyName)) {<a name="line.5378"></a>
-<span class="sourceLineNo">5379</span> storeFiles.get(familyName).add(commitedStoreFile);<a name="line.5379"></a>
-<span class="sourceLineNo">5380</span> } else {<a name="line.5380"></a>
-<span class="sourceLineNo">5381</span> List<Path> storeFileNames = new ArrayList<Path>();<a name="line.5381"></a>
-<span class="sourceLineNo">5382</span> storeFileNames.add(commitedStoreFile);<a name="line.5382"></a>
-<span class="sourceLineNo">5383</span> storeFiles.put(familyName, storeFileNames);<a name="line.5383"></a>
-<span class="sourceLineNo">5384</span> }<a name="line.5384"></a>
-<span class="sourceLineNo">5385</span> if (bulkLoadListener != null) {<a name="line.5385"></a>
-<span class="sourceLineNo">5386</span> bulkLoadListener.doneBulkLoad(familyName, path);<a name="line.5386"></a>
-<span class="sourceLineNo">5387</span> }<a name="line.5387"></a>
-<span class="sourceLineNo">5388</span> } catch (IOException ioe) {<a name="line.5388"></a>
-<span class="sourceLineNo">5389</span> // A failure here can cause an atomicity violation that we currently<a name="line.5389"></a>
-<span class="sourceLineNo">5390</span> // cannot recover from since it is likely a failed HDFS operation.<a name="line.5390"></a>
-<span class="sourceLineNo">5391</span><a name="line.5391"></a>
-<span class="sourceLineNo">5392</span> // TODO Need a better story for reverting partial failures due to HDFS.<a name="line.5392"></a>
-<span class="sourceLineNo">5393</span> LOG.error("There was a partial failure due to IO when attempting to" +<a name="line.5393"></a>
-<span class="sourceLineNo">5394</span> " load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);<a name="line.5394"></a>
-<span class="sourceLineNo">5395</span> if (bulkLoadListener != null) {<a name="line.5395"></a>
-<span class="sourceLineNo">5396</span> try {<a name="line.5396"></a>
-<span class="sourceLineNo">5397</span> bulkLoadListener.failedBulkLoad(familyName, path);<a name="line.5397"></a>
-<span class="sourceLineNo">5398</span> } catch (Exception ex) {<a name="line.5398"></a>
-<span class="sourceLineNo">5399</span> LOG.error("Error while calling failedBulkLoad for family " +<a name="line.5399"></a>
-<span class="sourceLineNo">5400</span> Bytes.toString(familyName) + " with path " + path, ex);<a name="line.5400"></a>
-<span class="sourceLineNo">5401</span> }<a name="line.5401"></a>
-<span class="sourceLineNo">5402</span> }<a name="line.5402"></a>
-<span class="sourceLineNo">5403</span> throw ioe;<a name="line.5403"></a>
-<span class="sourceLineNo">5404</span> }<a name="line.5404"></a>
-<span class="sourceLineNo">5405</span> }<a name="line.5405"></a>
-<span class="sourceLineNo">5406</span><a name="line.5406"></a>
-<span class="sourceLineNo">5407</span> return true;<a name="line.5407"></a>
-<span class="sourceLineNo">5408</span> } finally {<a name="line.5408"></a>
-<span class="sourceLineNo">5409</span> if (wal != null && !storeFiles.isEmpty()) {<a name="line.5409"></a>
-<span class="sourceLineNo">5410</span> // write a bulk load event when not all hfiles are loaded<a name="line.5410"></a>
-<span class="sourceLineNo">5411</span> try {<a name="line.5411"></a>
-<span class="sourceLineNo">5412</span> WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(<a name="line.5412"></a>
-<span class="sourceLineNo">5413</span> this.getRegionInfo().getTable(),<a name="line.5413"></a>
-<span class="sourceLineNo">5414</span> ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);<a name="line.5414"></a>
-<span class="sourceLineNo">5415</span> WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),<a name="line.5415"></a>
-<span class="sourceLineNo">5416</span> loadDescriptor, mvcc);<a name="line.5416"></a>
-<span class="sourceLineNo">5417</span> } catch (IOException ioe) {<a name="line.5417"></a>
-<span class="sourceLineNo">5418</span> if (this.rsServices != null) {<a name="line.5418"></a>
-<span class="sourceLineNo">5419</span> // Have to abort region server because some hfiles has been loaded but we can't write<a name="line.5419"></a>
-<span class="sourceLineNo">5420</span> // the event into WAL<a name="line.5420"></a>
-<span class="sourceLineNo">5421</span> this.rsServices.abort("Failed to write bulk load event into WAL.", ioe);<a name="line.5421"></a>
-<span class="sourceLineNo">5422</span> }<a name="line.5422"></a>
-<span class="sourceLineNo">5423</span> }<a name="line.5423"></a>
-<span class="sourceLineNo">5424</span> }<a name="line.5424"></a>
-<span class="sourceLineNo">5425</span><a name="line.5425"></a>
-<span class="sourceLineNo">5426</span> closeBulkRegionOperation();<a name="line.5426"></a>
-<span class="sourceLineNo">5427</span> }<a name="line.5427"></a>
-<span class="sourceLineNo">5428</span> }<a name="line.5428"></a>
-<span class="sourceLineNo">5429</span><a name="line.5429"></a>
-<span class="sourceLineNo">5430</span> @Override<a name="line.5430"></a>
-<span class="sourceLineNo">5431</span> public boolean equals(Object o) {<a name="line.5431"></a>
-<span class="sourceLineNo">5432</span> return o instanceof HRegion && Bytes.equals(getRegionInfo().getRegionName(),<a name="line.5432"></a>
-<span class="sourceLineNo">5433</span> ((HRegion) o).getRegionInfo().getRegionName());<a name="line.5433"></a>
-<span class="sourceLineNo">5434</span> }<a name="line.5434"></a>
-<span class="sourceLineNo">5435</span><a name="line.5435"></a>
-<span class="sourceLineNo">5436</span> @Override<a name="line.5436"></a>
-<span class="sourceLineNo">5437</span> public int hashCode() {<a name="line.5437"></a>
-<span class="sourceLineNo">5438</span> return Bytes.hashCode(getRegionInfo().getRegionName());<a name="line.5438"></a>
-<span class="sourceLineNo">5439</span> }<a name="line.5439"></a>
-<span class="sourceLineNo">5440</span><a name="line.5440"></a>
-<span class="sourceLineNo">5441</span> @Override<a name="line.5441"></a>
-<span class="sourceLineNo">5442</span> public String toString() {<a name="line.5442"></a>
-<span class="sourceLineNo">5443</span> return getRegionInfo().getRegionNameAsString();<a name="line.5443"></a>
-<span class="sourceLineNo">5444</span> }<a name="line.5444"></a>
-<span class="sourceLineNo">5445</span><a name="line.5445"></a>
-<span class="sourceLineNo">5446</span> /**<a name="line.5446"></a>
-<span class="sourceLineNo">5447</span> * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).<a name="line.5447"></a>
-<span class="sourceLineNo">5448</span> */<a name="line.5448"></a>
-<span class="sourceLineNo">5449</span> class RegionScannerImpl implements RegionScanner, org.apache.hadoop.hbase.ipc.RpcCallback {<a name="line.5449"></a>
-<span class="sourceLineNo">5450</span> // Package local for testability<a name="line.5450"></a>
-<span class="sourceLineNo">5451</span> KeyValueHeap storeHeap = null;<a name="line.5451"></a>
-<span class="sourceLineNo">5452</span> /** Heap of key-values that are not essential for the provided filters and are thus read<a name="line.5452"></a>
-<span class="sourceLineNo">5453</span> * on demand, if on-demand column family loading is enabled.*/<a name="line.5453"></a>
-<span class="sourceLineNo">5454</span> KeyValueHeap joinedHeap = null;<a name="line.5454"></a>
-<span class="sourceLineNo">5455</span> /**<a name="line.5455"></a>
-<span class="sourceLineNo">5456</span> * If the joined heap data gathering is interrupted due to scan limits, this will<a name="line.5456"></a>
-<span class="sourceLineNo">5457</span> * contain the row for which we are populating the values.*/<a name="line.5457"></a>
-<span class="sourceLineNo">5458</span> protected Cell joinedContinuationRow = null;<a name="line.5458"></a>
-<span class="sourceLineNo">5459</span> private boolean filterClosed = false;<a name="line.5459"></a>
-<span class="sourceLineNo">5460</span><a name="line.5460"></a>
-<span class="sourceLineNo">5461</span> protected final int isScan;<a name="line.5461"></a>
-<span class="sourceLineNo">5462</span> protected final byte[] stopRow;<a name="line.5462"></a>
-<span class="sourceLineNo">5463</span> protected final HRegion region;<a name="line.5463"></a>
-<span class="sourceLineNo">5464</span> protected final CellComparator comparator;<a name="line.5464"></a>
-<span class="sourceLineNo">5465</span> protected boolean copyCellsFromSharedMem = false;<a name="line.5465"></a>
-<span class="sourceLineNo">5466</span><a name="line.5466"></a>
-<span class="sourceLineNo">5467</span> private final long readPt;<a name="line.5467"></a>
-<span class="sourceLineNo">5468</span> private final long maxResultSize;<a name="line.5468"></a>
-<span class="sourceLineNo">5469</span> private final ScannerContext defaultScannerContext;<a name="line.5469"></a>
-<span class="sourceLineNo">5470</span> private final FilterWrapper filter;<a name="line.5470"></a>
-<span class="sourceLineNo">5471</span><a name="line.5471"></a>
-<span class="sourceLineNo">5472</span> @Override<a name="line.5472"></a>
-<span class="sourceLineNo">5473</span> public HRegionInfo getRegionInfo() {<a name="line.5473"></a>
-<span class="sourceLineNo">5474</span> return region.getRegionInfo();<a name="line.5474"></a>
-<span class="sourceLineNo">5475</span> }<a name="line.5475"></a>
-<span class="sourceLineNo">5476</span><a name="line.5476"></a>
-<span class="sourceLineNo">5477</span> public void setCopyCellsFromSharedMem(boolean copyCells) {<a name="line.5477"></a>
-<span class="sourceLineNo">5478</span> this.copyCellsFromSharedMem = copyCells;<a name="line.5478"></a>
-<span class="sourceLineNo">5479</span> }<a name="line.5479"></a>
-<span class="sourceLineNo">5480</span><a name="line.5480"></a>
-<span class="sourceLineNo">5481</span> RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,<a name="line.5481"></a>
-<span class="sourceLineNo">5482</span> boolean copyCellsFromSharedMem)<a name="line.5482"></a>
-<span class="sourceLineNo">5483</span> throws IOException {<a name="line.5483"></a>
-<span class="sourceLineNo">5484</span> this.region = region;<a name="line.5484"></a>
-<span class="sourceLineNo">5485</span> this.maxResultSize = scan.getMaxResultSize();<a name="line.5485"></a>
-<span class="sourceLineNo">5486</span> if (scan.hasFilter()) {<a name="line.5486"></a>
-<span class="sourceLineNo">5487</span> this.filter = new FilterWrapper(scan.getFilter());<a name="line.5487"></a>
-<span class="sourceLineNo">5488</span> } else {<a name="line.5488"></a>
-<span class="sourceLineNo">5489</span> this.filter = null;<a name="line.5489"></a>
-<span class="sourceLineNo">5490</span> }<a name="line.5490"></a>
-<span class="sourceLineNo">5491</span> this.comparator = region.getCellCompartor();<a name="line.5491"></a>
-<span class="sourceLineNo">5492</span> /**<a name="line.5492"></a>
-<span class="sourceLineNo">5493</span> * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default<a name="line.5493"></a>
-<span class="sourceLineNo">5494</span> * scanner context that can be used to enforce the batch limit in the event that a<a name="line.5494"></a>
-<span class="sourceLineNo">5495</span> * ScannerContext is not specified during an invocation of next/nextRaw<a name="line.5495"></a>
-<span class="sourceLineNo">5496</span> */<a name="line.5496"></a>
-<span class="sourceLineNo">5497</span> defaultScannerContext = ScannerContext.newBuilder()<a name="line.5497"></a>
-<span class="sourceLineNo">5498</span> .setBatchLimit(scan.getBatch()).build();<a name="line.5498"></a>
-<span class="sourceLineNo">5499</span><a name="line.5499"></a>
-<span class="sourceLineNo">5500</span> if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {<a name="line.5500"></a>
-<span class="sourceLineNo">5501</span> this.stopRow = null;<a name="line.5501"></a>
-<span class="sourceLineNo">5502</span> } else {<a name="line.5502"></a>
-<span class="sourceLineNo">5503</span> this.stopRow = scan.getStopRow();<a name="line.5503"></a>
-<span class="sourceLineNo">5504</span> }<a name="line.5504"></a>
-<span class="sourceLineNo">5505</span> // If we are doing a get, we want to be [startRow,endRow]. Normally<a name="line.5505"></a>
-<span class="sourceLineNo">5506</span> // it is [startRow,endRow) and if startRow=endRow we get nothing.<a name="line.5506"></a>
-<span class="sourceLineNo">5507</span> this.isScan = scan.isGetScan() ? 1 : 0;<a name="line.5507"></a>
-<span class="sourceLineNo">5508</span><a name="line.5508"></a>
-<span class="sourceLineNo">5509</span> // synchronize on scannerReadPoints so that nobody calculates<a name="line.5509"></a>
-<span class="sourceLineNo">5510</span> // getSmallestReadPoint, before scannerReadPoints is updated.<a name="line.5510"></a>
-<span class="sourceLineNo">5511</span> IsolationLevel isolationLevel = scan.getIsolationLevel();<a name="line.5511"></a>
-<span class="sourceLineNo">5512</span> synchronized(scannerReadPoints) {<a name="line.5512"></a>
-<span class="sourceLineNo">5513</span> this.readPt = getReadPoint(isolationLevel);<a name="line.5513"></a>
-<span class="sourceLineNo">5514</span> scannerReadPoints.put(this, this.readPt);<a name="line.5514"></a>
-<span class="sourceLineNo">5515</span> }<a name="line.5515"></a>
-<span class="sourceLineNo">5516</span><a name="line.5516"></a>
-<span class="sourceLineNo">5517</span> // Here we separate all scanners into two lists - scanner that provide data required<a name="line.5517"></a>
-<span class="sourceLineNo">5518</span> // by the filter to operate (scanners list) and all others (joinedScanners list).<a name="line.5518"></a>
-<span class="sourceLineNo">5519</span> List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());<a name="line.5519"></a>
-<span class="sourceLineNo">5520</span> List<KeyValueScanner> joinedScanners<a name="line.5520"></a>
-<span class="sourceLineNo">5521</span> = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());<a name="line.5521"></a>
-<span class="sourceLineNo">5522</span> if (additionalScanners != null) {<a name="line.5522"></a>
-<span class="sourceLineNo">5523</span> scanners.addAll(additionalScanners);<a name="line.5523"></a>
-<span class="sourceLineNo">5524</span> }<a name="line.5524"></a>
-<span class="sourceLineNo">5525</span><a name="line.5525"></a>
-<span class="sourceLineNo">5526</span> for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {<a name="line.5526"></a>
-<span class="sourceLineNo">5527</span> Store store = stores.get(entry.getKey());<a name="line.5527"></a>
-<span class="sourceLineNo">5528</span> KeyValueScanner scanner;<a name="line.5528"></a>
-<span class="sourceLineNo">5529</span> try {<a name="line.5529"></a>
-<span class="sourceLineNo">5530</span> scanner = store.getScanner(scan, entry.getValue(), this.readPt);<a name="line.5530"></a>
-<span class="sourceLineNo">5531</span> } catch (FileNotFoundException e) {<a name="line.5531"></a>
-<span class="sourceLineNo">5532</span> throw handleFileNotFound(e);<a name="line.5532"></a>
-<span class="sourceLineNo">5533</span> }<a name="line.5533"></a>
-<span class="sourceLineNo">5534</span> if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()<a name="line.5534"></a>
-<span class="sourceLineNo">5535</span> || this.filter.isFamilyEssential(entry.getKey())) {<a name="line.5535"></a>
-<span class="sourceLineNo">5536</span> scanners.add(scanner);<a name="line.5536"></a>
-<span class="sourceLineNo">5537</span> } else {<a name="line.5537"></a>
-<span class="sourceLineNo">5538</span> joinedScanners.add(scanner);<a name="line.5538"></a>
-<span class="sourceLineNo">5539</span> }<a name="line.5539"></a>
-<span class="sourceLineNo">5540</span> }<a name="line.5540"></a>
-<span class="sourceLineNo">5541</span> this.copyCellsFromSharedMem = copyCellsFromSharedMem;<a name="line.5541"></a>
-<span class="sourceLineNo">5542</span> initializeKVHeap(scanners, joinedScanners, region);<a name="line.5542"></a>
-<span class="sourceLineNo">5543</span> }<a name="line.5543"></a>
-<span class="sourceLineNo">5544</span><a name="line.5544"></a>
-<span class="sourceLineNo">5545</span> protected void initializeKVHeap(List<KeyValueScanner> scanners,<a name="line.5545"></a>
-<span class="sourceLineNo">5546</span> List<KeyValueScanner> joinedScanners, HRegion region)<a name="line.5546"></a>
-<span class="sourceLineNo">5547</span> throws IOException {<a name="line.5547"></a>
-<span class="sourceLineNo">5548</span> this.storeHeap = new KeyValueHeap(scanners, comparator);<a name="line.5548"></a>
-<span class="sourceLineNo">5549</span> if (!joinedScanners.isEmpty()) {<a name="line.5549"></a>
-<span class="sourceLineNo">5550</span> this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);<a name="line.5550"></a>
-<span class="sourceLineNo">5551</span> }<a name="line.5551"></a>
-<span class="sourceLineNo">5552</span> }<a name="line.5552"></a>
-<span class="sourceLineNo">5553</span><a name="line.5553"></a>
-<span class="sourceLineNo">5554</span> @Override<a name="line.5554"></a>
-<span class="sourceLineNo">5555</span> public long getMaxResultSize() {<a name="line.5555"></a>
-<span class="sourceLineNo">5556</span> return maxResultSize;<a name="line.5556"></a>
-<span class="sourceLineNo">5557</span> }<a name="line.5557"></a>
-<span class="sourceLineNo">5558</span><a name="line.5558"></a>
-<span class="sourceLineNo">5559</span> @Override<a name="line.5559"></a>
-<span class="sourceLineNo">5560</span> public long getMvccReadPoint() {<a name="line.5560"></a>
-<span class="sourceLineNo">5561</span> return this.readPt;<a name="line.5561"></a>
-<span class="sourceLineNo">5562</span> }<a name="line.5562"></a>
-<span class="sourceLineNo">5563</span><a name="line.5563"></a>
-<span class="sourceLineNo">5564</span> @Override<a name="line.5564"></a>
-<span class="sourceLineNo">5565</span> public int getBatch() {<a name="line.5565"></a>
-<span class="sourceLineNo">5566</span> return this.defaultScannerContext.getBatchLimit();<a name="line.5566"></a>
-<span class="sourceLineNo">5567</span> }<a name="line.5567"></a>
-<span class="sourceLineNo">5568</span><a name="line.5568"></a>
-<span class="sourceLineNo">5569</span> /**<a name="line.5569"></a>
-<span class="sourceLineNo">5570</span> * Reset both the filter and the old filter.<a name="line.5570"></a>
-<span class="sourceLineNo">5571</span> *<a name="line.5571"></a>
-<span class="sourceLineNo">5572</span> * @throws IOException in case a filter raises an I/O exception.<a name="line.5572"></a>
-<span class="sourceLineNo">5573</span> */<a name="line.5573"></a>
-<span class="sourceLineNo">5574</span> protected void resetFilters() throws IOException {<a name="line.5574"></a>
-<span class="sourceLineNo">5575</span> if (filter != null) {<a name="line.5575"></a>
-<span class="sourceLineNo">5576</span> filter.reset();<a name="line.5576"></a>
-<span class="sourceLineNo">5577</span> }<a name="line.5577"></a>
-<span class="sourceLineNo">5578</span> }<a name="line.5578"></a>
-<span class="sourceLineNo">5579</span><a name="line.5579"></a>
-<span class="sourceLineNo">5580</span> @Override<a name="line.5580"></a>
-<span class="sourceLineNo">5581</span> public boolean next(List<Cell> outResults)<a name="line.5581"></a>
-<span class="sourceLineNo">5582</span> throws IOException {<a name="line.5582"></a>
-<span class="sourceLineNo">5583</span> // apply the batching limit by default<a name="line.5583"></a>
-<span class="sourceLineNo">5584</span> return next(outResults, defaultScannerContext);<a name="line.5584"></a>
-<span class="sourceLineNo">5585</span> }<a name="line.5585"></a>
-<span class="sourceLineNo">5586</span><a name="line.5586"></a>
-<span class="sourceLineNo">5587</span> @Override<a name="line.5587"></a>
-<span class="sourceLineNo">5588</span> public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)<a name="line.5588"></a>
-<span class="sourceLineNo">5589</span> throws IOException {<a name="line.5589"></a>
-<span class="sourceLineNo">5590</span> if (this.filterClosed) {<a name="line.5590"></a>
-<span class="sourceLineNo">5591</span> throw new UnknownScannerException("Scanner was closed (timed out?) " +<a name="line.5591"></a>
-<span class="sourceLineNo">5592</span> "after we renewed it. Could be caused by a very slow scanner " +<a name="line.5592"></a>
-<span class="sourceLineNo">5593</span> "or a lengthy garbage collection");<a name="line.5593"></a>
-<span class="sourceLineNo">5594</span> }<a name="line.5594"></a>
-<span class="sourceLineNo">5595</span> startRegionOperation(Operation.SCAN);<a name="line.5595"></a>
-<span class="sourceLineNo">5596</span> readRequestsCount.increment();<a name="line.5596"></a>
-<span class="sourceLineNo">5597</span> try {<a name="line.5597"></a>
-<span class="sourceLineNo">5598</span> return nextRaw(outResults, scannerContext);<a name="line.5598"></a>
-<span class="sourceLineNo">5599</span> } finally {<a name="line.5599"></a>
-<span class="sourceLineNo">5600</span> closeRegionOperation(Operation.SCAN);<a name="line.5600"></a>
-<span class="sourceLineNo">5601</span> }<a name="line.5601"></a>
-<span class="sourceLineNo">5602</span> }<a name="line.5602"></a>
-<span class="sourceLineNo">5603</span><a name="line.5603"></a>
-<span class="sourceLineNo">5604</span> @Override<a name="line.5604"></a>
-<span class="sourceLineNo">5605</span> public boolean nextRaw(List<Cell> outResults) throws IOException {<a name="line.5605"></a>
-<span class="sourceLineNo">5606</span> // Use the RegionScanner's context by default<a name="line.5606"></a>
-<span class="sourceLineNo">5607</span> return nextRaw(outResults, defaultScannerContext);<a name="line.5607"></a>
-<span class="sourceLineNo">5608</span> }<a name="line.5608"></a>
-<span class="sourceLineNo">5609</span><a name="line.5609"></a>
-<span class="sourceLineNo">5610</span> @Override<a name="line.5610"></a>
-<span class="sourceLineNo">5611</span> public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)<a name="line.5611"></a>
-<span class="sourceLineNo">5612</span> throws IOException {<a name="line.5612"></a>
-<span class="sourceLineNo">5613</span> if (storeHeap == null) {<a name="line.5613"></a>
-<span class="sourceLineNo">5614</span> // scanner is closed<a name="line.5614"></a>
-<span class="sourceLineNo">5615</span> throw new UnknownScannerException("Scanner was closed");<a name="line.5615"></a>
-<span class="sourceLineNo">5616</span> }<a name="line.5616"></a>
-<span class="sourceLineNo">5617</span> boolean moreValues = false;<a name="line.5617"></a>
-<span class="sourceLineNo">5618</span> try {<a name="line.5618"></a>
-<span class="sourceLineNo">5619</span> if (outResults.isEmpty()) {<a name="line.5619"></a>
-<span class="sourceLineNo">5620</span> // Usually outResults is empty. This is true when next is called<a name="line.5620"></a>
-<span class="sourceLineNo">5621</span> // to handle scan or get operation.<a name="line.5621"></a>
-<span class="sourceLineNo">5622</span> moreValues = nextInternal(outResults, scannerContext);<a name="line.5622"></a>
-<span class="sourceLineNo">5623</span> } else {<a name="line.5623"></a>
-<span class="sourceLineNo">5624</span> List<Cell> tmpList = new ArrayList<Cell>();<a name="line.5624"></a>
-<span class="sourceLineNo">5625</span> moreValues = nextInternal(tmpList, scannerContext);<a name="line.5625"></a>
-<span class="sourceLineNo">5626</span> outResults.addAll(tmpList);<a name="line.5626"></a>
-<span class="sourceLineNo">5627</span> }<a name="line.5627"></a>
-<span class="sourceLineNo">5628</span><a name="line.5628"></a>
-<span class="sourceLineNo">5629</span> // If the size limit was reached it means a partial Result is being<a name="line.5629"></a>
-<span class="sourceLineNo">5630</span> // returned. Returning a<a name="line.5630"></a>
-<span class="sourceLineNo">5631</span> // partial Result means that we should not reset the filters; filters<a name="line.5631"></a>
-<span class="sourceLineNo">5632</span> // should only be reset in<a name="line.5632"></a>
-<span class="sourceLineNo">5633</span> // between rows<a name="line.5633"></a>
-<span class="sourceLineNo">5634</span> if (!scannerContext.partialResultFormed()) resetFilters();<a name="line.5634"></a>
-<span class="sourceLineNo">5635</span><a name="line.5635"></a>
-<span class="sourceLineNo">5636</span> if (isFilterDoneInternal()) {<a name="line.5636"></a>
-<span class="sourceLineNo">5637</span> moreValues = false;<a name="line.5637"></a>
-<span class="sourceLineNo">5638</span> }<a name="line.5638"></a>
-<span class="sourceLineNo">5639</span><a name="line.5639"></a>
-<span class="sourceLineNo">5640</span> // If copyCellsFromSharedMem = true, then we need to copy the cells. Otherwise<a name="line.5640"></a>
-<span class="sourceLineNo">5641</span> // it is a call coming from the RsRpcServices.scan().<a name="line.5641"></a>
-<span class="sourceLineNo">5642</span> if (copyCellsFromSharedMem && !outResults.isEmpty()) {<a name="line.5642"></a>
-<span class="sourceLineNo">5643</span> // Do the copy of the results here.<a name="line.5643"></a>
-<span class="sourceLineNo">5644</span> ListIterator<Cell> listItr = outResults.listIterator();<a name="line.5644"></a>
-<span class="sourceLineNo">5645</span> Cell cell = null;<a name="line.5645"></a>
-<span class="sourceLineNo">5646</span> while (listItr.hasNext()) {<a name="line.5646"></a>
-<span class="sourceLineNo">5647</span> cell = listItr.next();<a name="line.5647"></a>
-<span class="sourceLineNo">5648</span> if (cell instanceof ShareableMemory) {<a name="line.5648"></a>
-<span class="sourceLineNo">5649</span> listItr.set(((ShareableMemory) cell).cloneToCell());<a name="line.5649"></a>
-<span class="sourceLineNo">5650</span> }<a name="line.5650"></a>
-<span class="sourceLineNo">5651</span> }<a name="line.5651"></a>
-<span class="sourceLineNo">5652</span> }<a name="line.5652"></a>
-<span class="sourceLineNo">5653</span> } finally {<a name="line.5653"></a>
-<span class="sourceLineNo">5654</span> if (copyCellsFromSharedMem) {<a name="line.5654"></a>
-<span class="sourceLineNo">5655</span> // In case of copyCellsFromSharedMem==true (where the CPs wrap a scanner) we return<a name="line.5655"></a>
-<span class="sourceLineNo">5656</span> // the blocks then and there (for wrapped CPs)<a name="line.5656"></a>
-<span class="sourceLineNo">5657</span> this.shipped();<a name="line.5657"></a>
-<span class="sourceLineNo">5658</span> }<a name="line.5658"></a>
-<span class="sourceLineNo">5659</span> }<a name="line.5659"></a>
-<span class="sourceLineNo">5660</span> return moreValues;<a name="line.5660"></a>
-<span class="sourceLineNo">5661</span> }<a name="line.5661"></a>
-<span class="sourceLineNo">5662</span><a name="line.5662"></a>
-<span class="sourceLineNo">5663</span> /**<a name="line.5663"></a>
-<span class="sourceLineNo">5664</span> * @return true if more cells exist after this batch, false if scanner is done<a name="line.5664"></a>
-<span class="sourceLineNo">5665</span> */<a name="line.5665"></a>
-<span class="sourceLineNo">5666</span> private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)<a name="line.5666"></a>
-<span class="sourceLineNo">5667</span> throws IOException {<a name="line.5667"></a>
-<span class="sourceLineNo">5668</span> assert joinedContinuationRow != null;<a name="line.5668"></a>
-<span class="sourceLineNo">5669</span> boolean moreValues = populateResult(results, this.joinedHeap, scannerContext,<a name="line.5669"></a>
-<span class="sourceLineNo">5670</span> joinedContinuationRow);<a name="line.5670"></a>
-<span class="sourceLineNo">5671</span><a name="line.5671"></a>
-<span class="sourceLineNo">5672</span> if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {<a name="line.5672"></a>
-<span class="sourceLineNo">5673</span> // We are done with this row, reset the continuation.<a name="line.5673"></a>
-<span class="sourceLineNo">5674</span> joinedContinuationRow = null;<a name="line.5674"></a>
-<span class="sourceLineNo">5675</span> }<a name="line.5675"></a>
-<span class="sourceLineNo">5676</span> // As the data is obtained from two independent heaps, we need to<a name="line.5676"></a>
-<span class="sourceLineNo">5677</span> // ensure that result list is sorted, because Result relies on that.<a name="line.5677"></a>
-<span class="sourceLineNo">5678</span> sort(results, comparator);<a name="line.5678"></a>
-<span class="sourceLineNo">5679</span> return moreValues;<a name="line.5679"></a>
-<span class="sourceLineNo">5680</span> }<a name="line.5680"></a>
-<span class="sourceLineNo">5681</span><a name="line.5681"></a>
-<span class="sourceLineNo">5682</span> /**<a name="line.5682"></a>
-<span class="sourceLineNo">5683</span> * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is<a name="line.5683"></a>
-<span class="sourceLineNo">5684</span> * reached, or remainingResultSize (if not -1) is reaced<a name="line.5684"></a>
-<span class="sourceLineNo">5685</span> * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.<a name="line.5685"></a>
-<span class="sourceLineNo">5686</span> * @param scannerContext<a name="line.5686"></a>
-<span class="sourceLineNo">5687</span> * @param currentRowCell<a name="line.5687"></a>
-<span class="sourceLineNo">5688</span> * @return state of last call to {@link KeyValueHeap#next()}<a name="line.5688"></a>
-<span class="sourceLineNo">5689</span> */<a name="line.5689"></a>
-<span class="sourceLineNo">5690</span> private boolean populateResult(List<Cell> results, KeyValueHeap heap,<a name="line.5690"></a>
-<span class="sourceLineNo">5691</span> ScannerContext scannerContext, Cell currentRowCell) throws IOException {<a name="line.5691"></a>
-<span class="sourceLineNo">5692</span> Cell nextKv;<a name="line.5692"></a>
-<span class="sourceLineNo">5693</span> boolean moreCellsInRow = false;<a name="line.5693"></a>
-<span class="sourceLineNo">5694</span> boolean tmpKeepProgress = scannerContext.getKeepProgress();<a name="line.5694"></a>
-<span class="sourceLineNo">5695</span> // Scanning between column families and thus the scope is between cells<a name="line.5695"></a>
-<span class="sourceLineNo">5696</span> LimitScope limitScope = LimitScope.BETWEEN_CELLS;<a name="line.5696"></a>
-<span class="sourceLineNo">5697</span> try {<a name="line.5697"></a>
-<span class="sourceLineNo">5698</span> do {<a name="line.5698"></a>
-<span class="sourceLineNo">5699</span> // We want to maintain any progress that is made towards the limits while scanning across<a name="line.5699"></a>
-<span class="sourceLineNo">5700</span> // different column families. To do this, we toggle the keep progress flag on during calls<a name="line.5700"></a>
-<span class="sourceLineNo">5701</span> // to the StoreScanner to ensure that any progress made thus far is not wiped away.<a name="line.5701"></a>
-<span class="sourceLineNo">5702</span> scannerContext.setKeepProgress(true);<a name="line.5702"></a>
-<span class="sourceLineNo">5703</span> heap.next(results, scannerContext);<a name="line.5703"></a>
-<span class="sourceLineNo">5704</span> scannerContext.setKeepProgress(tmpKeepProgress);<a name="line.5704"></a>
-<span class="sourceLineNo">5705</span><a name="line.5705"></a>
-<span class="sourceLineNo">5706</span> nextKv = heap.peek();<a name="line.5706"></a>
-<span class="sourceLineNo">5707</span> moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);<a name="line.5707"></a>
-<span class="sourceLineNo">5708</span> if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);<a name="line.5708"></a>
-<span class="sourceLineNo">5709</span> if (scannerContext.checkBatchLimit(limitScope)) {<a name="line.5709"></a>
-<span class="sourceLineNo">5710</span> return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();<a name="line.5710"></a>
-<span class="sourceLineNo">5711</span> } else if (scannerContext.checkSizeLimit(limitScope)) {<a name="line.5711"></a>
-<span class="sourceLineNo">5712</span> ScannerContext.NextState state =<a name="line.5712"></a>
-<span class="sourceLineNo">5713</span> moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED;<a name="line.5713"></a>
-<span class="sourceLineNo">5714</span> return scannerContext.setScannerState(state).hasMoreValues();<a name="line.5714"></a>
-<span class="sourceLineNo">5715</span> } else if (scannerContext.checkTimeLimit(limitScope)) {<a name="line.5715"></a>
-<span class="sourceLineNo">5716</span> ScannerContext.NextState state =<a name="line.5716"></a>
-<span class="sourceLineNo">5717</span> moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED;<a name="line.5717"></a>
-<span class="sourceLineNo">5718</span> return scannerContext.setScannerState(state).hasMoreValues();<a name="line.5718"></a>
-<span class="sourceLineNo">5719</span> }<a name="line.5719"></a>
-<span class="sourceLineNo">5720</span> } while (moreCellsInRow);<a name="line.5720"></a>
-<span class="sourceLineNo">5721</span> } catch (FileNotFoundException e) {<a name="line.5721"></a>
-<span class="sourceLineNo">5722</span> throw handleFileNotFound(e);<a name="line.5722"></a>
-<span class="sourceLineNo">5723</span> }<a name="line.5723"></a>
-<span class="sourceLineNo">5724</span> return nextKv != null;<a name="line.5724"></a>
-<span class="sourceLineNo">5725</span> }<a name="line.5725"></a>
-<span class="sourceLineNo">5726</span><a name="line.5726"></a>
-<span class="sourceLineNo">5727</span> /**<a name="line.5727"></a>
-<span class="sourceLineNo">5728</span> * Based on the nextKv in the heap, and the current row, decide whether or not there are more<a name="line.5728"></a>
-<span class="sourceLineNo">5729</span> * cells to be read in the heap. If the row of the nextKv in the heap matches the current row<a name="line.5729"></a>
-<span class="sourceLineNo">5730</span> * then there are more cells to be read in the row.<a name="line.5730"></a>
-<span class="sourceLineNo">5731</span> * @param nextKv<a name="line.5731"></a>
-<span class="sourceLineNo">5732</span> * @param currentRowCell<a name="line.5732"></a>
-<span class="sourceLineNo">5733</span> * @return true When there are more cells in the row to be read<a name="line.5733"></a>
-<span class="sourceLineNo">5734</span> */<a name="line.5734"></a>
-<span class="sourceLineNo">5735</span> private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {<a name="line.5735"></a>
-<span class="sourceLineNo">5736</span> return nextKv != null && CellUtil.matchingRow(nextKv, currentRowCell);<a name="line.5736"></a>
-<span class="sourceLineNo">5737</span> }<a name="line.5737"></a>
-<span class="sourceLineNo">5738</span><a name="line.5738"></a>
-<span class="sourceLineNo">5739</span> /*<a name="line.5739"></a>
-<span class="sourceLineNo">5740</span> * @return True if a filter rules the scanner is over, done.<a name="line.5740"></a>
-<span class="sourceLineNo">5741</span> */<a name="line.5741"></a>
-<span class="sourceLineNo">5742</span> @Override<a name="line.5742"></a>
-<span class="sourceLineNo">5743</span> public synchronized boolean isFilterDone() throws IOException {<a name="line.5743"></a>
-<span class="sourceLineNo">5744</span> return isFilterDoneInternal();<a name="line.5744"></a>
-<span class="sourceLineNo">5745</span> }<a name="line.5745"></a>
-<span class="sourceLineNo">5746</span><a name="line.5746"></a>
-<span class="sourceLineNo">5747</span> private boolean isFilterDoneInternal() throws IOException {<a name="line.5747"></a>
-<span class="sourceLineNo">5748</span> return this.filter != null && this.filter.filterAllRemaining();<a name="line.5748"></a>
-<span class="sourceLineNo">5749</span> }<a name="line.5749"></a>
-<span class="sourceLineNo">5750</span><a name="line.5750"></a>
-<span class="sourceLineNo">5751</span> private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)<a name="line.5751"></a>
-<span class="sourceLineNo">5752</span> throws IOException {<a name="line.5752"></a>
-<span class="sourceLineNo">5753</span> if (!results.isEmpty()) {<a name="line.5753"></a>
-<span class="sourceLineNo">5754</span> throw new IllegalArgumentException("First parameter should be an empty list");<a name="line.5754"></a>
-<span class="sourceLineNo">5755</span> }<a name="line.5755"></a>
-<span class="sourceLineNo">5756</span> if (scannerContext == null) {<a name="line.5756"></a>
-<span class="sourceLineNo">5757</span> throw new IllegalArgumentException("Scanner context cannot be null");<a name="line.5757"></a>
+<span class="sourceLineNo">5300</span> boolean isSuccessful = false;<a name="line.5300"></a>
+<span class="sourceLineNo">5301</span> try {<a name="line.5301"></a>
+<span class="sourceLineNo">5302</span> this.writeRequestsCount.increment();<a name="line.5302"></a>
+<span class="sourceLineNo">5303</span><a name="line.5303"></a>
+<span class="sourceLineNo">5304</span> // There possibly was a split that happened between when the split keys<a name="line.5304"></a>
+<span class="sourceLineNo">5305</span> // were gathered and before the HRegion's write lock was taken. We need<a name="line.5305"></a>
+<span class="sourceLineNo">5306</span> // to validate the HFile region before attempting to bulk load all of them<a name="line.5306"></a>
+<span class="sourceLineNo">5307</span> List<IOException> ioes = new ArrayList<IOException>();<a name="line.5307"></a>
+<span class="sourceLineNo">5308</span> List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();<a name="line.5308"></a>
+<span class="sourceLineNo">5309</span> for (Pair<byte[], String> p : familyPaths) {<a name="line.5309"></a>
+<span class="sourceLineNo">5310</span> byte[] familyName = p.getFirst();<a name="line.5310"></a>
+<span class="sourceLineNo">5311</span> String path = p.getSecond();<a name="line.5311"></a>
+<span class="sourceLineNo">5312</span><a name="line.5312"></a>
+<span class="sourceLineNo">5313</span> Store store = getStore(familyName);<a name="line.5313"></a>
+<span class="sourceLineNo">5314</span> if (store == null) {<a name="line.5314"></a>
+<span class="sourceLineNo">5315</span> IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(<a name="line.5315"></a>
+<span class="sourceLineNo">5316</span> "No such column family " + Bytes.toStringBinary(familyName));<a name="line.5316"></a>
+<span class="sourceLineNo">5317</span> ioes.add(ioe);<a name="line.5317"></a>
+<span class="sourceLineNo">5318</span> } else {<a name="line.5318"></a>
+<span class="sourceLineNo">5319</span> try {<a name="line.5319"></a>
+<span class="sourceLineNo">5320</span> store.assertBulkLoadHFileOk(new Path(path));<a name="line.5320"></a>
+<span class="sourceLineNo">5321</span> } catch (WrongRegionException wre) {<a name="line.5321"></a>
+<span class="sourceLineNo">5322</span> // recoverable (file doesn't fit in region)<a name="line.5322"></a>
+<span class="sourceLineNo">5323</span> failures.add(p);<a name="line.5323"></a>
+<span class="sourceLineNo">5324</span> } catch (IOException ioe) {<a name="line.5324"></a>
+<span class="sourceLineNo">5325</span> // unrecoverable (hdfs problem)<a name="line.5325"></a>
+<span class="sourceLineNo">5326</span> ioes.add(ioe);<a name="line.5326"></a>
+<span class="sourceLineNo">5327</span> }<a name="line.5327"></a>
+<span class="sourceLineNo">5328</span> }<a name="line.5328"></a>
+<span class="sourceLineNo">5329</span> }<a name="line.5329"></a>
+<span class="sourceLineNo">5330</span><a name="line.5330"></a>
+<span class="sourceLineNo">5331</span> // validation failed because of some sort of IO problem.<a name="line.5331"></a>
+<span class="sourceLineNo">5332</span> if (ioes.size() != 0) {<a name="line.5332"></a>
+<span class="sourceLineNo">5333</span> IOException e = MultipleIOException.createIOException(ioes);<a name="line.5333"></a>
+<span class="sourceLineNo">5334</span> LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);<a name="line.5334"></a>
+<span class="sourceLineNo">5335</span> throw e;<a name="line.5335"></a>
+<span class="sourceLineNo">5336</span> }<a name="line.5336"></a>
+<span class="sourceLineNo">5337</span><a name="line.5337"></a>
+<span class="sourceLineNo">5338</span> // validation failed, bail out before doing anything permanent.<a name="line.5338"></a>
+<span class="sourceLineNo">5339</span> if (failures.size() != 0) {<a name="line.5339"></a>
+<span class="sourceLineNo">5340</span> StringBuilder list = new StringBuilder();<a name="line.5340"></a>
+<span class="sourceLineNo">5341</span> for (Pair<byte[], String> p : failures) {<a name="line.5341"></a>
+<span class="sourceLineNo">5342</span> list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")<a name="line.5342"></a>
+<span class="sourceLineNo">5343</span> .append(p.getSecond());<a name="line.5343"></a>
+<span class="sourceLineNo">5344</span> }<a name="line.5344"></a>
+<span class="sourceLineNo">5345</span> // problem when validating<a name="line.5345"></a>
+<span class="sourceLineNo">5346</span> LOG.warn("There was a recoverable bulk load failure likely due to a" +<a name="line.5346"></a>
+<span class="sourceLineNo">5347</span> " split. These (family, HFile) pairs were not loaded: " + list);<a name="line.5347"></a>
+<span class="sourceLineNo">5348</span> return isSuccessful;<a name="line.5348"></a>
+<span class="sourceLineNo">5349</span> }<a name="line.5349"></a>
+<span class="sourceLineNo">5350</span><a name="line.5350"></a>
+<span class="sourceLineNo">5351</span> // We need to assign a sequential ID that's in between two memstores in order to preserve<a name="line.5351"></a>
+<span class="sourceLineNo">5352</span> // the guarantee that all the edits lower than the highest sequential ID from all the<a name="line.5352"></a>
+<span class="sourceLineNo">5353</span> // HFiles are flushed on disk. See HBASE-10958. The sequence id returned when we flush is<a name="line.5353"></a>
+<span class="sourceLineNo">5354</span> // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is<a name="line.5354"></a>
+<span class="sourceLineNo">5355</span> // a sequence id that we can be sure is beyond the last hfile written).<a name="line.5355"></a>
+<span class="sourceLineNo">5356</span> if (assignSeqId) {<a name="line.5356"></a>
+<span class="sourceLineNo">5357</span> FlushResult fs = flushcache(true, false);<a name="line.5357"></a>
+<span class="sourceLineNo">5358</span> if (fs.isFlushSucceeded()) {<a name="line.5358"></a>
+<span class="sourceLineNo">5359</span> seqId = ((FlushResultImpl)fs).flushSequenceId;<a name="line.5359"></a>
+<span class="sourceLineNo">5360</span> } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {<a name="line.5360"></a>
+<span class="sourceLineNo">5361</span> seqId = ((FlushResultImpl)fs).flushSequenceId;<a name="line.5361"></a>
+<span class="sourceLineNo">5362</span> } else {<a name="line.5362"></a>
+<span class="sourceLineNo">5363</span> throw new IOException("Could not bulk load with an assigned sequential ID because the "+<a name="line.5363"></a>
+<span class="sourceLineNo">5364</span> "flush didn't run. Reason for not flushing: " + ((FlushResultImpl)fs).failureReason);<a name="line.5364"></a>
+<span class="sourceLineNo">5365</span> }<a name="line.5365"></a>
+<span class="sourceLineNo">5366</span> }<a name="line.5366"></a>
+<span class="sourceLineNo">5367</span><a name="line.5367"></a>
+<span class="sourceLineNo">5368</span> for (Pair<byte[], String> p : familyPaths) {<a name="line.5368"></a>
+<span class="sourceLineNo">5369</span> byte[] familyName = p.getFirst();<a name="line.5369"></a>
+<span class="sourceLineNo">5370</span> String path = p.getSecond();<a name="line.5370"></a>
+<span class="sourceLineNo">5371</span> Store store = getStore(familyName);<a name="line.5371"></a>
+<span class="sourceLineNo">5372</span> try {<a name="line.5372"></a>
+<span class="sourceLineNo">5373</span> String finalPath = path;<a name="line.5373"></a>
+<span class="sourceLineNo">5374</span> if (bulkLoadListener != null) {<a name="line.5374"></a>
+<span class="sourceLineNo">5375</span> finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);<a name="line.5375"></a>
+<span class="sourceLineNo">5376</span> }<a name="line.5376"></a>
+<span class="sourceLineNo">5377</span> Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);<a name="line.5377"></a>
+<span class="sourceLineNo">5378</span><a name="line.5378"></a>
+<span class="sourceLineNo">5379</span> if(storeFiles.containsKey(familyName)) {<a name="line.5379"></a>
+<span class="sourceLineNo">5380</span> storeFiles.get(familyName).add(commitedStoreFile);<a name="line.5380"></a>
+<span class="sourceLineNo">5381</span> } else {<a name="line.5381"></a>
+<span class="sourceLineNo">5382</span> List<Path> storeFileNames = new ArrayList<Path>();<a name="line.5382"></a>
+<span class="sourceLineNo">5383</span> storeFileNames.add(commitedStoreFile);<a name="line.5383"></a>
+<span class="sourceLineNo">5384</span> storeFiles.put(familyName, storeFileNames);<a name="line.5384"></a>
+<span class="sourceLineNo">5385</span> }<a name="line.5385"></a>
+<span class="sourceLineNo">5386</span> if (bulkLoadListener != null) {<a name="line.5386"></a>
+<span class="sourceLineNo">5387</span> bulkLoadListener.doneBulkLoad(familyName, path);<a name="line.5387"></a>
+<span class="sourceLineNo">5388</span> }<a name="line.5388"></a>
+<span class="sourceLineNo">5389</span> } catch (IOException ioe) {<a name="line.5389"></a>
+<span class="sourceLineNo">5390</span> // A failure here can cause an atomicity violation that we currently<a name="line.5390"></a>
+<span class="sourceLineNo">5391</span> // cannot recover from since it is likely a failed HDFS operation.<a name="line.5391"></a>
+<span class="sourceLineNo">5392</span><a name="line.5392"></a>
+<span class="sourceLineNo">5393</span> // TODO Need a better story for reverting partial failures due to HDFS.<a name="line.5393"></a>
+<span class="sourceLineNo">5394</span> LOG.error("There was a partial failure due to IO when attempting to" +<a name="line.5394"></a>
+<span class="sourceLineNo">5395</span> " load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);<a name="line.5395"></a>
+<span class="sourceLineNo">5396</span> if (bulkLoadListener != null) {<a name="line.5396"></a>
+<span class="sourceLineNo">5397</span> try {<a name="line.5397"></a>
+<span class="sourceLineNo">5398</span> bulkLoadListener.failedBulkLoad(familyName, path);<a name="line.5398"></a>
+<span class="sourceLineNo">5399</span> } catch (Exception ex) {<a name="line.5399"></a>
+<span class="sourceLineNo">5400</span> LOG.error("Error while calling failedBulkLoad for family " +<a name="line.5400"></a>
+<span class="sourceLineNo">5401</span> Bytes.toString(familyName) + " with path " + path, ex);<a name="line.5401"></a>
+<span class="sourceLineNo">5402</span> }<a name="line.5402"></a>
+<span class="sourceLineNo">5403</span> }<a name="line.5403"></a>
+<span class="sourceLineNo">5404</span> throw ioe;<a name="line.5404"></a>
+<span class="sourceLineNo">5405</span> }<a name="line.5405"></a>
+<span class="sourceLineNo">5406</span> }<a name="line.5406"></a>
+<span class="sourceLineNo">5407</span><a name="line.5407"></a>
+<span class="sourceLineNo">5408</span> isSuccessful = true;<a name="line.5408"></a>
+<span class="sourceLineNo">5409</span> } finally {<a name="line.5409"></a>
+<span class="sourceLineNo">5410</span> if (wal != null && !storeFiles.isEmpty()) {<a name="line.5410"></a>
+<span class="sourceLineNo">5411</span> // Write a bulk load event for hfiles that are loaded<a name="line.5411"></a>
+<span class="sourceLineNo">5412</span> try {<a name="line.5412"></a>
+<span class="sourceLineNo">5413</span> WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(<a name="line.5413"></a>
+<span class="sourceLineNo">5414</span> this.getRegionInfo().getTable(),<a name="line.5414"></a>
+<span class="sourceLineNo">5415</span> ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);<a name="line.5415"></a>
+<span class="sourceLineNo">5416</span> WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),<a name="line.5416"></a>
+<span class="sourceLineNo">5417</span> loadDescriptor, mvcc);<a name="line.5417"></a>
+<span class="sourceLineNo">5418</span> } catch (IOException ioe) {<a name="line.5418"></a>
+<span class="sourceLineNo">5419</span> if (this.rsServices != null) {<a name="line.5419"></a>
+<span class="sourceLineNo">5420</span> // Have to abort region server because some hfiles has been loaded but we can't write<a name="line.5420"></a>
+<span class="sourceLineNo">5421</span> // the event into WAL<a name="line.5421"></a>
+<span class="sourceLineNo">5422</span> isSuccessful = false;<a name="line.5422"></a>
+<span class="sourceLineNo">5423</span> this.rsServices.abort("Failed to write bulk load event into WAL.", ioe);<a name="line.5423"></a>
+<span class="sourceLineNo">5424</span> }<a name="line.5424"></a>
+<span class="sourceLineNo">5425</span> }<a name="line.5425"></a>
+<span class="sourceLineNo">5426</span> }<a name="line.5426"></a>
+<span class="sourceLineNo">5427</span><a name="line.5427"></a>
+<span class="sourceLineNo">5428</span> closeBulkRegionOperation();<a name="line.5428"></a>
+<span class="sourceLineNo">5429</span> }<a name="line.5429"></a>
+<span class="sourceLineNo">5430</span> return isSuccessful;<a name="line.5430"></a>
+<span class="sourceLineNo">5431</span> }<a name="line.5431"></a>
+<span class="sourceLineNo">5432</span><a name="line.5432"></a>
+<span class="sourceLineNo">5433</span> @Override<a name="line.5433"></a>
+<span class="sourceLineNo">5434</span> public boolean equals(Object o) {<a name="line.5434"></a>
+<span class="sourceLineNo">5435</span> return o instanceof HRegion && Bytes.equals(getRegionInfo().getRegionName(),<a name="line.5435"></a>
+<span class="sourceLineNo">5436</span> ((HRegion) o).getRegionInfo().getRegionName());<a name="line.5436"></a>
+<span class="sourceLineNo">5437</span> }<a name="line.5437"></a>
+<span class="sourceLineNo">5438</span><a name="line.5438"></a>
+<span class="sourceLineNo">5439</span> @Override<a name="line.5439"></a>
+<span class="sourceLineNo">5440</span> public int hashCode() {<a name="line.5440"></a>
+<span class="sourceLineNo">5441</span> return Bytes.hashCode(getRegionInfo().getRegionName());<a name="line.5441"></a>
+<span class="sourceLineNo">5442</span> }<a name="line.5442"></a>
+<span class="sourceLineNo">5443</span><a name="line.5443"></a>
+<span class="sourceLineNo">5444</span> @Override<a name="line.5444"></a>
+<span class="sourceLineNo">5445</span> public String toString() {<a name="line.5445"></a>
+<span class="sourceLineNo">5446</span> return getRegionInfo().getRegionNameAsString();<a name="line.5446"></a>
+<span class="sourceLineNo">5447</span> }<a name="line.5447"></a>
+<span class="sourceLineNo">5448</span><a name="line.5448"></a>
+<span class="sourceLineNo">5449</span> /**<a name="line.5449"></a>
+<span class="sourceLineNo">5450</span> * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).<a name="line.5450"></a>
+<span class="sourceLineNo">5451</span> */<a name="line.5451"></a>
+<span class="sourceLineNo">5452</span> class RegionScannerImpl implements RegionScanner, org.apache.hadoop.hbase.ipc.RpcCallback {<a name="line.5452"></a>
+<span class="sourceLineNo">5453</span> // Package local for testability<a name="line.5453"></a>
+<span class="sourceLineNo">5454</span> KeyValueHeap storeHeap = null;<a name="line.5454"></a>
+<span class="sourceLineNo">5455</span> /** Heap of key-values that are not essential for the provided filters and are thus read<a name="line.5455"></a>
+<span class="sourceLineNo">5456</span> * on demand, if on-demand column family loading is enabled.*/<a name="line.5456"></a>
+<span class="sourceLineNo">5457</span> KeyValueHeap joinedHeap = null;<a name="line.5457"></a>
+<span class="sourceLineNo">5458</span> /**<a name="line.5458"></a>
+<span class="sourceLineNo">5459</span> * If the joined heap data gathering is interrupted due to scan limits, this will<a name="line.5459"></a>
+<span class="sourceLineNo">5460</span> * contain the row for which we are populating the values.*/<a name="line.5460"></a>
+<span class="sourceLineNo">5461</span> protected Cell joinedContinuationRow = null;<a name="line.5461"></a>
+<span class="sourceLineNo">5462</span> private boolean filterClosed = false;<a name="line.5462"></a>
+<span class="sourceLineNo">5463</span><a name="line.5463"></a>
+<span class="sourceLineNo">5464</span> protected final int isScan;<a name="line.5464"></a>
+<span class="sourceLineNo">5465</span> protected final byte[] stopRow;<a name="line.5465"></a>
+<span class="sourceLineNo">5466</span> protected final HRegion region;<a name="line.5466"></a>
+<span class="sourceLineNo">5467</span> protected final CellComparator comparator;<a name="line.5467"></a>
+<span class="sourceLineNo">5468</span> protected boolean copyCellsFromSharedMem = false;<a name="line.5468"></a>
+<span class="sourceLineNo">5469</span><a name="line.5469"></a>
+<span class="sourceLineNo">5470</span> private final long readPt;<a name="line.5470"></a>
+<span class="sourceLineNo">5471</span> private final long maxResultSize;<a name="line.5471"></a>
+<span class="sourceLineNo">5472</span> private final ScannerContext defaultScannerContext;<a name="line.5472"></a>
+<span class="sourceLineNo">5473</span> private final FilterWrapper filter;<a name="line.5473"></a>
+<span class="sourceLineNo">5474</span><a name="line.5474"></a>
+<span class="sourceLineNo">5475</span> @Override<a name="line.5475"></a>
+<span class="sourceLineNo">5476</span> public HRegionInfo getRegionInfo() {<a name="line.5476"></a>
+<span class="sourceLineNo">5477</span> return region.getRegionInfo();<a name="line.5477"></a>
+<span class="sourceLineNo">5478</span> }<a name="line.5478"></a>
+<span class="sourceLineNo">5479</span><a name="line.5479"></a>
+<span class="sourceLineNo">5480</span> public void setCopyCellsFromSharedMem(boolean copyCells) {<a name="line.5480"></a>
+<span class="sourceLineNo">5481</span> this.copyCellsFromSharedMem = copyCells;<a name="line.5481"></a>
+<span class="sourceLineNo">5482</span> }<a name="line.5482"></a>
+<span class="sourceLineNo">5483</span><a name="line.5483"></a>
+<span class="sourceLineNo">5484</span> RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,<a name="line.5484"></a>
+<span class="sourceLineNo">5485</span> boolean copyCellsFromSharedMem)<a name="line.5485"></a>
+<span class="sourceLineNo">5486</span> throws IOException {<a name="line.5486"></a>
+<span class="sourceLineNo">5487</span> this.region = region;<a name="line.5487"></a>
+<span class="sourceLineNo">5488</span> this.maxResultSize = scan.getMaxResultSize();<a name="line.5488"></a>
+<span class="sourceLineNo">5489</span> if (scan.hasFilter()) {<a name="line.5489"></a>
+<span class="sourceLineNo">5490</span> this.filter = new FilterWrapper(scan.getFilter());<a name="line.5490"></a>
+<span class="sourceLineNo">5491</span> } else {<a name="line.5491"></a>
+<span class="sourceLineNo">5492</span> this.filter = null;<a name="line.5492"></a>
+<span class="sourceLineNo">5493</span> }<a name="line.5493"></a>
+<span class="sourceLineNo">5494</span> this.comparator = region.getCellCompartor();<a name="line.5494"></a>
+<span class="sourceLineNo">5495</span> /**<a name="line.5495"></a>
+<span class="sourceLineNo">5496</span> * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default<a name="line.5496"></a>
+<span class="sourceLineNo">5497</span> * scanner context that can be used to enforce the batch limit in the event that a<a name="line.5497"></a>
+<span class="sourceLineNo">5498</span> * ScannerContext is not specified during an invocation of next/nextRaw<a name="line.5498"></a>
+<span class="sourceLineNo">5499</span> */<a name="line.5499"></a>
+<span class="sourceLineNo">5500</span> defaultScannerContext = ScannerContext.newBuilder()<a name="line.5500"></a>
+<span class="sourceLineNo">5501</span> .setBatchLimit(scan.getBatch()).build();<a name="line.5501"></a>
+<span class="sourceLineNo">5502</span><a name="line.5502"></a>
+<span class="sourceLineNo">5503</span> if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {<a name="line.5503"></a>
+<span class="sourceLineNo">5504</span> this.stopRow = null;<a name="line.5504"></a>
+<span class="sourceLineNo">5505</span> } else {<a name="line.5505"></a>
+<span class="sourceLineNo">5506</span> this.stopRow = scan.getStopRow();<a name="line.5506"></a>
+<span class="sourceLineNo">5507</span> }<a name="line.5507"></a>
+<span class="sourceLineNo">5508</span> // If we are doing a get, we want to be [startRow,endRow]. Normally<a name="line.5508"></a>
+<span class="sourceLineNo">5509</span> // it is [startRow,endRow) and if startRow=endRow we get nothing.<a name="line.5509"></a>
+<span class="sourceLineNo">5510</span> this.isScan = scan.isGetScan() ? 1 : 0;<a name="line.5510"></a>
+<span class="sourceLineNo">5511</span><a name="line.5511"></a>
+<span class="sourceLineNo">5512</span> // synchronize on scannerReadPoints so that nobody calculates<a name="line.5512"></a>
+<span class="sourceLineNo">5513</span> // getSmallestReadPoint, before scannerReadPoints is updated.<a name="line.5513"></a>
+<span class="sourceLineNo">5514</span> IsolationLevel isolationLevel = scan.getIsolationLevel();<a name="line.5514"></a>
+<span class="sourceLineNo">5515</span> synchronized(scannerReadPoints) {<a name="line.5515"></a>
+<span class="sourceLineNo">5516</span> this.readPt = getReadPoint(isolationLevel);<a name="line.5516"></a>
+<span class="sourceLineNo">5517</span> scannerReadPoints.put(this, this.readPt);<a name="line.5517"></a>
+<span class="sourceLineNo">5518</span> }<a name="line.5518"></a>
+<span class="sourceLineNo">5519</span><a name="line.5519"></a>
+<span class="sourceLineNo">5520</span> // Here we separate all scanners into two lists - scanner that provide data required<a name="line.5520"></a>
+<span class="sourceLineNo">5521</span> // by the filter to operate (scanners list) and all others (joinedScanners list).<a name="line.5521"></a>
+<span class="sourceLineNo">5522</span> List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());<a name="line.5522"></a>
+<span class="sourceLineNo">5523</span> List<KeyValueScanner> joinedScanners<a name="line.5523"></a>
+<span class="sourceLineNo">5524</span> = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());<a name="line.5524"></a>
+<span class="sourceLineNo">5525</span> if (additionalScanners != null) {<a name="line.5525"></a>
+<span class="sourceLineNo">5526</span> scanners.addAll(additionalScanners);<a name="line.5526"></a>
+<span class="sourceLineNo">5527</span> }<a name="line.5527"></a>
+<span class="sourceLineNo">5528</span><a name="line.5528"></a>
+<span class="sourceLineNo">5529</span> for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {<a name="line.5529"></a>
+<span class="sourceLineNo">5530</span> Store store = stores.get(entry.getKey());<a name="line.5530"></a>
+<span class="sourceLineNo">5531</span> KeyValueScanner scanner;<a name="line.5531"></a>
+<span class="sourceLineNo">5532</span> try {<a name="line.5532"></a>
+<span class="sourceLineNo">5533</span> scanner = store.getScanner(scan, entry.getValue(), this.readPt);<a name="line.5533"></a>
+<span class="sourceLineNo">5534</span> } catch (FileNotFoundException e) {<a name="line.5534"></a>
+<span class="sourceLineNo">5535</span> throw handleFileNotFound(e);<a name="line.5535"></a>
+<span class="sourceLineNo">5536</span> }<a name="line.5536"></a>
+<span class="sourceLineNo">5537</span> if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()<a name="line.5537"></a>
+<span class="sourceLineNo">5538</span> || this.filter.isFamilyEssential(entry.getKey())) {<a name="line.5538"></a>
+<span class="sourceLineNo">5539</span> scanners.add(scanner);<a name="line.5539"></a>
+<span class="sourceLineNo">5540</span> } else {<a name="line.5540"></a>
+<span class="sourceLineNo">5541</span> joinedScanners.add(scanner);<a name="line.5541"></a>
+<span class="sourceLineNo">5542</span> }<a name="line.5542"></a>
+<span class="sourceLineNo">5543</span> }<a name="line.5543"></a>
+<span class="sourceLineNo">5544</span> this.copyCellsFromSharedMem = copyCellsFromSharedMem;<a name="line.5544"></a>
+<span class="sourceLineNo">5545</span> initializeKVHeap(scanners, joinedScanners, region);<a name="line.5545"></a>
+<span class="sourceLineNo">5546</span> }<a name="line.5546"></a>
+<span class="sourceLineNo">5547</span><a name="line.5547"></a>
+<span class="sourceLineNo">5548</span> protected void initializeKVHeap(List<KeyValueScanner> scanners,<a name="line.5548"></a>
+<span class="sourceLineNo">5549</span> List<KeyValueScanner> joinedScanners, HRegion region)<a name="line.5549"></a>
+<span class="sourceLineNo">5550</span> throws IOException {<a name="line.5550"></a>
+<span class="sourceLineNo">5551</span> this.storeHeap = new KeyValueHeap(scanners, comparator);<a name="line.5551"></a>
+<span class="sourceLineNo">5552</span> if (!joinedScanners.isEmpty()) {<a name="line.5552"></a>
+<span class="sourceLineNo">5553</span> this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);<a name="line.5553"></a>
+<span class="sourceLineNo">5554</span> }<a name="line.5554"></a>
+<span class="sourceLineNo">5555</span> }<a name="line.5555"></a>
+<span class="sourceLineNo">5556</span><a name="line.5556"></a>
+<span class="sourceLineNo">5557</span> @Override<a name="line.5557"></a>
+<span class="sourceLineNo">5558</span> public long getMaxResultSize() {<a name="line.5558"></a>
+<span class="sourceLineNo">5559</span> return maxResultSize;<a name="line.5559"></a>
+<span class="sourceLineNo">5560</span> }<a name="line.5560"></a>
+<span class="sourceLineNo">5561</span><a name="line.5561"></a>
+<span class="sourceLineNo">5562</span> @Override<a name="line.5562"></a>
+<span class="sourceLineNo">5563</span> public long getMvccReadPoint() {<a name="line.5563"></a>
+<span class="sourceLineNo">5564</span> return this.readPt;<a name="line.5564"></a>
+<span class="sourceLineNo">5565</span> }<a name="line.5565"></a>
+<span class="sourceLineNo">5566</span><a name="line.5566"></a>
+<span class="sourceLineNo">5567</span> @Override<a name="line.5567"></a>
+<span class="sourceLineNo">5568</span> public int getBatch() {<a name="line.5568"></a>
+<span class="sourceLineNo">5569</span> return this.defaultScannerContext.getBatchLimit();<a name="line.5569"></a>
+<span class="sourceLineNo">5570</span> }<a name="line.5570"></a>
+<span class="sourceLineNo">5571</span><a name="line.5571"></a>
+<span class="sourceLineNo">5572</span> /**<a name="line.5572"></a>
+<span class="sourceLineNo">5573</span> * Reset both the filter and the old filter.<a name="line.5573"></a>
+<span class="sourceLineNo">5574</span> *<a name="line.5574"></a>
+<span class="sourceLineNo">5575</span> * @throws IOException in case a filter raises an I/O exception.<a name="line.5575"></a>
+<span class="sourceLineNo">5576</span> */<a name="line.5576"></a>
+<span class="sourceLineNo">5577</span> protected void resetFilters() throws IOException {<a name="line.5577"></a>
+<span class="sourceLineNo">5578</span> if (filter != null) {<a name="line.5578"></a>
+<span class="sourceLineNo">5579</span> filter.reset();<a name="line.5579"></a>
+<span class="sourceLineNo">5580</span> }<a name="line.5580"></a>
+<span class="sourceLineNo">5581</span> }<a name="line.5581"></a>
+<span class="sourceLineNo">5582</span><a name="line.5582"></a>
+<span class="sourceLineNo">5583</span> @Override<a name="line.5583"></a>
+<span class="sourceLineNo">5584</span> public boolean next(List<Cell> outResults)<a name="line.5584"></a>
+<span class="sourceLineNo">5585</span> throws IOException {<a name="line.5585"></a>
+<span class="sourceLineNo">5586</span> // apply the batching limit by default<a name="line.5586"></a>
+<span class="sourceLineNo">5587</span> return next(outResults, defaultScannerContext);<a name="line.5587"></a>
+<span class="sourceLineNo">5588</span> }<a name="line.5588"></a>
+<span class="sourceLineNo">5589</span><a name="line.5589"></a>
+<span class="sourceLineNo">5590</span> @Override<a name="line.5590"></a>
+<span class="sourceLineNo">5591</span> public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)<a name="line.5591"></a>
+<span class="sourceLineNo">5592</span> throws IOException {<a name="line.5592"></a>
+<span class="sourceLineNo">5593</span> if (this.filterClosed) {<a name="line.5593"></a>
+<span class="sourceLineNo">5594</span> throw new UnknownScannerException("Scanner was closed (timed out?) " +<a name="line.5594"></a>
+<span class="sourceLineNo">5595</span> "after we renewed it. Could be caused by a very slow scanner " +<a name="line.5595"></a>
+<span class="sourceLineNo">5596</span> "or a lengthy garbage collection");<a name="line.5596"></a>
+<span class="sourceLineNo">5597</span> }<a name="line.5597"></a>
+<span class="sourceLineNo">5598</span> startRegionOperation(Operation.SCAN);<a name="line.5598"></a>
+<span class="sourceLineNo">5599</span> readRequestsCount.increment();<a name="line.5599"></a>
+<span class="sourceLineNo">5600</span> try {<a name="line.5600"></a>
+<span class="sourceLineNo">5601</span> return nextRaw(outResults, scannerContext);<a name="line.5601"></a>
+<span class="sourceLineNo">5602</span> } finally {<a name="line.5602"></a>
+<span class="sourceLineNo">5603</span> closeRegionOperation(Operation.SCAN);<a name="line.5603"></a>
+<span class="sourceLineNo">5604</span> }<a name="line.5604"></a>
+<span class="sourceLineNo">5605</span> }<a name="line.5605"></a>
+<span class="sourceLineNo">5606</span><a name="line.5606"></a>
+<span class="sourceLineNo">5607</span> @Override<a name="line.5607"></a>
+<span class="sourceLineNo">5608</span> public boolean nextRaw(List<Cell> outResults) throws IOException {<a name="line.5608"></a>
+<span class="sourceLineNo">5609</span> // Use the RegionScanner's context by default<a name="line.5609"></a>
+<span class="sourceLineNo">5610</span> return nextRaw(outResults, defaultScannerContext);<a name="line.5610"></a>
+<span class="sourceLineNo">5611</span> }<a name="line.5611"></a>
+<span class="sourceLineNo">5612</span><a name="line.5612"></a>
+<span class="sourceLineNo">5613</span> @Override<a name="line.5613"></a>
+<span class="sourceLineNo">5614</span> public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)<a name="line.5614"></a>
+<span class="sourceLineNo">5615</span> throws IOException {<a name="line.5615"></a>
+<span class="sourceLineNo">5616</span> if (storeHeap == null) {<a name="line.5616"></a>
+<span class="sourceLineNo">5617</span> // scanner is closed<a name="line.5617"></a>
+<span class="sourceLineNo">5618</span> throw new UnknownScannerException("Scanner was closed");<a name="line.5618"></a>
+<span class="sourceLineNo">5619</span> }<a name="line.5619"></a>
+<span class="sourceLineNo">5620</span> boolean moreValues = false;<a name="line.5620"></a>
+<span class="sourceLineNo">5621</span> try {<a name="line.5621"></a>
+<span class="sourceLineNo">5622</span> if (outResults.isEmpty()) {<a name="line.5622"></a>
+<span class="sourceLineNo">5623</span> // Usually outResults is empty. This is true when next is called<a name="line.5623"></a>
+<span class="sourceLineNo">5624</span> // to handle scan or get operation.<a name="line.5624"></a>
+<span class="sourceLineNo">5625</span> moreValues = nextInternal(outResults, scannerContext);<a name="line.5625"></a>
+<span class="sourceLineNo">5626</span> } else {<a name="line.5626"></a>
+<span class="sourceLineNo">5627</span> List<Cell> tmpList = new ArrayList<Cell>();<a name="line.5627"></a>
+<span class="sourceLineNo">5628</span> moreValues = nextInternal(tmpList, scannerContext);<a name="line.5628"></a>
+<span class="sourceLineNo">5629</span> outResults.addAll(tmpList);<a name="line.5629"></a>
+<span class="sourceLineNo">5630</span> }<a name="line.5630"></a>
+<span class="sourceLineNo">5631</span><a name="line.5631"></a>
+<span class="sourceLineNo">5632</span> // If the size limit was reached it means a partial Result is being<a name="line.5632"></a>
+<span class="sourceLineNo">5633</span> // returned. Returning a<a name="line.5633"></a>
+<span class="sourceLineNo">5634</span> // partial Result means that we should not reset the filters; filters<a name="line.5634"></a>
+<span class="sourceLineNo">5635</span> // should only be reset in<a name="line.5635"></a>
+<span class="sourceLineNo">5636</span> // between rows<a name="line.5636"></a>
+<span class="sourceLineNo">5637</span> if (!scannerContext.partialResultFormed()) resetFilters();<a name="line.5637"></a>
+<span class="sourceLineNo">5638</span><a name="line.5638"></a>
+<span class="sourceLineNo">5639</span> if (isFilterDoneInternal()) {<a name="line.5639"></a>
+<span class="sourceLineNo">5640</span> moreValues = false;<a name="line.5640"></a>
+<span class="sourceLineNo">5641</span> }<a name="line.5641"></a>
+<span class="sourceLineNo">5642</span><a name="line.5642"></a>
+<span class="sourceLineNo">5643</span> // If copyCellsFromSharedMem = true, then we need to copy the cells. Otherwise<a name="line.5643"></a>
+<span class="sourceLineNo">5644</span> // it is a call coming from the RsRpcServices.scan().<a name="line.5644"></a>
+<span class="sourceLineNo">5645</span> if (copyCellsFromSharedMem && !outResults.isEmpty()) {<a name="line.5645"></a>
+<span class="sourceLineNo">5646</span> // Do the copy of the results here.<a name="line.5646"></a>
+<span class="sourceLineNo">5647</span> ListIterator<Cell> listItr = outResults.listIterator();<a name="line.5647"></a>
+<span class="sourceLineNo">5648</span> Cell cell = null;<a name="line.5648"></a>
+<span class="sourceLineNo">5649</span> while (listItr.hasNext()) {<a name="line.5649"></a>
+<span class="sourceLineNo">5650</span> cell = listItr.next();<a name="line.5650"></a>
+<span class="sourceLineNo">5651</span> if (cell instanceof ShareableMemory) {<a name="line.5651"></a>
+<span class="sourceLineNo">5652</span> listItr.set(((ShareableMemory) cell).cloneToCell());<a name="line.5652"></a>
+<span class="sourceLineNo">5653</span> }<a name="line.5653"></a>
+<span class="sourceLineNo">5654</span> }<a name="line.5654"></a>
+<span class="sourceLineNo">5655</span> }<a name="line.5655"></a>
+<span class="sourceLineNo">5656</span> } finally {<a name="line.5656"></a>
+<span class="sourceLineNo">5657</span> if (copyCellsFromSharedMem) {<a name="line.5657"></a>
+<span class="sourceLineNo">5658</span> // In case of copyCellsFromSharedMem==true (where the CPs wrap a scanner) we return<a name="line.5658"></a>
+<span class="sourceLineNo">5659</span> // the blocks then and there (for wrapped CPs)<a name="line.5659"></a>
+<span class="sourceLineNo">5660</span> this.shipped();<a name="line.5660"></a>
+<span class="sourceLineNo">5661</span> }<a name="line.5661"></a>
+<span class="sourceLineNo">5662</span> }<a name="line.5662"></a>
+<span class="sourceLineNo">5663</span> return moreValues;<a name="line.5663"></a>
+<span class="sourceLineNo">5664</span> }<a name="line.5664"></a>
+<span class="sourceLineNo">5665</span><a name="line.5665"></a>
+<span class="sourceLineNo">5666</span> /**<a name="line.5666"></a>
+<span class="sourceLineNo">5667</span> * @return true if more cells exist after this batch, false if scanner is done<a name="line.5667"></a>
+<span class="sourceLineNo">5668</span> */<a name="line.5668"></a>
+<span class="sourceLineNo">5669</span> private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)<a name="line.5669"></a>
+<span class="sourceLineNo">5670</span> throws IOException {<a name="line.5670"></a>
+<span class="sourceLineNo">5671</span> assert joinedContinuationRow != null;<a name="line.5671"></a>
+<span class="sourceLineNo">5672</span> boolean moreValues = populateResult(results, this.joinedHeap, scannerContext,<a name="line.5672"></a>
+<span class="sourceLineNo">5673</span> joinedContinuationRow);<a name="line.5673"></a>
+<span class="sourceLineNo">5674</span><a name="line.5674"></a>
+<span class="sourceLineNo">5675</span> if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {<a name="line.5675"></a>
+<span class="sourceLineNo">5676</span> // We are done with this row, reset the continuation.<a name="line.5676"></a>
+<span class="sourceLineNo">5677</span> joinedContinuationRow = null;<a name="line.5677"></a>
+<span class="sourceLineNo">5678</span> }<a name="line.5678"></a>
+<span class="sourceLineNo">5679</span> // As the data is obtained from two independent heaps, we need to<a name="line.5679"></a>
+<span class="sourceLineNo">5680</span> // ensure that result list is sorted, because Result relies on that.<a name="line.5680"></a>
+<span class="sourceLineNo">5681</span> sort(results, comparator);<a name="line.5681"></a>
+<span class="sourceLineNo">5682</span> return moreValues;<a name="line.5682"></a>
+<span class="sourceLineNo">5683</span> }<a name="line.5683"></a>
+<span class="sourceLineNo">5684</span><a name="line.5684"></a>
+<span class="sourceLineNo">5685</span> /**<a name="line.5685"></a>
+<span class="sourceLineNo">5686</span> * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is<a name="line.5686"></a>
+<span class="sourceLineNo">5687</span> * reached, or remainingResultSize (if not -1) is reaced<a name="line.5687"></a>
+<span class="sourceLineNo">5688</span> * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.<a name="line.5688"></a>
+<span class="sourceLineNo">5689</span> * @param scannerContext<a name="line.5689"></a>
+<span class="sourceLineNo">5690</span> * @param currentRowCell<a name="line.5690"></a>
+<span class="sourceLineNo">5691</span> * @return state of last call to {@link KeyValueHeap#next()}<a name="line.5691"></a>
+<span class="sourceLineNo">5692</span> */<a name="line.5692"></a>
+<span class="sourceLineNo">5693</span> private boolean populateResult(List<Cell> results, KeyValueHeap heap,<a name="line.5693"></a>
+<span class="sourceLineNo">5694</span> ScannerContext scannerContext, Cell currentRowCell) throws IOException {<a name="line.5694"></a>
+<span class="sourceLineNo">5695</span> Cell nextKv;<a name="line.5695"></a>
+<span class="sourceLineNo">5696</span> boolean moreCellsInRow = false;<a name="line.5696"></a>
+<span class="sourceLineNo">5697</span> boolean tmpKeepProgress = scannerContext.getKeepProgress();<a name="line.5697"></a>
+<span class="sourceLineNo">5698</span> // Scanning between column families and thus the scope is between cells<a name="line.5698"></a>
+<span class="sourceLineNo">5699</span> LimitScope limitScope = LimitScope.BETWEEN_CELLS;<a name="line
<TRUNCATED>