You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/04/01 15:27:12 UTC

svn commit: r929962 - /hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Author: stack
Date: Thu Apr  1 13:27:11 2010
New Revision: 929962

URL: http://svn.apache.org/viewvc?rev=929962&view=rev
Log:
HBASE-2338  log recovery: deleted items may be resurrected; 2nd attempt... fixes broke build

Modified:
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=929962&r1=929961&r2=929962&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Apr  1 13:27:11 2010
@@ -1172,30 +1172,41 @@ public class HRegion implements HConstan
 
         byte[] family = e.getKey(); 
         List<KeyValue> kvs = e.getValue();
-        
+        Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ 
         Store store = getStore(family);
         for (KeyValue kv: kvs) {
           //  Check if time is LATEST, change to time of most recent addition if so
           //  This is expensive.
           if (kv.isLatestTimestamp() && kv.isDeleteType()) {
+            byte[] qual = kv.getQualifier();
+            if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
+            Integer count = kvCount.get(qual);
+            if (count == null) {
+              kvCount.put(qual, new Integer(1));
+            } else {
+              kvCount.put(qual, new Integer(count+1));
+            }
+            count = kvCount.get(qual);
+
             List<KeyValue> result = new ArrayList<KeyValue>(1);
             Get g = new Get(kv.getRow());
+            g.setMaxVersions(count);
             NavigableSet<byte []> qualifiers =
               new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-              byte [] q = kv.getQualifier();
-              if(q == null) q = HConstants.EMPTY_BYTE_ARRAY;
-              qualifiers.add(q);
-              get(store, g, qualifiers, result);
-              if (result.isEmpty()) {
-                // Nothing to delete
-                continue;
-              }
-              if (result.size() > 1) {
-                throw new RuntimeException("Unexpected size: " + result.size());
-              }
-              KeyValue getkv = result.get(0);
-              Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
-                  getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
+            qualifiers.add(qual);
+            get(store, g, qualifiers, result);
+            if (result.size() < count) {
+              // Nothing to delete
+              kv.updateLatestStamp(byteNow);
+              continue;
+            }
+            if (result.size() > count) {
+              throw new RuntimeException("Unexpected size: " + result.size());
+            }
+            KeyValue getkv = result.get(count - 1);
+            Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
+                getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
           } else {
             kv.updateLatestStamp(byteNow);
           }