You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2009/07/28 10:51:06 UTC

svn commit: r798448 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/regionserver/

Author: rawson
Date: Tue Jul 28 08:51:06 2009
New Revision: 798448

URL: http://svn.apache.org/viewvc?rev=798448&view=rev
Log:
HBASE-1703 ICV across flush

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=798448&r1=798447&r2=798448&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Tue Jul 28 08:51:06 2009
@@ -293,6 +293,8 @@
                when trying to read
    HBASE-1705  Thrift server: deletes in mutateRow/s don't delete
                (Tim Sell and Ryan Rawson via Stack)
+   HBASE-1703  ICVs across/during a flush can cause multiple keys with the
+               same TS (bad)
 
   IMPROVEMENTS
    HBASE-1089  Add count of regions on filesystem to master UI; add percentage

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=798448&r1=798447&r2=798448&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java Tue Jul 28 08:51:06 2009
@@ -58,7 +58,7 @@
 
   private final long ttl;
 
-  // MemStore.  Use a SkipListMap rather than SkipListSet because of the
+  // MemStore.  Use a KeyValueSkipListSet rather than SkipListSet because of the
   // better semantics.  The Map will overwrite if passed a key it already had
   // whereas the Set will not add new KV if key is same though value might be
   // different.  Value is not important -- just make sure always same
@@ -575,7 +575,7 @@
    * @return true if done with store (early-out), false if not
    * @throws IOException
    */
-  private boolean internalGet(final NavigableSet<KeyValue> set,
+  boolean internalGet(final NavigableSet<KeyValue> set,
       final QueryMatcher matcher, final List<KeyValue> result)
   throws IOException {
     if(set.isEmpty()) return false;

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java?rev=798448&r1=798447&r2=798448&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java Tue Jul 28 08:51:06 2009
@@ -279,9 +279,9 @@
     MatchCode mc = columns.checkColumn(bytes, columnOffset, columnLength);
     if (mc == MatchCode.INCLUDE && this.filter != null) {
       switch(this.filter.filterKeyValue(kv)) {
-      case INCLUDE: return MatchCode.INCLUDE;
-      case SKIP: return MatchCode.SKIP;
-      default: return MatchCode.DONE;
+        case INCLUDE: return MatchCode.INCLUDE;
+        case SKIP: return MatchCode.SKIP;
+        default: return MatchCode.DONE;
       }
     }
     return mc;

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=798448&r1=798447&r2=798448&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Jul 28 08:51:06 2009
@@ -1607,22 +1607,46 @@
 
     // Setting up the QueryMatcher
     Get get = new Get(row);
-    NavigableSet<byte[]> qualifiers = 
+    NavigableSet<byte[]> qualifiers =
       new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
     qualifiers.add(qualifier);
     QueryMatcher matcher = new QueryMatcher(get, f, qualifiers, this.ttl,
       keyComparator, 1);
-    
-    // Read from memstore
-    if (this.memstore.get(matcher, result)) {
+
+    boolean newTs = true;
+    KeyValue kv = null;
+    // Read from memstore first:
+    this.memstore.internalGet(this.memstore.kvset,
+                                  matcher, result);
+    if (!result.isEmpty()) {
+      kv = result.get(0).clone();
+      newTs = false;
+    } else {
+      // try the snapshot.
+      this.memstore.internalGet(this.memstore.snapshot,
+          matcher, result);
+      if (!result.isEmpty()) {
+        kv = result.get(0).clone();
+      }
+    }
+
+    if (kv != null) {
       // Received early-out from memstore
       // Make a copy of the KV and increment it
-      KeyValue kv = result.get(0).clone();
       byte [] buffer = kv.getBuffer();
       int valueOffset = kv.getValueOffset();
       value = Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG) + amount;
       Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(value), 0,
         Bytes.SIZEOF_LONG);
+      if (newTs) {
+        long currTs = System.currentTimeMillis();
+        if (currTs == kv.getTimestamp()) {
+          currTs++; // just in case something wacky happens.
+        }
+        byte [] stampBytes = Bytes.toBytes(currTs);
+        Bytes.putBytes(buffer, kv.getTimestampOffset(), stampBytes, 0,
+            Bytes.SIZEOF_LONG);
+      }
       return new ICVResult(value, 0, kv);
     }
     // Check if we even have storefiles

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=798448&r1=798447&r2=798448&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java Tue Jul 28 08:51:06 2009
@@ -1,15 +1,6 @@
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.concurrent.ConcurrentSkipListSet;
-
 import junit.framework.TestCase;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -22,6 +13,15 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.Progressable;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+
 /**
  * Test class fosr the Store 
  */
@@ -242,7 +242,7 @@
     this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
     
     Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
-    assertEquals(vas.value, value+amount);
+    assertEquals(value+amount, vas.value);
     store.add(vas.kv);
     Get get = new Get(row);
     get.addColumn(family, qf1);
@@ -361,5 +361,45 @@
     this.store.get(get, qualifiers, result);
     assertEquals(amount, Bytes.toLong(result.get(0).getValue()));
   }
+
+  public void testIncrementColumnValue_ICVDuringFlush()
+    throws IOException {
+    init(this.getName());
+
+    long value = 1L;
+    long amount = 3L;
+    this.store.add(new KeyValue(row, family, qf1,
+        System.currentTimeMillis(),
+        Bytes.toBytes(value)));
+
+    // snapshot the store.
+    this.store.snapshot();
+
+    // increment during the snapshot...
+
+    Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
+
+    // then flush.
+    this.store.flushCache(id++);
+    assertEquals(1, this.store.getStorefiles().size());
+    assertEquals(0, this.store.memstore.kvset.size());
+
+    Get get = new Get(row);
+    get.addColumn(family, qf1);
+    get.setMaxVersions(); // all versions.
+    List<KeyValue> results = new ArrayList<KeyValue>();
+
+    NavigableSet<byte[]> cols = new TreeSet<byte[]>();
+    cols.add(qf1);
+
+    this.store.get(get, cols, results);
+    // only one, because Store.ICV doesn't add to the memstore.
+    assertEquals(1, results.size());
+
+    // but the timestamps should be different...
+    long icvTs = vas.kv.getTimestamp();
+    long storeTs = results.get(0).getTimestamp();
+    assertTrue(icvTs != storeTs);
+  }
   
 }