You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/06/07 01:16:42 UTC

[3/3] hbase git commit: HBASE-18145 The flush may cause the corrupt data for reading

HBASE-18145 The flush may cause the corrupt data for reading

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6784686c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6784686c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6784686c

Branch: refs/heads/branch-1.3
Commit: 6784686cfa5edd9d383a03f07419322ac114ea91
Parents: 69deecb
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Tue Jun 6 15:15:23 2017 +0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Jun 6 18:03:13 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/StoreScanner.java |   8 +-
 .../hadoop/hbase/regionserver/TestStore.java    | 146 +++++++++++++++++++
 2 files changed, 153 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6784686c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index c967071..de66d4e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -89,6 +89,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   protected final long cellsPerHeartbeatCheck;
 
   /**
+   * If we close the memstore scanners before sending data to client, the chunk may be reclaimed
+   * by other updates and the data will be corrupt.
+   */
+  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();
+  /**
    * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
    * KVs skipped via seeking to next row/column. TODO: estimate them?
    */
@@ -437,6 +442,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   public void close() {
     if (this.closing) return;
     this.closing = true;
+    clearAndClose(scannersForDelayedClose);
     clearAndClose(memStoreScannersAfterFlush);
     // Under test, we dont have a this.store
     if (this.store != null)
@@ -850,7 +856,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     // remove the older memstore scanner
     for (int i = 0; i < currentScanners.size(); i++) {
       if (!currentScanners.get(i).isFileScanner()) {
-        currentScanners.remove(i).close();
+        scannersForDelayedClose.add(currentScanners.remove(i));
         break;
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6784686c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 4cebf1e..ed4ee57 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -37,6 +37,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.NavigableSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -74,6 +75,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import static org.apache.hadoop.hbase.regionserver.MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
@@ -1206,6 +1208,67 @@ public class TestStore {
 
   }
 
+  @Test
+  public void testReclaimChunkWhenScaning() throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setFloat(CHUNK_POOL_MAXSIZE_KEY, 1);
+    init("testReclaimChunkWhenScaning", conf);
+    final long ts = EnvironmentEdgeManager.currentTime();
+    final long seqId = 100;
+    byte[] value = Bytes.toBytes("value");
+    // older data which should be "seen" by client
+    store.add(createCell(qf1, ts, seqId, value));
+    store.add(createCell(qf2, ts, seqId, value));
+    store.add(createCell(qf3, ts, seqId, value));
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    quals.add(qf1);
+    quals.add(qf2);
+    quals.add(qf3);
+    try (InternalScanner scanner = (InternalScanner) store.getScanner(
+        new Scan(new Get(row)), quals, seqId)) {
+      List<Cell> results = new MyList<>(new MyListHook() {
+        @Override
+        public void hook(int size) {
+          switch (size) {
+            // 1) we get the first cell (qf1)
+            // 2) flush the data to have StoreScanner update inner scanners
+            // 3) the chunk will be reclaimed after updating
+            case 1:
+              try {
+                flushStore(store, id++);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+              break;
+            // 1) we get the second cell (qf2)
+            // 2) add some cells to fill some bytes into the chunk (we have only one chunk)
+            case 2:
+              try {
+                byte[] newValue = Bytes.toBytes("newValue");
+                // newer data which shouldn't be "seen" by client
+                store.add(createCell(qf1, ts + 1, seqId + 1, newValue));
+                store.add(createCell(qf2, ts + 1, seqId + 1, newValue));
+                store.add(createCell(qf3, ts + 1, seqId + 1, newValue));
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+              break;
+            default:
+              break;
+          }
+        }
+      });
+      scanner.next(results);
+      assertEquals(3, results.size());
+      for (Cell c : results) {
+        byte[] actualValue = CellUtil.cloneValue(c);
+        assertTrue("expected:" + Bytes.toStringBinary(value)
+          + ", actual:" + Bytes.toStringBinary(actualValue)
+          , Bytes.equals(actualValue, value));
+      }
+    }
+  }
+
   private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
     HColumnDescriptor hcd = new HColumnDescriptor(family);
@@ -1237,4 +1300,87 @@ public class TestStore {
 
     void hook(MyStore store) throws IOException;
   }
+
+  interface MyListHook {
+    void hook(int currentSize);
+  }
+
+  private static class MyList<T> implements List<T> {
+    private final List<T> delegatee = new ArrayList<>();
+    private final MyListHook hookAtAdd;
+    MyList(final MyListHook hookAtAdd) {
+      this.hookAtAdd = hookAtAdd;
+    }
+    @Override
+    public int size() {return delegatee.size();}
+
+    @Override
+    public boolean isEmpty() {return delegatee.isEmpty();}
+
+    @Override
+    public boolean contains(Object o) {return delegatee.contains(o);}
+
+    @Override
+    public Iterator<T> iterator() {return delegatee.iterator();}
+
+    @Override
+    public Object[] toArray() {return delegatee.toArray();}
+
+    @Override
+    public <T> T[] toArray(T[] a) {return delegatee.toArray(a);}
+
+    @Override
+    public boolean add(T e) {
+      hookAtAdd.hook(size());
+      return delegatee.add(e);
+    }
+
+    @Override
+    public boolean remove(Object o) {return delegatee.remove(o);}
+
+    @Override
+    public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);}
+
+    @Override
+    public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);}
+
+    @Override
+    public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);}
+
+    @Override
+    public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);}
+
+    @Override
+    public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);}
+
+    @Override
+    public void clear() {delegatee.clear();}
+
+    @Override
+    public T get(int index) {return delegatee.get(index);}
+
+    @Override
+    public T set(int index, T element) {return delegatee.set(index, element);}
+
+    @Override
+    public void add(int index, T element) {delegatee.add(index, element);}
+
+    @Override
+    public T remove(int index) {return delegatee.remove(index);}
+
+    @Override
+    public int indexOf(Object o) {return delegatee.indexOf(o);}
+
+    @Override
+    public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);}
+
+    @Override
+    public ListIterator<T> listIterator() {return delegatee.listIterator();}
+
+    @Override
+    public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);}
+
+    @Override
+    public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
+  }
 }
\ No newline at end of file