You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2016/10/03 11:45:59 UTC

hbase git commit: HBASE-15871 Memstore flush doesn't finish because of backwardseek() in memstore scanner. (Ram)

Repository: hbase
Updated Branches:
  refs/heads/master 250ad644e -> 7d0a6a82a


HBASE-15871 Memstore flush doesn't finish because of backwardseek() in
memstore scanner. (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7d0a6a82
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7d0a6a82
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7d0a6a82

Branch: refs/heads/master
Commit: 7d0a6a82ab5dde6b11add5a6b57a497e3f151d04
Parents: 250ad64
Author: Ramkrishna <ra...@intel.com>
Authored: Mon Oct 3 17:15:24 2016 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Mon Oct 3 17:15:24 2016 +0530

----------------------------------------------------------------------
 .../hbase/regionserver/MemStoreScanner.java     | 21 +++++++
 .../hbase/regionserver/SegmentScanner.java      | 33 ++++++++++-
 .../hadoop/hbase/regionserver/TestHRegion.java  | 58 ++++++++++++++++++++
 3 files changed, 111 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7d0a6a82/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
index 2371e20..2ccdf68 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
@@ -131,6 +131,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
    */
   @Override
   public Cell peek() {
+    if (closed) {
+      return null;
+    }
     if (this.heap != null) {
       return this.heap.peek();
     }
@@ -143,6 +146,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
    */
   @Override
   public Cell next() throws IOException {
+    if (closed) {
+      return null;
+    }
     if(this.heap != null) {
       // loop over till the next suitable value
       // take next value from the heap
@@ -167,6 +173,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
    */
   @Override
   public boolean seek(Cell cell) throws IOException {
+    if (closed) {
+      return false;
+    }
     initForwardKVHeapIfNeeded(comparator, scanners);
 
     if (cell == null) {
@@ -199,6 +208,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
     *
     *  TODO: The above comment copied from the original MemStoreScanner
     */
+    if (closed) {
+      return false;
+    }
     initForwardKVHeapIfNeeded(comparator, scanners);
     return heap.reseek(cell);
   }
@@ -241,6 +253,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
   public boolean backwardSeek(Cell cell) throws IOException {
     // The first time when this happens it sets the scanners to the seek key
     // passed by the incoming scan's start row
+    if (closed) {
+      return false;
+    }
     initReverseKVHeapIfNeeded(cell, comparator, scanners);
     return heap.backwardSeek(cell);
   }
@@ -253,6 +268,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
    */
   @Override
   public boolean seekToPreviousRow(Cell cell) throws IOException {
+    if (closed) {
+      return false;
+    }
     initReverseKVHeapIfNeeded(cell, comparator, scanners);
     if (heap.peek() == null) {
       restartBackwardHeap(cell);
@@ -262,6 +280,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
 
   @Override
   public boolean seekToLastRow() throws IOException {
+    if (closed) {
+      return false;
+    }
     return initReverseKVHeapIfNeeded(KeyValue.LOWESTKEY, comparator, scanners);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7d0a6a82/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
index 92c3443..7803f7d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
@@ -56,6 +56,9 @@ public class SegmentScanner implements KeyValueScanner {
   // last iterated KVs by seek (to restore the iterator state after reseek)
   private Cell last = null;
 
+  // flag to indicate if this scanner is closed
+  private boolean closed = false;
+
   protected SegmentScanner(Segment segment, long readPoint) {
     this(segment, readPoint, DEFAULT_SCANNER_ORDER);
   }
@@ -73,6 +76,10 @@ public class SegmentScanner implements KeyValueScanner {
     // the initialization of the current is required for working with heap of SegmentScanners
     current = getNext();
     this.scannerOrder = scannerOrder;
+    if (current == null) {
+      // nothing to fetch from this scanner
+      close();
+    }
   }
 
   /**
@@ -81,6 +88,9 @@ public class SegmentScanner implements KeyValueScanner {
    */
   @Override
   public Cell peek() {          // sanity check, the current should be always valid
+    if (closed) {
+      return null;
+    }
     if (current!=null && current.getSequenceId() > readPoint) {
       throw new RuntimeException("current is invalid: read point is "+readPoint+", " +
           "while current sequence id is " +current.getSequenceId());
@@ -94,6 +104,9 @@ public class SegmentScanner implements KeyValueScanner {
    */
   @Override
   public Cell next() throws IOException {
+    if (closed) {
+      return null;
+    }
     Cell oldCurrent = current;
     current = getNext();                  // update the currently observed Cell
     return oldCurrent;
@@ -106,6 +119,9 @@ public class SegmentScanner implements KeyValueScanner {
    */
   @Override
   public boolean seek(Cell cell) throws IOException {
+    if (closed) {
+      return false;
+    }
     if(cell == null) {
       close();
       return false;
@@ -129,7 +145,9 @@ public class SegmentScanner implements KeyValueScanner {
    */
   @Override
   public boolean reseek(Cell cell) throws IOException {
-
+    if (closed) {
+      return false;
+    }
     /*
     See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
     This code is executed concurrently with flush and puts, without locks.
@@ -155,6 +173,9 @@ public class SegmentScanner implements KeyValueScanner {
    */
   @Override
   public boolean backwardSeek(Cell key) throws IOException {
+    if (closed) {
+      return false;
+    }
     seek(key);    // seek forward then go backward
     if (peek() == null || segment.compareRows(peek(), key) > 0) {
       return seekToPreviousRow(key);
@@ -172,6 +193,9 @@ public class SegmentScanner implements KeyValueScanner {
    */
   @Override
   public boolean seekToPreviousRow(Cell cell) throws IOException {
+    if (closed) {
+      return false;
+    }
     boolean keepSeeking;
     Cell key = cell;
     do {
@@ -205,6 +229,9 @@ public class SegmentScanner implements KeyValueScanner {
    */
   @Override
   public boolean seekToLastRow() throws IOException {
+    if (closed) {
+      return false;
+    }
     Cell higherCell = segment.isEmpty() ? null : segment.last();
     if (higherCell == null) {
       return false;
@@ -232,7 +259,11 @@ public class SegmentScanner implements KeyValueScanner {
    */
   @Override
   public void close() {
+    if (closed) {
+      return;
+    }
     getSegment().decScannerCount();
+    closed = true;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/7d0a6a82/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 79d305f..6322360 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -5902,6 +5902,64 @@ public class TestHRegion {
   }
 
   @Test
+  public void testReverseScanShouldNotScanMemstoreIfReadPtLesser() throws Exception {
+    byte[] cf1 = Bytes.toBytes("CF1");
+    byte[][] families = { cf1 };
+    byte[] col = Bytes.toBytes("C");
+    String method = this.getName();
+    HBaseConfiguration conf = new HBaseConfiguration();
+    this.region = initHRegion(tableName, method, conf, families);
+    try {
+      // setup with one storefile and one memstore, to create scanner and get an earlier readPt
+      Put put = new Put(Bytes.toBytes("19996"));
+      put.addColumn(cf1, col, Bytes.toBytes("val"));
+      region.put(put);
+      Put put2 = new Put(Bytes.toBytes("19995"));
+      put2.addColumn(cf1, col, Bytes.toBytes("val"));
+      region.put(put2);
+      // create a reverse scan
+      Scan scan = new Scan(Bytes.toBytes("19996"));
+      scan.setReversed(true);
+      RegionScanner scanner = region.getScanner(scan);
+
+      // flush the cache. This will reset the store scanner
+      region.flushcache(true, true);
+
+      // create one memstore contains many rows will be skipped
+      // to check MemStoreScanner.seekToPreviousRow
+      for (int i = 10000; i < 20000; i++) {
+        Put p = new Put(Bytes.toBytes("" + i));
+        p.addColumn(cf1, col, Bytes.toBytes("" + i));
+        region.put(p);
+      }
+      List<Cell> currRow = new ArrayList<>();
+      boolean hasNext;
+      boolean assertDone = false;
+      do {
+        hasNext = scanner.next(currRow);
+        // With HBASE-15871, after the scanner is reset the memstore scanner should not be
+        // added here
+        if (!assertDone) {
+          StoreScanner current =
+              (StoreScanner) (((RegionScannerImpl) scanner).storeHeap).getCurrentForTesting();
+          List<KeyValueScanner> scanners = current.getAllScannersForTesting();
+          assertEquals("There should be only one scanner the store file scanner", 1,
+            scanners.size());
+          assertDone = true;
+        }
+      } while (hasNext);
+      assertEquals(2, currRow.size());
+      assertEquals("19996", Bytes.toString(currRow.get(0).getRowArray(),
+        currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
+      assertEquals("19995", Bytes.toString(currRow.get(1).getRowArray(),
+        currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
+    } finally {
+      HBaseTestingUtility.closeRegionAndWAL(this.region);
+      this.region = null;
+    }
+  }
+
+  @Test
   public void testSplitRegionWithReverseScan() throws IOException {
     TableName tableName = TableName.valueOf("testSplitRegionWithReverseScan");
     byte [] qualifier = Bytes.toBytes("qualifier");