You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2010/07/17 01:08:58 UTC
svn commit: r964973 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/regionserver/
src/main/java/org/apache/hadoop/hbase/util/
src/test/java/org/apache/hadoop/hbase/regionserver/
Author: rawson
Date: Fri Jul 16 23:08:57 2010
New Revision: 964973
URL: http://svn.apache.org/viewvc?rev=964973&view=rev
Log:
HBASE-2553 Revisit IncrementColumnValue implementation in 0.22
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=964973&r1=964972&r2=964973&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Jul 16 23:08:57 2010
@@ -26,6 +26,7 @@ Release 0.21.0 - Unreleased
HBASE-2397 Bytes.toStringBinary escapes printable chars
HBASE-2771 Update our hadoop jar to be latest from 0.20-append branch
HBASE-2803 Remove remaining Get code from Store.java,etc
+ HBASE-2553 Revisit IncrementColumnValue implementation in 0.22
BUG FIXES
HBASE-1791 Timeout in IndexRecordWriter (Bradford Stephens via Andrew
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=964973&r1=964972&r2=964973&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Fri Jul 16 23:08:57 2010
@@ -351,6 +351,74 @@ public class MemStore implements HeapSiz
}
}
+ /**
+ * Given the specs of a column, update it, first by inserting a new record,
+ * then removing the old one(s). Since there is only 1 KeyValue involved, the memstoreTS
+ * will be set to 0, thus ensuring that it instantly appears to all readers. The underlying
+ * store will ensure that the insert and each delete are atomic. A scanner/reader will either
+ * get the new value or the old value, and all readers will eventually see only the new
+ * value once the old one has been removed.
+ *
+ * @param row row of the column to update
+ * @param family column family of the column to update
+ * @param qualifier column qualifier of the column to update
+ * @param newValue the new long value, stored as Bytes.toBytes(newValue)
+ * @param now timestamp given to the newly inserted KeyValue
+ * @return the size returned by add() for the new KeyValue, minus heapSizeChange() of each removed one
+ */
+ public long updateColumnValue(byte[] row,
+ byte[] family,
+ byte[] qualifier,
+ long newValue,
+ long now) {
+ this.lock.readLock().lock();
+ try {
+ // create a new KeyValue with 'now' and a 0 memstoreTS == immediately visible
+ KeyValue newKv = new KeyValue(row, family, qualifier,
+ now,
+ Bytes.toBytes(newValue));
+
+ long addedSize = add(newKv);
+
+ // now find and remove the old one(s), starting at newKv, to prevent version explosion:
+ SortedSet<KeyValue> ss = kvset.tailSet(newKv);
+ Iterator<KeyValue> it = ss.iterator();
+ while ( it.hasNext() ) {
+ KeyValue kv = it.next();
+
+ if (kv == newKv) {
+ // ignore the one I just put in (identity check, not equals)
+ continue;
+ }
+ // if this isn't the row we are interested in, then bail:
+ if (0 != Bytes.compareTo(
+ newKv.getBuffer(), newKv.getRowOffset(), newKv.getRowLength(),
+ kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
+ break; // rows don't match, bail.
+ }
+
+ // if the qualifier matches and it's a put, just remove it from the kvset.
+ if (0 == Bytes.compareTo(
+ newKv.getBuffer(), newKv.getQualifierOffset(), newKv.getQualifierLength(),
+ kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength())) {
+
+ // to be extra safe we only remove Puts that have a memstoreTS==0
+ if (kv.getType() == KeyValue.Type.Put.getCode() &&
+ kv.getMemstoreTS() == 0) {
+ // false means there was a change, so give us the size.
+ addedSize -= heapSizeChange(kv, false);
+
+ it.remove();
+ }
+ }
+ }
+
+ return addedSize;
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
/*
* Immutable data structure to hold member found in set and the set it was
* found in. Include set because it is carrying context.
@@ -400,76 +468,6 @@ public class MemStore implements HeapSiz
}
}
-
- // TODO fix this not to use QueryMatcher!
- /**
- * Gets from either the memstore or the snapshop, and returns a code
- * to let you know which is which.
- *
- * @param matcher query matcher
- * @param result puts results here
- * @return 1 == memstore, 2 == snapshot, 0 == none
- */
- int getWithCode(QueryMatcher matcher, List<KeyValue> result) {
- this.lock.readLock().lock();
- try {
- boolean fromMemstore = internalGet(this.kvset, matcher, result);
- if (fromMemstore || matcher.isDone())
- return 1;
-
- matcher.update();
- boolean fromSnapshot = internalGet(this.snapshot, matcher, result);
- if (fromSnapshot || matcher.isDone())
- return 2;
-
- return 0;
- } finally {
- this.lock.readLock().unlock();
- }
- }
-
- /**
- * Small utility functions for use by Store.incrementColumnValue
- * _only_ under the threat of pain and everlasting race conditions.
- */
- void readLockLock() {
- this.lock.readLock().lock();
- }
- void readLockUnlock() {
- this.lock.readLock().unlock();
- }
-
- /**
- *
- * @param set memstore or snapshot
- * @param matcher query matcher
- * @param result list to add results to
- * @return true if done with store (early-out), false if not
- */
- boolean internalGet(final NavigableSet<KeyValue> set,
- final QueryMatcher matcher, final List<KeyValue> result) {
- if(set.isEmpty()) return false;
- // Seek to startKey
- SortedSet<KeyValue> tail = set.tailSet(matcher.getStartKey());
- for (KeyValue kv : tail) {
- QueryMatcher.MatchCode res = matcher.match(kv);
- switch(res) {
- case INCLUDE:
- result.add(kv);
- break;
- case SKIP:
- break;
- case NEXT:
- return false;
- case DONE:
- return true;
- default:
- throw new RuntimeException("Unexpected " + res);
- }
- }
- return false;
- }
-
/**
* Check if this memstore may contain the required keys
* @param scan
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=964973&r1=964972&r2=964973&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri Jul 16 23:08:57 2010
@@ -1298,49 +1298,18 @@ public class Store implements HeapSize {
public long updateColumnValue(byte [] row, byte [] f,
byte [] qualifier, long newValue)
throws IOException {
- List<KeyValue> result = new ArrayList<KeyValue>();
- KeyComparator keyComparator = this.comparator.getRawComparator();
- KeyValue kv = null;
- // Setting up the QueryMatcher
- Get get = new Get(row);
- NavigableSet<byte[]> qualifiers =
- new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
- qualifiers.add(qualifier);
- QueryMatcher matcher = new QueryMatcher(get, f, qualifiers, this.ttl,
- keyComparator, 1);
-
- // lock memstore snapshot for this critical section:
this.lock.readLock().lock();
- memstore.readLockLock();
try {
- int memstoreCode = this.memstore.getWithCode(matcher, result);
+ long now = System.currentTimeMillis();
+
+ return this.memstore.updateColumnValue(row,
+ f,
+ qualifier,
+ newValue,
+ now);
- if (memstoreCode != 0) {
- // was in memstore (or snapshot)
- kv = result.get(0).clone();
- byte [] buffer = kv.getBuffer();
- int valueOffset = kv.getValueOffset();
- Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(newValue), 0,
- Bytes.SIZEOF_LONG);
- if (memstoreCode == 2) {
- // from snapshot, assign new TS
- long currTs = System.currentTimeMillis();
- if (currTs == kv.getTimestamp()) {
- currTs++; // unlikely but catastrophic
- }
- Bytes.putBytes(buffer, kv.getTimestampOffset(),
- Bytes.toBytes(currTs), 0, Bytes.SIZEOF_LONG);
- }
- } else {
- kv = new KeyValue(row, f, qualifier,
- System.currentTimeMillis(),
- Bytes.toBytes(newValue));
- }
- return add(kv);
- // end lock
} finally {
- memstore.readLockUnlock();
this.lock.readLock().unlock();
}
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java?rev=964973&r1=964972&r2=964973&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java Fri Jul 16 23:08:57 2010
@@ -843,27 +843,27 @@ public class Bytes {
/**
* Lexicographically compare two arrays.
*
- * @param b1 left operand
- * @param b2 right operand
- * @param s1 Where to start comparing in the left buffer
- * @param s2 Where to start comparing in the right buffer
- * @param l1 How much to compare from the left buffer
- * @param l2 How much to compare from the right buffer
+ * @param buffer1 left operand
+ * @param buffer2 right operand
+ * @param offset1 Where to start comparing in the left buffer
+ * @param offset2 Where to start comparing in the right buffer
+ * @param length1 How much to compare from the left buffer
+ * @param length2 How much to compare from the right buffer
* @return 0 if equal, < 0 if left is less than right, etc.
*/
- public static int compareTo(byte[] b1, int s1, int l1,
- byte[] b2, int s2, int l2) {
+ public static int compareTo(byte[] buffer1, int offset1, int length1,
+ byte[] buffer2, int offset2, int length2) {
// Bring WritableComparator code local
- int end1 = s1 + l1;
- int end2 = s2 + l2;
- for (int i = s1, j = s2; i < end1 && j < end2; i++, j++) {
- int a = (b1[i] & 0xff);
- int b = (b2[j] & 0xff);
+ int end1 = offset1 + length1;
+ int end2 = offset2 + length2;
+ for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
+ int a = (buffer1[i] & 0xff);
+ int b = (buffer2[j] & 0xff);
if (a != b) {
return a - b;
}
}
- return l1 - l2;
+ return length1 - length2;
}
/**
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=964973&r1=964972&r2=964973&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Fri Jul 16 23:08:57 2010
@@ -1859,7 +1859,8 @@ public class TestHRegion extends HBaseTe
assertEquals(value+amount, result);
Store store = region.getStore(fam1);
- assertEquals(1, store.memstore.kvset.size());
+ // we will have both the original Put and the ICV'ed Put.
+ assertEquals(2, store.memstore.kvset.size());
assertTrue(store.memstore.snapshot.isEmpty());
assertICV(row, fam1, qual1, value+amount);