Posted to commits@hbase.apache.org by st...@apache.org on 2011/08/25 18:35:15 UTC

svn commit: r1161634 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/util/ src/test/java/org/apache/hadoop/hbase/regionserver/

Author: stack
Date: Thu Aug 25 16:35:15 2011
New Revision: 1161634

URL: http://svn.apache.org/viewvc?rev=1161634&view=rev
Log:
HBASE-4241  Optimize flushing of the Memstore

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1161634&r1=1161633&r2=1161634&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu Aug 25 16:35:15 2011
@@ -403,6 +403,7 @@ Release 0.91.0 - Unreleased
    HBASE-4199  blockCache summary - backend (Doug Meil)
    HBASE-4240  Allow Loadbalancer to be pluggable
    HBASE-4244  Refactor bin/hbase help
+   HBASE-4241  Optimize flushing of the Memstore (Lars Hofhansl)
 
   TASKS
    HBASE-3559  Move report of split to master OFF the heartbeat channel

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1161634&r1=1161633&r2=1161634&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Aug 25 16:35:15 2011
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.monitorin
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
@@ -90,6 +91,7 @@ public class Store implements HeapSize {
   // ttl in milliseconds.
   protected long ttl;
   protected int minVersions;
+  protected int maxVersions;
   long majorCompactionTime;
   private final int minFilesToCompact;
   private final int maxFilesToCompact;
@@ -178,6 +180,7 @@ public class Store implements HeapSize {
       this.ttl *= 1000;
     }
     this.minVersions = family.getMinVersions();
+    this.maxVersions = family.getMaxVersions();
     this.memstore = new MemStore(conf, this.comparator);
     this.storeNameStr = Bytes.toString(this.family.getName());
 
@@ -481,34 +484,45 @@ public class Store implements HeapSize {
     if (set.size() == 0) {
       return null;
     }
-    long oldestTimestamp = System.currentTimeMillis() - ttl;
-    // TODO:  We can fail in the below block before we complete adding this
-    // flush to list of store files.  Add cleanup of anything put on filesystem
-    // if we fail.
-    synchronized (flushLock) {
-      status.setStatus("Flushing " + this + ": creating writer");
-      // A. Write the map out to the disk
-      writer = createWriterInTmp(set.size());
-      writer.setTimeRangeTracker(snapshotTimeRangeTracker);
-      try {
-        for (KeyValue kv: set) {
-          // If minVersion > 0 we will wait until the next compaction to
-          // collect expired KVs. (following the logic for maxVersions).
-          // TODO: As Jonathan Gray points this can be optimized
-          // (see HBASE-4241)
-          if (minVersions > 0 || !isExpired(kv, oldestTimestamp)) {
-            writer.append(kv);
-            flushed += this.memstore.heapSizeChange(kv, true);
+    Scan scan = new Scan();
+    scan.setMaxVersions(maxVersions);
+    // Use a store scanner to find which rows to flush.
+    // Note that we need to retain deletes, hence
+    // pass true as the StoreScanner's retainDeletesInOutput argument.
+    InternalScanner scanner = new StoreScanner(this, scan,
+        Collections.singletonList(new CollectionBackedScanner(set,
+            this.comparator)), true);
+    try {
+      // TODO:  We can fail in the below block before we complete adding this
+      // flush to list of store files.  Add cleanup of anything put on filesystem
+      // if we fail.
+      synchronized (flushLock) {
+        status.setStatus("Flushing " + this + ": creating writer");
+        // A. Write the map out to the disk
+        writer = createWriterInTmp(set.size());
+        writer.setTimeRangeTracker(snapshotTimeRangeTracker);
+        try {
+          List<KeyValue> kvs = new ArrayList<KeyValue>();
+          while (scanner.next(kvs)) {
+            if (!kvs.isEmpty()) {
+              for (KeyValue kv : kvs) {
+                writer.append(kv);
+                flushed += this.memstore.heapSizeChange(kv, true);
+              }
+              kvs.clear();
+            }
           }
+        } finally {
+          // Write out the log sequence number that corresponds to this output
+          // hfile.  The hfile is current up to and including logCacheFlushId.
+          status.setStatus("Flushing " + this + ": appending metadata");
+          writer.appendMetadata(logCacheFlushId, false);
+          status.setStatus("Flushing " + this + ": closing flushed file");
+          writer.close();
         }
-      } finally {
-        // Write out the log sequence number that corresponds to this output
-        // hfile.  The hfile is current up to and including logCacheFlushId.
-        status.setStatus("Flushing " + this + ": appending metadata");
-        writer.appendMetadata(logCacheFlushId, false);
-        status.setStatus("Flushing " + this + ": closing flushed file");
-        writer.close();
       }
+    } finally {
+      scanner.close();
     }
 
     // Write-out finished successfully, move into the right spot
@@ -1734,7 +1748,7 @@ public class Store implements HeapSize {
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT + (15 * ClassSize.REFERENCE) +
       (8 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
-      (5 * Bytes.SIZEOF_INT) + (3 * Bytes.SIZEOF_BOOLEAN));
+      (6 * Bytes.SIZEOF_INT) + (3 * Bytes.SIZEOF_BOOLEAN));
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +

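The change above reworks Store.flushCache() to drive the memstore snapshot through a StoreScanner (backed by the new CollectionBackedScanner), so the maxVersions limit and TTL expiry are applied while flushing instead of being deferred to the next compaction; deletes are still written out, via the StoreScanner's retainDeletesInOutput flag. What follows is a minimal, self-contained sketch of that idea only -- Cell and CellWriter are made-up stand-ins for KeyValue and StoreFile.Writer, and the filtering is simplified relative to what StoreScanner actually does:

import java.util.Arrays;
import java.util.List;

/**
 * Illustrative sketch only -- not the committed code above.  It mimics the
 * idea behind HBASE-4241: stream the (sorted) memstore snapshot through a
 * filter that enforces maxVersions and TTL, so that only surviving cells
 * reach the flushed file.  Cell and CellWriter are hypothetical stand-ins
 * for KeyValue and StoreFile.Writer.
 */
public class FlushSketch {

  static class Cell {
    final String row;
    final long ts;
    Cell(String row, long ts) { this.row = row; this.ts = ts; }
  }

  interface CellWriter {
    void append(Cell c);
  }

  /** Keep at most maxVersions cells per row and drop cells older than ttlMs. */
  static void flush(Iterable<Cell> sortedSnapshot, CellWriter writer,
      int maxVersions, long ttlMs) {
    long oldestAllowed = System.currentTimeMillis() - ttlMs;
    String lastRow = null;
    int versions = 0;
    for (Cell c : sortedSnapshot) {      // snapshot is sorted by (row, ts desc)
      if (!c.row.equals(lastRow)) {
        lastRow = c.row;
        versions = 0;
      }
      versions++;
      if (versions > maxVersions || c.ts < oldestAllowed) {
        continue;                        // collected at flush time, not at compaction
      }
      writer.append(c);
    }
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    List<Cell> snapshot = Arrays.asList(
        new Cell("r1", now), new Cell("r1", now - 1), new Cell("r1", now - 2));
    // With maxVersions = 2 only the two newest versions of r1 are written.
    flush(snapshot, new CellWriter() {
      public void append(Cell c) { System.out.println("flushed " + c.row + "@" + c.ts); }
    }, 2, 60000L);
  }
}

The point of routing the flush through the same scanner machinery used for reads is that version and TTL handling stay in one place; the sketch hard-codes that logic only to keep the example short.
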
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java?rev=1161634&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java Thu Aug 25 16:35:15 2011
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.SortedSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+
+/**
+ * Utility scanner that wraps a sortable collection and serves
+ * as a KeyValueScanner.
+ */
+public class CollectionBackedScanner implements KeyValueScanner {
+  final private Iterable<KeyValue> data;
+  final KeyValue.KVComparator comparator;
+  private Iterator<KeyValue> iter;
+  private KeyValue current;
+
+  public CollectionBackedScanner(SortedSet<KeyValue> set) {
+    this(set, KeyValue.COMPARATOR);
+  }
+
+  public CollectionBackedScanner(SortedSet<KeyValue> set,
+      KeyValue.KVComparator comparator) {
+    this.comparator = comparator;
+    data = set;
+    init();
+  }
+
+  public CollectionBackedScanner(List<KeyValue> list) {
+    this(list, KeyValue.COMPARATOR);
+  }
+
+  public CollectionBackedScanner(List<KeyValue> list,
+      KeyValue.KVComparator comparator) {
+    Collections.sort(list, comparator);
+    this.comparator = comparator;
+    data = list;
+    init();
+  }
+
+  public CollectionBackedScanner(KeyValue.KVComparator comparator,
+      KeyValue... array) {
+    this.comparator = comparator;
+
+    List<KeyValue> tmp = new ArrayList<KeyValue>(array.length);
+    for( int i = 0; i < array.length ; ++i) {
+      tmp.add(array[i]);
+    }
+    Collections.sort(tmp, comparator);
+    data = tmp;
+    init();
+  }
+
+  private void init() {
+    iter = data.iterator();
+    if(iter.hasNext()){
+      current = iter.next();
+    }
+  }
+
+  @Override
+  public KeyValue peek() {
+    return current;
+  }
+
+  @Override
+  public KeyValue next() {
+    KeyValue oldCurrent = current;
+    if(iter.hasNext()){
+      current = iter.next();
+    } else {
+      current = null;
+    }
+    return oldCurrent;
+  }
+
+  @Override
+  public boolean seek(KeyValue seekKv) {
+    // restart iterator
+    iter = data.iterator();
+    return reseek(seekKv);
+  }
+
+  @Override
+  public boolean reseek(KeyValue seekKv) {
+    while(iter.hasNext()){
+      KeyValue next = iter.next();
+      int ret = comparator.compare(next, seekKv);
+      if(ret >= 0){
+        current = next;
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public long getSequenceID() {
+    return 0;
+  }
+
+  @Override
+  public void close() {
+    // do nothing
+  }
+}

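For reference, a rough usage sketch of the new CollectionBackedScanner, based only on the constructors and methods added above; the rows and column names are invented, and the three-argument KeyValue(row, family, qualifier) constructor is assumed to be available in this version of HBase:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;

public class CollectionBackedScannerExample {
  public static void main(String[] args) {
    // Any List of KeyValues will do; the List constructor sorts it first.
    List<KeyValue> kvs = Arrays.asList(
        new KeyValue(Bytes.toBytes("rowB"), Bytes.toBytes("f"), Bytes.toBytes("q")),
        new KeyValue(Bytes.toBytes("rowA"), Bytes.toBytes("f"), Bytes.toBytes("q")));

    CollectionBackedScanner scanner =
        new CollectionBackedScanner(kvs, KeyValue.COMPARATOR);

    // Position at the first KeyValue >= rowA, then drain the scanner.
    scanner.seek(KeyValue.createFirstOnRow(Bytes.toBytes("rowA")));
    for (KeyValue kv = scanner.next(); kv != null; kv = scanner.next()) {
      System.out.println(kv);
    }
    scanner.close();
  }
}

Callers such as Store.flushCache() wrap the memstore snapshot (a SortedSet) the same way and hand the scanner to a StoreScanner; the SortedSet constructor skips the sort since the set is already ordered.
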
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java?rev=1161634&r1=1161633&r2=1161634&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java Thu Aug 25 16:35:15 2011
@@ -21,11 +21,10 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 import org.apache.hadoop.hbase.KeyValue;
 
 import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -34,21 +33,10 @@ import java.util.List;
  * to the provided comparator, and then the whole thing pretends
  * to be a store file scanner.
  */
-public class KeyValueScanFixture implements KeyValueScanner {
-  ArrayList<KeyValue> data;
-  Iterator<KeyValue> iter = null;
-  KeyValue current = null;
-  KeyValue.KVComparator comparator;
-
+public class KeyValueScanFixture extends CollectionBackedScanner {
   public KeyValueScanFixture(KeyValue.KVComparator comparator,
                              KeyValue... incData) {
-    this.comparator = comparator;
-
-    data = new ArrayList<KeyValue>(incData.length);
-    for( int i = 0; i < incData.length ; ++i) {
-      data.add(incData[i]);
-    }
-    Collections.sort(data, this.comparator);
+    super(comparator, incData);
   }
 
   public static List<KeyValueScanner> scanFixture(KeyValue[] ... kvArrays) {
@@ -58,54 +46,4 @@ public class KeyValueScanFixture impleme
     }
     return scanners;
   }
-
-
-  @Override
-  public KeyValue peek() {
-    return this.current;
-  }
-
-  @Override
-  public KeyValue next() {
-    KeyValue res = current;
-
-    if (iter.hasNext())
-      current = iter.next();
-    else
-      current = null;
-    return res;
-  }
-
-  @Override
-  public boolean seek(KeyValue key) {
-    // start at beginning.
-    iter = data.iterator();
-      int cmp;
-    KeyValue kv = null;
-    do {
-      if (!iter.hasNext()) {
-        current = null;
-        return false;
-      }
-      kv = iter.next();
-      cmp = comparator.compare(key, kv);
-    } while (cmp > 0);
-    current = kv;
-    return true;
-  }
-
-  @Override
-  public boolean reseek(KeyValue key) {
-    return seek(key);
-  }
-
-  @Override
-  public void close() {
-    // noop.
-  }
-
-  @Override
-  public long getSequenceID() {
-    return 0;
-  }
 }

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java?rev=1161634&r1=1161633&r2=1161634&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java Thu Aug 25 16:35:15 2011
@@ -29,6 +29,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 
 
 public class TestKeyValueHeap extends HBaseTestCase {
@@ -208,31 +209,11 @@ public class TestKeyValueHeap extends HB
     }
   }
 
-  private static class Scanner implements KeyValueScanner {
-    private Iterator<KeyValue> iter;
-    private KeyValue current;
+  private static class Scanner extends CollectionBackedScanner {
     private boolean closed = false;
 
     public Scanner(List<KeyValue> list) {
-      Collections.sort(list, KeyValue.COMPARATOR);
-      iter = list.iterator();
-      if(iter.hasNext()){
-        current = iter.next();
-      }
-    }
-
-    public KeyValue peek() {
-      return current;
-    }
-
-    public KeyValue next() {
-      KeyValue oldCurrent = current;
-      if(iter.hasNext()){
-        current = iter.next();
-      } else {
-        current = null;
-      }
-      return oldCurrent;
+      super(list);
     }
 
     public void close(){
@@ -242,28 +223,6 @@ public class TestKeyValueHeap extends HB
     public boolean isClosed() {
       return closed;
     }
-
-    public boolean seek(KeyValue seekKv) {
-      while(iter.hasNext()){
-        KeyValue next = iter.next();
-        int ret = KeyValue.COMPARATOR.compare(next, seekKv);
-        if(ret >= 0){
-          current = next;
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public boolean reseek(KeyValue key) throws IOException {
-      return seek(key);
-    }
-
-    @Override
-    public long getSequenceID() {
-      return 0;
-    }
   }
 
 }

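The test's Scanner above now keeps only its closed-state bookkeeping and inherits peek/next/seek/reseek from CollectionBackedScanner. The same pattern works for any small test scanner; a minimal sketch (TrackingScanner is a hypothetical name, not part of this commit):

import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;

public class TrackingScanner extends CollectionBackedScanner {
  private boolean closed = false;

  public TrackingScanner(List<KeyValue> list) {
    super(list);   // sorts with KeyValue.COMPARATOR and positions at the first KeyValue
  }

  @Override
  public void close() {
    closed = true;
  }

  public boolean isClosed() {
    return closed;
  }
}
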
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java?rev=1161634&r1=1161633&r2=1161634&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java Thu Aug 25 16:35:15 2011
@@ -42,8 +42,6 @@ public class TestKeyValueScanFixture ext
     KeyValueScanner scan = new KeyValueScanFixture(
         KeyValue.COMPARATOR, kvs);
 
-    // test simple things.
-    assertNull(scan.peek());
     KeyValue kv = KeyValue.createFirstOnRow(Bytes.toBytes("RowA"));
     // should seek to this:
     assertTrue(scan.seek(kv));

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java?rev=1161634&r1=1161633&r2=1161634&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java Thu Aug 25 16:35:15 2011
@@ -315,14 +315,12 @@ public class TestMinVersions extends HBa
 
     // now flush
     region.flushcache();
-    region.compactStores();
 
-    // oldest version still exists
-    // flushing/minor compactions can't get rid of these, anymore
+    // with HBASE-4241 a flush will eliminate the expired rows
     g = new Get(T1);
     g.setTimeRange(0L, ts-2);
     r = region.get(g, null);
-    checkResult(r, c0, T1);
+    assertTrue(r.isEmpty());
 
     // major compaction
     region.compactStores(true);

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1161634&r1=1161633&r2=1161634&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Thu Aug 25 16:35:15 2011
@@ -122,6 +122,9 @@ public class TestStore extends TestCase 
     Path logdir = new Path(DIR+methodName+"/logs");
     Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
     HColumnDescriptor hcd = new HColumnDescriptor(family);
+    // some of the tests write 4 versions and then flush
+    // (with HBASE-4241, lower versions are collected on flush)
+    hcd.setMaxVersions(4);
     FileSystem fs = FileSystem.get(conf);
 
     fs.delete(logdir, true);