You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/09/11 23:29:08 UTC
svn commit: r814038 - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/regionserver/
src/test/org/apache/hadoop/hbase/regionserver/
Author: stack
Date: Fri Sep 11 21:28:36 2009
New Revision: 814038
URL: http://svn.apache.org/viewvc?rev=814038&view=rev
Log:
HBASE-1740 ICV has a subtle race condition only visible under high load
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=814038&r1=814037&r2=814038&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Fri Sep 11 21:28:36 2009
@@ -18,6 +18,7 @@
get tossed as 'duplicates'
HBASE-1794 recovered log files are not inserted into the storefile map
HBASE-1824 [stargate] default timestamp should be LATEST_TIMESTAMP
+ HBASE-1740 ICV has a subtle race condition only visible under high load
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=814038&r1=814037&r2=814038&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Sep 11 21:28:36 2009
@@ -2317,25 +2317,47 @@
boolean flush = false;
// Lock row
Integer lid = obtainRowLock(row);
- long result = 0L;
+ long result = amount;
try {
Store store = stores.get(family);
- // Determine what to do and perform increment on returned KV, no insertion
- Store.ICVResult vas =
- store.incrementColumnValue(row, family, qualifier, amount);
- // Write incremented value to WAL before inserting
+
+ // Get the old value:
+ Get get = new Get(row);
+ get.addColumn(family, qualifier);
+ List<KeyValue> results = new ArrayList<KeyValue>();
+ NavigableSet<byte[]> qualifiers = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ qualifiers.add(qualifier);
+ store.get(get, qualifiers, results);
+
+ if (!results.isEmpty()) {
+ byte [] oldValue = results.get(0).getValue();
+ KeyValue kv = results.get(0);
+ byte [] buffer = kv.getBuffer();
+ int valueOffset = kv.getValueOffset();
+ result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
+ }
+
+ // build the KeyValue now:
+ KeyValue newKv = new KeyValue(row, family,
+ qualifier, System.currentTimeMillis(),
+ Bytes.toBytes(result));
+
+ // now log it:
if (writeToWAL) {
long now = System.currentTimeMillis();
List<KeyValue> edits = new ArrayList<KeyValue>(1);
- edits.add(vas.kv);
+ edits.add(newKv);
this.log.append(regionInfo.getRegionName(),
regionInfo.getTableDesc().getName(), edits,
(regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
}
- // Insert to the Store
- store.add(vas.kv);
- result = vas.value;
- long size = this.memstoreSize.addAndGet(vas.sizeAdded);
+
+ // Now request the ICV to the store; this will set the timestamp
+ // appropriately depending on whether there is a value in memstore or not.
+ // Returns the memstore size delta.
+ long size = store.updateColumnValue(row, family, qualifier, result);
+
+ size = this.memstoreSize.addAndGet(size);
flush = isFlushSize(size);
} finally {
releaseRowLock(lid);
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=814038&r1=814037&r2=814038&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java Fri Sep 11 21:28:36 2009
@@ -496,6 +496,43 @@
this.lock.readLock().unlock();
}
}
+
+ /**
+ * Gets from either the memstore or the snapshot, and returns a code
+ * to let you know which is which.
+ *
+ * @param matcher
+ * @param result
+ * @return 1 == memstore, 2 == snapshot, 0 == none
+ */
+ int getWithCode(QueryMatcher matcher, List<KeyValue> result) throws IOException {
+ this.lock.readLock().lock();
+ try {
+ boolean fromMemstore = internalGet(this.kvset, matcher, result);
+ if (fromMemstore || matcher.isDone())
+ return 1;
+
+ matcher.update();
+ boolean fromSnapshot = internalGet(this.snapshot, matcher, result);
+ if (fromSnapshot || matcher.isDone())
+ return 2;
+
+ return 0;
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Small utility functions for use by Store.updateColumnValue
+ * _only_ under the threat of pain and everlasting race conditions.
+ */
+ void readLockLock() {
+ this.lock.readLock().lock();
+ }
+ void readLockUnlock() {
+ this.lock.readLock().unlock();
+ }
/**
*
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=814038&r1=814037&r2=814038&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Fri Sep 11 21:28:36 2009
@@ -436,7 +436,7 @@
lock.readLock().unlock();
}
}
-
+
/**
* Adds a value to the memstore
*
@@ -1462,37 +1462,22 @@
scanner.get(result);
}
- /*
- * Data structure to hold incrementColumnValue result.
- */
- static class ICVResult {
- final long value;
- final long sizeAdded;
- final KeyValue kv;
-
- ICVResult(long value, long sizeAdded, KeyValue kv) {
- this.value = value;
- this.sizeAdded = sizeAdded;
- this.kv = kv;
- }
- }
-
/**
* Increments the value for the given row/family/qualifier
* @param row
* @param f
* @param qualifier
- * @param amount
- * @return The new value.
+ * @param newValue the new value to set into memstore
+ * @return memstore size delta
* @throws IOException
*/
- public ICVResult incrementColumnValue(byte [] row, byte [] f,
- byte [] qualifier, long amount)
+ public long updateColumnValue(byte [] row, byte [] f,
+ byte [] qualifier, long newValue)
throws IOException {
- long value = 0;
List<KeyValue> result = new ArrayList<KeyValue>();
KeyComparator keyComparator = this.comparator.getRawComparator();
+ KeyValue kv = null;
// Setting up the QueryMatcher
Get get = new Get(row);
NavigableSet<byte[]> qualifiers =
@@ -1501,78 +1486,41 @@
QueryMatcher matcher = new QueryMatcher(get, f, qualifiers, this.ttl,
keyComparator, 1);
- boolean newTs = true;
- KeyValue kv = null;
- // Read from memstore first:
- this.memstore.internalGet(this.memstore.kvset,
- matcher, result);
- if (!result.isEmpty()) {
- kv = result.get(0).clone();
- newTs = false;
- } else {
- // try the snapshot.
- this.memstore.internalGet(this.memstore.snapshot,
- matcher, result);
- if (!result.isEmpty()) {
- kv = result.get(0).clone();
- }
- }
+ // lock memstore snapshot for this critical section:
+ this.lock.readLock().lock();
+ memstore.readLockLock();
+ try {
+ int memstoreCode = this.memstore.getWithCode(matcher, result);
- if (kv != null) {
- // Received early-out from memstore
- // Make a copy of the KV and increment it
- byte [] buffer = kv.getBuffer();
- int valueOffset = kv.getValueOffset();
- value = Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG) + amount;
- Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(value), 0,
- Bytes.SIZEOF_LONG);
- if (newTs) {
- long currTs = System.currentTimeMillis();
- if (currTs == kv.getTimestamp()) {
- currTs++; // just in case something wacky happens.
- }
- byte [] stampBytes = Bytes.toBytes(currTs);
- Bytes.putBytes(buffer, kv.getTimestampOffset(), stampBytes, 0,
+ if (memstoreCode != 0) {
+ // was in memstore (or snapshot)
+ kv = result.get(0).clone();
+ byte [] buffer = kv.getBuffer();
+ int valueOffset = kv.getValueOffset();
+ Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(newValue), 0,
Bytes.SIZEOF_LONG);
+ if (memstoreCode == 2) {
+ // from snapshot, assign new TS
+ long currTs = System.currentTimeMillis();
+ if (currTs == kv.getTimestamp()) {
+ currTs++; // unlikely but catastrophic
+ }
+ Bytes.putBytes(buffer, kv.getTimestampOffset(),
+ Bytes.toBytes(currTs), 0, Bytes.SIZEOF_LONG);
+ }
+ } else {
+ kv = new KeyValue(row, f, qualifier,
+ System.currentTimeMillis(),
+ Bytes.toBytes(newValue));
}
- return new ICVResult(value, 0, kv);
- }
- // Check if we even have storefiles
- if(this.storefiles.isEmpty()) {
- return createNewKeyValue(row, f, qualifier, value, amount);
- }
-
- // Get storefiles for this store
- List<HFileScanner> storefileScanners = new ArrayList<HFileScanner>();
- for (StoreFile sf : this.storefiles.descendingMap().values()) {
- Reader r = sf.getReader();
- if (r == null) {
- LOG.warn("StoreFile " + sf + " has a null Reader");
- continue;
- }
- storefileScanners.add(r.getScanner());
- }
-
- // StoreFileGetScan will handle reading this store's storefiles
- StoreFileGetScan scanner = new StoreFileGetScan(storefileScanners, matcher);
-
- // Run a GET scan and put results into the specified list
- scanner.get(result);
- if(result.size() > 0) {
- value = Bytes.toLong(result.get(0).getValue());
+ return add(kv);
+ // end lock
+ } finally {
+ memstore.readLockUnlock();
+ this.lock.readLock().unlock();
}
- return createNewKeyValue(row, f, qualifier, value, amount);
}
- private ICVResult createNewKeyValue(byte [] row, byte [] f,
- byte [] qualifier, long value, long amount) {
- long newValue = value + amount;
- KeyValue newKv = new KeyValue(row, f, qualifier,
- System.currentTimeMillis(),
- Bytes.toBytes(newValue));
- return new ICVResult(newValue, newKv.heapSize(), newKv);
- }
-
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + (17 * ClassSize.REFERENCE) +
(5 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN +
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=814038&r1=814037&r2=814038&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java Fri Sep 11 21:28:36 2009
@@ -60,9 +60,13 @@
// Test names
private final byte[] tableName = Bytes.toBytes("testtable");;
private final byte[] qual1 = Bytes.toBytes("qual1");
+ private final byte[] qual2 = Bytes.toBytes("qual2");
+ private final byte[] qual3 = Bytes.toBytes("qual3");
private final byte[] value1 = Bytes.toBytes("value1");
private final byte[] value2 = Bytes.toBytes("value2");
private final byte [] row = Bytes.toBytes("rowA");
+ private final byte [] row2 = Bytes.toBytes("rowB");
+ private final byte [] row3 = Bytes.toBytes("rowC");
/**
* @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
@@ -1246,7 +1250,6 @@
byte [] col2 = Bytes.toBytes("Pub222");
-
Put put = new Put(row1);
put.add(family, col1, Bytes.toBytes(10L));
region.put(put);
@@ -1275,11 +1278,166 @@
List<KeyValue> results = new ArrayList<KeyValue>();
assertEquals(false, s.next(results));
assertEquals(0, results.size());
+ }
+ public void testIncrementColumnValue_UpdatingInPlace() throws IOException {
+ initHRegion(tableName, getName(), fam1);
+ long value = 1L;
+ long amount = 3L;
+
+ Put put = new Put(row);
+ put.add(fam1, qual1, Bytes.toBytes(value));
+ region.put(put);
+ long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
+ assertEquals(value+amount, result);
+
+ Store store = region.getStore(fam1);
+ assertEquals(1, store.memstore.kvset.size());
+ assertTrue(store.memstore.snapshot.isEmpty());
+
+ assertICV(row, fam1, qual1, value+amount);
}
+
+ public void testIncrementColumnValue_ConcurrentFlush() throws IOException {
+ initHRegion(tableName, getName(), fam1);
+
+ long value = 1L;
+ long amount = 3L;
+
+ Put put = new Put(row);
+ put.add(fam1, qual1, Bytes.toBytes(value));
+ region.put(put);
+
+ // now increment during a flush
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ region.flushcache();
+ } catch (IOException e) {
+ LOG.info("test ICV, got IOE during flushcache()");
+ }
+ }
+ };
+ t.start();
+ long r = region.incrementColumnValue(row, fam1, qual1, amount, true);
+ assertEquals(value+amount, r);
+
+ // this also asserts there is only 1 KeyValue in the set.
+ assertICV(row, fam1, qual1, value+amount);
+ }
+
+ public void testIncrementColumnValue_UpdatingInPlace_Negative()
+ throws IOException {
+ initHRegion(tableName, getName(), fam1);
+
+ long value = 3L;
+ long amount = -1L;
+
+ Put put = new Put(row);
+ put.add(fam1, qual1, Bytes.toBytes(value));
+ region.put(put);
+
+ long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
+ assertEquals(value+amount, result);
+
+ assertICV(row, fam1, qual1, value+amount);
+ }
+
+ public void testIncrementColumnValue_AddingNew()
+ throws IOException {
+ initHRegion(tableName, getName(), fam1);
+
+ long value = 1L;
+ long amount = 3L;
+
+ Put put = new Put(row);
+ put.add(fam1, qual1, Bytes.toBytes(value));
+ put.add(fam1, qual2, Bytes.toBytes(value));
+ region.put(put);
+
+ long result = region.incrementColumnValue(row, fam1, qual3, amount, true);
+ assertEquals(amount, result);
+
+ Get get = new Get(row);
+ get.addColumn(fam1, qual3);
+ Result rr = region.get(get, null);
+ assertEquals(1, rr.size());
+
+ // ensure none of the other cols were incremented.
+ assertICV(row, fam1, qual1, value);
+ assertICV(row, fam1, qual2, value);
+ assertICV(row, fam1, qual3, amount);
+ }
+
+ public void testIncrementColumnValue_UpdatingFromSF() throws IOException {
+ initHRegion(tableName, getName(), fam1);
+
+ long value = 1L;
+ long amount = 3L;
+
+ Put put = new Put(row);
+ put.add(fam1, qual1, Bytes.toBytes(value));
+ put.add(fam1, qual2, Bytes.toBytes(value));
+ region.put(put);
+
+ // flush to disk.
+ region.flushcache();
+
+ Store store = region.getStore(fam1);
+ assertEquals(0, store.memstore.kvset.size());
+
+ long r = region.incrementColumnValue(row, fam1, qual1, amount, true);
+ assertEquals(value+amount, r);
+
+ assertICV(row, fam1, qual1, value+amount);
+ }
+
+ public void testIncrementColumnValue_AddingNewAfterSFCheck()
+ throws IOException {
+ initHRegion(tableName, getName(), fam1);
+
+ long value = 1L;
+ long amount = 3L;
+
+ Put put = new Put(row);
+ put.add(fam1, qual1, Bytes.toBytes(value));
+ put.add(fam1, qual2, Bytes.toBytes(value));
+ region.put(put);
+ region.flushcache();
+
+ Store store = region.getStore(fam1);
+ assertEquals(0, store.memstore.kvset.size());
+
+ long r = region.incrementColumnValue(row, fam1, qual3, amount, true);
+ assertEquals(amount, r);
+
+ assertICV(row, fam1, qual3, amount);
+
+ region.flushcache();
+
+ // ensure that this gets to disk.
+ assertICV(row, fam1, qual3, amount);
+ }
+
+ private void assertICV(byte [] row,
+ byte [] familiy,
+ byte[] qualifier,
+ long amount) throws IOException {
+ // run a get and see?
+ Get get = new Get(row);
+ get.addColumn(familiy, qualifier);
+ Result result = region.get(get, null);
+ assertEquals(1, result.size());
+
+ KeyValue kv = result.raw()[0];
+ long r = Bytes.toLong(kv.getValue());
+ assertEquals(amount, r);
+ }
+
+
public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions()
throws IOException {
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=814038&r1=814037&r2=814038&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java Fri Sep 11 21:28:36 2009
@@ -232,163 +232,40 @@
//////////////////////////////////////////////////////////////////////////////
// IncrementColumnValue tests
//////////////////////////////////////////////////////////////////////////////
- /**
- * Testing if the update in place works. When you want to update a value that
- * is already in memstore, you don't delete it and put a new one, but just
- * update the value in the original KeyValue
- * @throws IOException
+ /*
+ * test the internal details of how ICV works, especially during a flush scenario.
*/
- public void testIncrementColumnValue_UpdatingInPlace() throws IOException {
- init(this.getName());
-
- //Put data in memstore
- long value = 1L;
- long amount = 3L;
- this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
-
- Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
- assertEquals(value+amount, vas.value);
- store.add(vas.kv);
- Get get = new Get(row);
- get.addColumn(family, qf1);
- NavigableSet<byte[]> qualifiers =
- new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
- qualifiers.add(qf1);
- List<KeyValue> result = new ArrayList<KeyValue>();
- this.store.get(get, qualifiers, result);
- assertEquals(value + amount, Bytes.toLong(result.get(0).getValue()));
- }
-
- /**
- * Same as above but for a negative number
- * @throws IOException
- */
- public void testIncrementColumnValue_UpdatingInPlace_Negative()
- throws IOException {
- init(this.getName());
-
- //Put data in memstore
- long value = 3L;
- long amount = -1L;
- this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
-
- Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
- assertEquals(vas.value, value+amount);
- store.add(vas.kv);
- Get get = new Get(row);
- get.addColumn(family, qf1);
- NavigableSet<byte[]> qualifiers =
- new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
- qualifiers.add(qf1);
- List<KeyValue> result = new ArrayList<KeyValue>();
- this.store.get(get, qualifiers, result);
- assertEquals(value + amount, Bytes.toLong(result.get(0).getValue()));
- }
-
- /**
- * When there is no mathing key already, adding a new.
- * @throws IOException
- */
- public void testIncrementColumnValue_AddingNew() throws IOException {
- init(this.getName());
-
- //Put data in memstore
- long value = 1L;
- long amount = 3L;
- this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
- this.store.add(new KeyValue(row, family, qf2, Bytes.toBytes(value)));
-
- Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf3, amount);
- store.add(vas.kv);
- Get get = new Get(row);
- get.addColumn(family, qf3);
- NavigableSet<byte[]> qualifiers =
- new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
- qualifiers.add(qf3);
- List<KeyValue> result = new ArrayList<KeyValue>();
- this.store.get(get, qualifiers, result);
- assertEquals(amount, Bytes.toLong(result.get(0).getValue()));
- }
-
- /**
- * When we have the key in a file add a new key + value to memstore with the
- * updates value.
- * @throws IOException
- */
- public void testIncrementColumnValue_UpdatingFromSF() throws IOException {
- init(this.getName());
-
- //Put data in memstore
- long value = 1L;
- long amount = 3L;
- this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
- this.store.add(new KeyValue(row, family, qf2, Bytes.toBytes(value)));
-
- flush(1);
-
- Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
- store.add(vas.kv);
- Get get = new Get(row);
- get.addColumn(family, qf1);
- NavigableSet<byte[]> qualifiers =
- new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
- qualifiers.add(qf1);
- List<KeyValue> result = new ArrayList<KeyValue>();
- this.store.get(get, qualifiers, result);
- assertEquals(value + amount, Bytes.toLong(result.get(0).getValue()));
- }
-
- /**
- * Same as testIncrementColumnValue_AddingNew() except that the keys are
- * checked in file not in memstore
- * @throws IOException
- */
- public void testIncrementColumnValue_AddingNewAfterSFCheck()
- throws IOException {
- init(this.getName());
-
- //Put data in memstore
- long value = 1L;
- long amount = 3L;
- this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
- this.store.add(new KeyValue(row, family, qf2, Bytes.toBytes(value)));
-
- flush(1);
-
- Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf3, amount);
- store.add(vas.kv);
- Get get = new Get(row);
- get.addColumn(family, qf3);
- NavigableSet<byte[]> qualifiers =
- new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
- qualifiers.add(qf3);
- List<KeyValue> result = new ArrayList<KeyValue>();
- this.store.get(get, qualifiers, result);
- assertEquals(amount, Bytes.toLong(result.get(0).getValue()));
- }
-
public void testIncrementColumnValue_ICVDuringFlush()
throws IOException {
init(this.getName());
- long value = 1L;
- long amount = 3L;
+ long oldValue = 1L;
+ long newValue = 3L;
this.store.add(new KeyValue(row, family, qf1,
System.currentTimeMillis(),
- Bytes.toBytes(value)));
+ Bytes.toBytes(oldValue)));
// snapshot the store.
this.store.snapshot();
- // incrment during the snapshot...
+ // add other things:
+ this.store.add(new KeyValue(row, family, qf2,
+ System.currentTimeMillis(),
+ Bytes.toBytes(oldValue)));
+
+ // update during the snapshot.
+ long ret = this.store.updateColumnValue(row, family, qf1, newValue);
- Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
+ // memstore should have grown by some amount.
+ assertTrue(ret > 0);
// then flush.
this.store.flushCache(id++);
assertEquals(1, this.store.getStorefiles().size());
- assertEquals(0, this.store.memstore.kvset.size());
+ // from the one we inserted up there, and a new one
+ assertEquals(2, this.store.memstore.kvset.size());
+ // how many key/values for this row are there?
Get get = new Get(row);
get.addColumn(family, qf1);
get.setMaxVersions(); // all versions.
@@ -398,12 +275,15 @@
cols.add(qf1);
this.store.get(get, cols, results);
- // only one, because Store.ICV doesnt add to memcache.
- assertEquals(1, results.size());
+ assertEquals(2, results.size());
+
+ long ts1 = results.get(0).getTimestamp();
+ long ts2 = results.get(1).getTimestamp();
+
+ assertTrue(ts1 > ts2);
+
+ assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
+ assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
- // but the timestamps should be different...
- long icvTs = vas.kv.getTimestamp();
- long storeTs = results.get(0).getTimestamp();
- assertTrue(icvTs != storeTs);
}
}
\ No newline at end of file