You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/15 02:04:49 UTC

svn commit: r1183570 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/

Author: nspiegelberg
Date: Sat Oct 15 00:04:48 2011
New Revision: 1183570

URL: http://svn.apache.org/viewvc?rev=1183570&view=rev
Log:
HBASE-4241  Optimize flushing of the Memstore

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1183570&r1=1183569&r2=1183570&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Sat Oct 15 00:04:48 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 
 /**
  * The MemStore holds in-memory modifications to the Store.  Modifications
@@ -466,6 +467,15 @@ public class MemStore implements HeapSiz
   }
 
   /**
+   * @return single-element list holding a scanner over the given snapshot
+   */
+  public static List<KeyValueScanner> getSnapshotScanners(
+      SortedSet<KeyValue> snapshot, KeyValue.KVComparator comparator) {
+    return Collections.<KeyValueScanner>singletonList(
+        new CollectionBackedScanner(snapshot, comparator));
+  }
+  
+  /**
    * Check if this memstore may contain the required keys
    * @param scan
    * @return False if the key definitely does not exist in this Memstore

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1183570&r1=1183569&r2=1183570&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Oct 15 00:04:48 2011
@@ -475,7 +475,7 @@ public class Store extends SchemaConfigu
    * @return StoreFile created.
    * @throws IOException
    */
-  private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
+  private StoreFile internalFlushCache(final SortedSet<KeyValue> snapshot,
       final long logCacheFlushId,
       TimeRangeTracker snapshotTimeRangeTracker,
       MonitoredTask status) throws IOException {
@@ -483,36 +483,53 @@ public class Store extends SchemaConfigu
     String fileName;
     long flushed = 0;
     // Don't flush if there are no entries.
-    if (set.size() == 0) {
+    if (snapshot.size() == 0) {
       return null;
     }
-    long oldestTimestamp = System.currentTimeMillis() - ttl;
-    // TODO:  We can fail in the below block before we complete adding this
-    // flush to list of store files.  Add cleanup of anything put on filesystem
-    // if we fail.
-    synchronized (flushLock) {
-      status.setStatus("Flushing " + this + ": creating writer");
-      // A. Write the map out to the disk
-      writer = createWriterInTmp(set.size());
-      writer.setTimeRangeTracker(snapshotTimeRangeTracker);
-      fileName = writer.getPath().getName();
-      int entries = 0;
-      try {
-        for (KeyValue kv: set) {
-          if (!isExpired(kv, oldestTimestamp)) {
-            writer.append(kv);
-            entries++;
-            flushed += this.memstore.heapSizeChange(kv, true);
-          }
+    
+    // Run the snapshot through a store scanner (limited to the family's max
+    // versions) to pick the KeyValues to flush. Note that we need to retain deletes.
+    Scan scan = new Scan();
+    scan.setMaxVersions(family.getMaxVersions());
+    InternalScanner scanner = new StoreScanner(this, scan, 
+        MemStore.getSnapshotScanners(snapshot, this.comparator), true);
+    
+    try {
+      // TODO:  We can fail in the below block before we complete adding this
+      // flush to list of store files.  Add cleanup of anything put on filesystem
+      // if we fail.
+      synchronized (flushLock) {
+        status.setStatus("Flushing " + this + ": creating writer");
+        // A. Write the map out to the disk
+        writer = createWriterInTmp(snapshot.size());
+        writer.setTimeRangeTracker(snapshotTimeRangeTracker);
+        fileName = writer.getPath().getName();
+        int entries = 0;
+        try {
+          final List<KeyValue> kvs = new ArrayList<KeyValue>();
+          boolean hasMore;
+          do {
+            hasMore = scanner.next(kvs);
+            if (!kvs.isEmpty()) {
+              for (KeyValue kv : kvs) {
+                writer.append(kv);
+                entries++;
+                flushed += this.memstore.heapSizeChange(kv, true);
+              }
+              kvs.clear();
+            }
+          } while (hasMore);
+        } finally {
+          // Write out the log sequence number that corresponds to this output
+          // hfile.  The hfile is current up to and including logCacheFlushId.
+          status.setStatus("Flushing " + this + ": appending metadata");
+          writer.appendMetadata(logCacheFlushId, false);
+          status.setStatus("Flushing " + this + ": closing flushed file");
+          writer.close();
         }
-      } finally {
-        // Write out the log sequence number that corresponds to this output
-        // hfile.  The hfile is current up to and including logCacheFlushId.
-        status.setStatus("Flushing " + this + ": appending metadata");
-        writer.appendMetadata(logCacheFlushId, false);
-        status.setStatus("Flushing " + this + ": closing flushed file");
-        writer.close();
       }
+    } finally {
+      scanner.close();
     }
 
     Path dstPath = new Path(homedir, fileName);

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java?rev=1183570&r1=1183569&r2=1183570&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java Sat Oct 15 00:04:48 2011
@@ -21,11 +21,10 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 import org.apache.hadoop.hbase.KeyValue;
 
 import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -34,21 +33,11 @@ import java.util.List;
  * to the provided comparator, and then the whole thing pretends
  * to be a store file scanner.
  */
-public class KeyValueScanFixture extends NonLazyKeyValueScanner {
-  ArrayList<KeyValue> data;
-  Iterator<KeyValue> iter = null;
-  KeyValue current = null;
-  KeyValue.KVComparator comparator;
+public class KeyValueScanFixture extends CollectionBackedScanner {
 
   public KeyValueScanFixture(KeyValue.KVComparator comparator,
                              KeyValue... incData) {
-    this.comparator = comparator;
-
-    data = new ArrayList<KeyValue>(incData.length);
-    for( int i = 0; i < incData.length ; ++i) {
-      data.add(incData[i]);
-    }
-    Collections.sort(data, this.comparator);
+    super(comparator, incData);
   }
 
   public static List<KeyValueScanner> scanFixture(KeyValue[] ... kvArrays) {
@@ -58,54 +47,4 @@ public class KeyValueScanFixture extends
     }
     return scanners;
   }
-
-
-  @Override
-  public KeyValue peek() {
-    return this.current;
-  }
-
-  @Override
-  public KeyValue next() {
-    KeyValue res = current;
-
-    if (iter.hasNext())
-      current = iter.next();
-    else
-      current = null;
-    return res;
-  }
-
-  @Override
-  public boolean seek(KeyValue key) {
-    // start at beginning.
-    iter = data.iterator();
-      int cmp;
-    KeyValue kv = null;
-    do {
-      if (!iter.hasNext()) {
-        current = null;
-        return false;
-      }
-      kv = iter.next();
-      cmp = comparator.compare(key, kv);
-    } while (cmp > 0);
-    current = kv;
-    return true;
-  }
-
-  @Override
-  public boolean reseek(KeyValue key) {
-    return seek(key);
-  }
-
-  @Override
-  public void close() {
-    // noop.
-  }
-
-  @Override
-  public long getSequenceID() {
-    return 0;
-  }
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java?rev=1183570&r1=1183569&r2=1183570&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java Sat Oct 15 00:04:48 2011
@@ -338,4 +338,29 @@ public class TestBlocksRead extends HBas
     verifyData(kvs[1], "row", "col2", 12);
     verifyData(kvs[2], "row", "col3", 13);
   }
+  
+  @Test
+  public void testLazySeekBlocksReadWithDelete() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testLazySeekBlocksReadWithDelete");
+    byte [] FAMILY = Bytes.toBytes("cf1");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+    KeyValue kvs[];
+
+    HBaseConfiguration conf = getConf();
+    initHRegion(TABLE, getName(), conf, FAMILIES);
+    
+    deleteFamily(FAMILY, "row", 200);
+    for (int i = 0 ; i < 100; i++) {
+      putData(FAMILY, "row", "col"+i, i);
+    }
+    putData(FAMILY, "row", "col99", 201);
+
+    region.flushcache();
+    kvs = getData(FAMILY, "row", Arrays.asList("col0"), 2);
+    assertEquals(0, kvs.length);
+    
+    kvs = getData(FAMILY, "row", Arrays.asList("col99"), 2);
+    assertEquals(1, kvs.length);
+    verifyData(kvs[0], "row", "col99", 201);
+  }
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java?rev=1183570&r1=1183569&r2=1183570&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java Sat Oct 15 00:04:48 2011
@@ -29,6 +29,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 
 
 public class TestKeyValueHeap extends HBaseTestCase {
@@ -67,25 +68,36 @@ public class TestKeyValueHeap extends HB
     //Cases that need to be checked are:
     //1. The "smallest" KeyValue is in the same scanners as current
     //2. Current scanner gets empty
-
+    KeyValue startKV;
+    Scanner scanner;
+    
     List<KeyValue> l1 = new ArrayList<KeyValue>();
-    l1.add(new KeyValue(row1, fam1, col5, data));
+    startKV = new KeyValue(row1, fam1, col5, data);
+    l1.add(startKV);
     l1.add(new KeyValue(row2, fam1, col1, data));
     l1.add(new KeyValue(row2, fam1, col2, data));
-    scanners.add(new Scanner(l1));
+    scanner = new Scanner(l1);
+    scanner.seek(startKV);
+    scanners.add(scanner);
 
     List<KeyValue> l2 = new ArrayList<KeyValue>();
+    startKV = new KeyValue(row1, fam1, col1, data);
     l2.add(new KeyValue(row1, fam1, col1, data));
     l2.add(new KeyValue(row1, fam1, col2, data));
-    scanners.add(new Scanner(l2));
+    scanner = new Scanner(l2);
+    scanner.seek(startKV);
+    scanners.add(scanner);
 
     List<KeyValue> l3 = new ArrayList<KeyValue>();
+    startKV = new KeyValue(row1, fam1, col3, data);
     l3.add(new KeyValue(row1, fam1, col3, data));
     l3.add(new KeyValue(row1, fam1, col4, data));
     l3.add(new KeyValue(row1, fam2, col1, data));
     l3.add(new KeyValue(row1, fam2, col2, data));
     l3.add(new KeyValue(row2, fam1, col3, data));
-    scanners.add(new Scanner(l3));
+    scanner = new Scanner(l3);
+    scanner.seek(startKV);
+    scanners.add(scanner);
 
     List<KeyValue> expected = new ArrayList<KeyValue>();
     expected.add(new KeyValue(row1, fam1, col1, data));
@@ -129,25 +141,36 @@ public class TestKeyValueHeap extends HB
     //Cases:
     //1. Seek KeyValue that is not in scanner
     //2. Check that smallest that is returned from a seek is correct
-
+    KeyValue startKV ;
+    Scanner scanner;
+    
     List<KeyValue> l1 = new ArrayList<KeyValue>();
-    l1.add(new KeyValue(row1, fam1, col5, data));
+    startKV = new KeyValue(row1, fam1, col5, data);
+    l1.add(startKV);
     l1.add(new KeyValue(row2, fam1, col1, data));
     l1.add(new KeyValue(row2, fam1, col2, data));
-    scanners.add(new Scanner(l1));
+    scanner = new Scanner(l1);
+    scanner.seek(startKV);
+    scanners.add(scanner);
 
     List<KeyValue> l2 = new ArrayList<KeyValue>();
+    startKV = new KeyValue(row1, fam1, col1, data);
     l2.add(new KeyValue(row1, fam1, col1, data));
     l2.add(new KeyValue(row1, fam1, col2, data));
-    scanners.add(new Scanner(l2));
+    scanner = new Scanner(l2);
+    scanner.seek(startKV);
+    scanners.add(scanner);
 
     List<KeyValue> l3 = new ArrayList<KeyValue>();
+    startKV = new KeyValue(row1, fam1, col3, data);
     l3.add(new KeyValue(row1, fam1, col3, data));
     l3.add(new KeyValue(row1, fam1, col4, data));
     l3.add(new KeyValue(row1, fam2, col1, data));
     l3.add(new KeyValue(row1, fam2, col2, data));
     l3.add(new KeyValue(row2, fam1, col3, data));
-    scanners.add(new Scanner(l3));
+    scanner = new Scanner(l3);
+    scanner.seek(startKV);
+    scanners.add(scanner);
 
     List<KeyValue> expected = new ArrayList<KeyValue>();
     expected.add(new KeyValue(row2, fam1, col1, data));
@@ -175,25 +198,36 @@ public class TestKeyValueHeap extends HB
 
   public void testScannerLeak() throws IOException {
     // Test for unclosed scanners (HBASE-1927)
-
+    KeyValue startKV ;
+    Scanner scanner;
+    
     List<KeyValue> l1 = new ArrayList<KeyValue>();
-    l1.add(new KeyValue(row1, fam1, col5, data));
+    startKV = new KeyValue(row1, fam1, col5, data);
+    l1.add(startKV);
     l1.add(new KeyValue(row2, fam1, col1, data));
     l1.add(new KeyValue(row2, fam1, col2, data));
-    scanners.add(new Scanner(l1));
+    scanner = new Scanner(l1);
+    scanner.seek(startKV);
+    scanners.add(scanner);
 
     List<KeyValue> l2 = new ArrayList<KeyValue>();
+    startKV = new KeyValue(row1, fam1, col1, data);
     l2.add(new KeyValue(row1, fam1, col1, data));
     l2.add(new KeyValue(row1, fam1, col2, data));
-    scanners.add(new Scanner(l2));
+    scanner = new Scanner(l2);
+    scanner.seek(startKV);
+    scanners.add(scanner);
 
     List<KeyValue> l3 = new ArrayList<KeyValue>();
+    startKV = new KeyValue(row1, fam1, col3, data);
     l3.add(new KeyValue(row1, fam1, col3, data));
     l3.add(new KeyValue(row1, fam1, col4, data));
     l3.add(new KeyValue(row1, fam2, col1, data));
     l3.add(new KeyValue(row1, fam2, col2, data));
     l3.add(new KeyValue(row2, fam1, col3, data));
-    scanners.add(new Scanner(l3));
+    scanner = new Scanner(l3);
+    scanner.seek(startKV);
+    scanners.add(scanner);
 
     List<KeyValue> l4 = new ArrayList<KeyValue>();
     scanners.add(new Scanner(l4));
@@ -203,66 +237,15 @@ public class TestKeyValueHeap extends HB
 
     while(kvh.next() != null);
 
-    for(KeyValueScanner scanner : scanners) {
-      assertTrue(((Scanner)scanner).isClosed());
+    for(KeyValueScanner kvScanner : scanners) {
+      assertTrue(((Scanner)kvScanner).isClosed());
     }
   }
 
-  private static class Scanner extends NonLazyKeyValueScanner {
-    private Iterator<KeyValue> iter;
-    private KeyValue current;
-    private boolean closed = false;
-
+  private static class Scanner extends CollectionBackedScanner {
+    
     public Scanner(List<KeyValue> list) {
-      Collections.sort(list, KeyValue.COMPARATOR);
-      iter = list.iterator();
-      if(iter.hasNext()){
-        current = iter.next();
-      }
-    }
-
-    public KeyValue peek() {
-      return current;
-    }
-
-    public KeyValue next() {
-      KeyValue oldCurrent = current;
-      if(iter.hasNext()){
-        current = iter.next();
-      } else {
-        current = null;
-      }
-      return oldCurrent;
-    }
-
-    public void close(){
-      closed = true;
-    }
-
-    public boolean isClosed() {
-      return closed;
-    }
-
-    public boolean seek(KeyValue seekKv) {
-      while(iter.hasNext()){
-        KeyValue next = iter.next();
-        int ret = KeyValue.COMPARATOR.compare(next, seekKv);
-        if(ret >= 0){
-          current = next;
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public boolean reseek(KeyValue key) throws IOException {
-      return seek(key);
-    }
-
-    @Override
-    public long getSequenceID() {
-      return 0;
+     super(list,KeyValue.COMPARATOR);
     }
   }
 

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1183570&r1=1183569&r2=1183570&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Sat Oct 15 00:04:48 2011
@@ -92,6 +92,8 @@ public class TestStore extends TestCase 
 
   private static final String DIR = HBaseTestingUtility.getTestDir() + "/TestStore/";
 
+  private static final int MAX_VERSION = 4;
+
   /**
    * Setup
    * @throws IOException
@@ -121,8 +123,10 @@ public class TestStore extends TestCase 
     Path logdir = new Path(DIR+methodName+"/logs");
     Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
     HColumnDescriptor hcd = new HColumnDescriptor(family);
+    // with HBASE-4241, lower versions are collected on flush
+    hcd.setMaxVersions(TestStore.MAX_VERSION);
     FileSystem fs = FileSystem.get(conf);
-
+    
     fs.delete(logdir, true);
 
     HTableDescriptor htd = new HTableDescriptor(table);
@@ -549,6 +553,8 @@ public class TestStore extends TestCase 
       this.store.add(kv);
     }
 
+    assertEquals(Math.max(timestamps1.length, timestamps2.length), 
+        TestStore.MAX_VERSION);
     List<KeyValue> result;
     Get get = new Get(Bytes.toBytes(1));
     get.addColumn(family,qf1);