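You are viewing a plain text version of this content. The canonical version is the original message in the commits@hbase.apache.org mailing-list archive (the direct link was lost in the plain-text conversion).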
Posted to commits@hbase.apache.org by la...@apache.org on 2012/11/10 05:28:08 UTC
svn commit: r1407725 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/
Author: larsh
Date: Sat Nov 10 04:28:07 2012
New Revision: 1407725
URL: http://svn.apache.org/viewvc?rev=1407725&view=rev
Log:
HBASE-4583 Integrate RWCC (read/write consistency control, i.e. MVCC / MultiVersionConsistencyControl) with Append and Increment operations
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1407725&r1=1407724&r2=1407725&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Nov 10 04:28:07 2012
@@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.ipc.HBase
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
@@ -4505,11 +4506,7 @@ public class HRegion implements HeapSize
// TODO: There's a lot of boiler plate code identical
// to increment... See how to better unify that.
/**
- *
* Perform one or more append operations on a row.
- * <p>
- * Appends performed are done under row lock but reads do not take locks out
- * so this can be seen partially complete by gets and scans.
*
* @param append
* @param lockid
@@ -4519,7 +4516,6 @@ public class HRegion implements HeapSize
*/
public Result append(Append append, Integer lockid, boolean writeToWAL)
throws IOException {
- // TODO: Use MVCC to make this set of appends atomic to reads
byte[] row = append.getRow();
checkRow(row, "append");
boolean flush = false;
@@ -4533,9 +4529,15 @@ public class HRegion implements HeapSize
// Lock row
startRegionOperation();
this.writeRequestsCount.increment();
+ WriteEntry w = null;
try {
Integer lid = getLock(lockid, row, true);
this.updatesLock.readLock().lock();
+ // wait for all prior MVCC transactions to finish - while we hold the row lock
+ // (so that we are guaranteed to see the latest state)
+ mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+ // now start my own transaction
+ w = mvcc.beginMemstoreInsert();
try {
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
@@ -4598,6 +4600,7 @@ public class HRegion implements HeapSize
newKV.getBuffer(), newKV.getQualifierOffset(),
kv.getQualifierLength());
+ newKV.setMemstoreTS(w.getWriteNumber());
kvs.add(newKV);
// Append update to WAL
@@ -4627,7 +4630,15 @@ public class HRegion implements HeapSize
//Actually write to Memstore now
for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
Store store = entry.getKey();
- size += store.upsert(entry.getValue());
+ if (store.getFamily().getMaxVersions() == 1) {
+ // upsert if VERSIONS for this CF == 1
+ size += store.upsert(entry.getValue(), getSmallestReadPoint());
+ } else {
+ // otherwise keep older versions around
+ for (KeyValue kv : entry.getValue()) {
+ size += store.add(kv);
+ }
+ }
allKVs.addAll(entry.getValue());
}
size = this.addAndGetGlobalMemstoreSize(size);
@@ -4640,6 +4651,9 @@ public class HRegion implements HeapSize
syncOrDefer(txid); // sync the transaction log outside the rowlock
}
} finally {
+ if (w != null) {
+ mvcc.completeMemstoreInsert(w);
+ }
closeRegionOperation();
}
@@ -4656,11 +4670,7 @@ public class HRegion implements HeapSize
}
/**
- *
* Perform one or more increment operations on a row.
- * <p>
- * Increments performed are done under row lock but reads do not take locks
- * out so this can be seen partially complete by gets and scans.
* @param increment
* @param lockid
* @param writeToWAL
@@ -4670,7 +4680,6 @@ public class HRegion implements HeapSize
public Result increment(Increment increment, Integer lockid,
boolean writeToWAL)
throws IOException {
- // TODO: Use MVCC to make this set of increments atomic to reads
byte [] row = increment.getRow();
checkRow(row, "increment");
TimeRange tr = increment.getTimeRange();
@@ -4685,9 +4694,15 @@ public class HRegion implements HeapSize
// Lock row
startRegionOperation();
this.writeRequestsCount.increment();
+ WriteEntry w = null;
try {
Integer lid = getLock(lockid, row, true);
this.updatesLock.readLock().lock();
+ // wait for all prior MVCC transactions to finish - while we hold the row lock
+ // (so that we are guaranteed to see the latest state)
+ mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+ // now start my own transaction
+ w = mvcc.beginMemstoreInsert();
try {
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
@@ -4726,6 +4741,7 @@ public class HRegion implements HeapSize
// Append new incremented KeyValue to list
KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(),
now, Bytes.toBytes(amount));
+ newKV.setMemstoreTS(w.getWriteNumber());
kvs.add(newKV);
// Prepare WAL updates
@@ -4754,7 +4770,15 @@ public class HRegion implements HeapSize
//Actually write to Memstore now
for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
Store store = entry.getKey();
- size += store.upsert(entry.getValue());
+ if (store.getFamily().getMaxVersions() == 1) {
+ // upsert if VERSIONS for this CF == 1
+ size += store.upsert(entry.getValue(), getSmallestReadPoint());
+ } else {
+ // otherwise keep older versions around
+ for (KeyValue kv : entry.getValue()) {
+ size += store.add(kv);
+ }
+ }
allKVs.addAll(entry.getValue());
}
size = this.addAndGetGlobalMemstoreSize(size);
@@ -4767,6 +4791,9 @@ public class HRegion implements HeapSize
syncOrDefer(txid); // sync the transaction log outside the rowlock
}
} finally {
+ if (w != null) {
+ mvcc.completeMemstoreInsert(w);
+ }
closeRegionOperation();
this.metricsRegion.updateIncrement();
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1407725&r1=1407724&r2=1407725&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Sat Nov 10 04:28:07 2012
@@ -2021,7 +2021,19 @@ public class HStore implements Store {
return this.region.regionInfo;
}
- @Override
+ /**
+ * Used in tests. TODO: Remove
+ *
+ * Updates the value for the given row/family/qualifier. This function will always be seen as
+ * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
+ * control necessary.
+ * @param row row to update
+ * @param f family to update
+ * @param qualifier qualifier to update
+ * @param newValue the new value to set into memstore
+ * @return memstore size delta
+ * @throws IOException
+ */
public long updateColumnValue(byte [] row, byte [] f,
byte [] qualifier, long newValue)
throws IOException {
@@ -2042,11 +2054,10 @@ public class HStore implements Store {
}
@Override
- public long upsert(Iterable<KeyValue> kvs) throws IOException {
+ public long upsert(Iterable<KeyValue> kvs, long readpoint) throws IOException {
this.lock.readLock().lock();
try {
- // TODO: Make this operation atomic w/ MVCC
- return this.memstore.upsert(kvs);
+ return this.memstore.upsert(kvs, readpoint);
} finally {
this.lock.readLock().unlock();
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1407725&r1=1407724&r2=1407725&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Sat Nov 10 04:28:07 2012
@@ -435,6 +435,8 @@ public class MemStore implements HeapSiz
}
/**
+ * Only used by tests. TODO: Remove
+ *
* Given the specs of a column, update it, first by inserting a new record,
* then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS
* will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
@@ -449,7 +451,7 @@ public class MemStore implements HeapSiz
* @param now
* @return Timestamp
*/
- public long updateColumnValue(byte[] row,
+ long updateColumnValue(byte[] row,
byte[] family,
byte[] qualifier,
long newValue,
@@ -497,7 +499,7 @@ public class MemStore implements HeapSiz
// create or update (upsert) a new KeyValue with
// 'now' and a 0 memstoreTS == immediately visible
return upsert(Arrays.asList(
- new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)))
+ new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue))), 1L
);
} finally {
this.lock.readLock().unlock();
@@ -519,15 +521,15 @@ public class MemStore implements HeapSiz
* atomically. Scans will only see each KeyValue update as atomic.
*
* @param kvs
+ * @param readpoint readpoint below which we can safely remove duplicate KVs
* @return change in memstore size
*/
- public long upsert(Iterable<KeyValue> kvs) {
+ public long upsert(Iterable<KeyValue> kvs, long readpoint) {
this.lock.readLock().lock();
try {
long size = 0;
for (KeyValue kv : kvs) {
- kv.setMemstoreTS(0);
- size += upsert(kv);
+ size += upsert(kv, readpoint);
}
return size;
} finally {
@@ -549,7 +551,7 @@ public class MemStore implements HeapSiz
* @param kv
* @return change in size of MemStore
*/
- private long upsert(KeyValue kv) {
+ private long upsert(KeyValue kv, long readpoint) {
// Add the KeyValue to the MemStore
// Use the internalAdd method here since we (a) already have a lock
// and (b) cannot safely use the MSLAB here without potentially
@@ -566,6 +568,8 @@ public class MemStore implements HeapSiz
kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
Iterator<KeyValue> it = ss.iterator();
+ // versions visible to oldest scanner
+ int versionsVisible = 0;
while ( it.hasNext() ) {
KeyValue cur = it.next();
@@ -573,23 +577,23 @@ public class MemStore implements HeapSiz
// ignore the one just put in
continue;
}
- // if this isn't the row we are interested in, then bail
- if (!kv.matchingRow(cur)) {
- break;
- }
-
- // if the qualifier matches and it's a put, remove it
- if (kv.matchingQualifier(cur)) {
-
- // to be extra safe we only remove Puts that have a memstoreTS==0
- if (kv.getType() == KeyValue.Type.Put.getCode() &&
- kv.getMemstoreTS() == 0) {
- // false means there was a change, so give us the size.
- addedSize -= heapSizeChange(kv, true);
- it.remove();
+ // check that this is the row and column we are interested in, otherwise bail
+ if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
+ // only remove Puts that concurrent scanners cannot possibly see
+ if (cur.getType() == KeyValue.Type.Put.getCode() && cur.getMemstoreTS() <= readpoint) {
+ if (versionsVisible > 1) {
+ // if we get here we have seen at least one version visible to the oldest scanner,
+ // which means we can prove that no scanner will see this version
+
+ // false means there was a change, so give us the size.
+ addedSize -= heapSizeChange(cur, true);
+ it.remove();
+ } else {
+ versionsVisible++;
+ }
}
} else {
- // past the column, done
+ // past the row or column, done
break;
}
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1407725&r1=1407724&r2=1407725&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Nov 10 04:28:07 2012
@@ -74,20 +74,6 @@ public interface Store extends HeapSize
throws IOException;
/**
- * Updates the value for the given row/family/qualifier. This function will always be seen as
- * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
- * control necessary.
- * @param row row to update
- * @param f family to update
- * @param qualifier qualifier to update
- * @param newValue the new value to set into memstore
- * @return memstore size delta
- * @throws IOException
- */
- public long updateColumnValue(byte[] row, byte[] f, byte[] qualifier, long newValue)
- throws IOException;
-
- /**
* Adds or replaces the specified KeyValues.
* <p>
* For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
@@ -96,10 +82,11 @@ public interface Store extends HeapSize
* This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
* across all of them.
* @param kvs
+ * @param readpoint readpoint below which we can safely remove duplicate KVs
* @return memstore size delta
* @throws IOException
*/
- public long upsert(Iterable<KeyValue> kvs) throws IOException;
+ public long upsert(Iterable<KeyValue> kvs, long readpoint) throws IOException;
/**
* Adds a value to the memstore
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java?rev=1407725&r1=1407724&r2=1407725&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java Sat Nov 10 04:28:07 2012
@@ -108,7 +108,8 @@ public class TestAtomicOperation extends
public void testIncrementMultiThreads() throws IOException {
LOG.info("Starting test testIncrementMultiThreads");
- initHRegion(tableName, getName(), fam1);
+ // run a with mixed column families (1 and 3 versions)
+ initHRegion(tableName, getName(), new int[] {1,3}, fam1, fam2);
// create 100 threads, each will increment by its own quantity
int numThreads = 100;
@@ -135,6 +136,8 @@ public class TestAtomicOperation extends
}
}
assertICV(row, fam1, qual1, expectedTotal);
+ assertICV(row, fam1, qual2, expectedTotal*2);
+ assertICV(row, fam2, qual3, expectedTotal*3);
LOG.info("testIncrementMultiThreads successfully verified that total is " +
expectedTotal);
}
@@ -156,17 +159,20 @@ public class TestAtomicOperation extends
}
private void initHRegion (byte [] tableName, String callingMethod,
- byte[] ... families)
- throws IOException {
- initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families);
+ byte[] ... families)
+ throws IOException {
+ initHRegion(tableName, callingMethod, null, families);
}
- private void initHRegion (byte [] tableName, String callingMethod,
- Configuration conf, byte [] ... families)
- throws IOException{
+ private void initHRegion (byte [] tableName, String callingMethod, int [] maxVersions,
+ byte[] ... families)
+ throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
+ int i=0;
for(byte [] family : families) {
- htd.addFamily(new HColumnDescriptor(family));
+ HColumnDescriptor hcd = new HColumnDescriptor(family);
+ hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
+ htd.addFamily(hcd);
}
HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
Path path = new Path(DIR + callingMethod);
@@ -175,7 +181,7 @@ public class TestAtomicOperation extends
throw new IOException("Failed delete of " + path);
}
}
- region = HRegion.createHRegion(info, path, conf, htd);
+ region = HRegion.createHRegion(info, path, HBaseConfiguration.create(), htd);
}
/**
@@ -184,18 +190,14 @@ public class TestAtomicOperation extends
public static class Incrementer extends Thread {
private final HRegion region;
- private final int threadNumber;
private final int numIncrements;
private final int amount;
- private int count;
public Incrementer(HRegion region,
int threadNumber, int amount, int numIncrements) {
this.region = region;
- this.threadNumber = threadNumber;
this.numIncrements = numIncrements;
- this.count = 0;
this.amount = amount;
setDaemon(true);
}
@@ -206,16 +208,79 @@ public class TestAtomicOperation extends
try {
Increment inc = new Increment(row);
inc.addColumn(fam1, qual1, amount);
- Result result = region.increment(inc, null, true);
- // LOG.info("thread:" + threadNumber + " iter:" + i);
+ inc.addColumn(fam1, qual2, amount*2);
+ inc.addColumn(fam2, qual3, amount*3);
+ region.increment(inc, null, true);
+
+ // verify: Make sure we only see completed increments
+ Get g = new Get(row);
+ Result result = region.get(g, null);
+ assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, Bytes.toLong(result.getValue(fam1, qual2)));
+ assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3, Bytes.toLong(result.getValue(fam2, qual3)));
} catch (IOException e) {
e.printStackTrace();
}
- count++;
}
}
}
+ public void testAppendMultiThreads() throws IOException { // many threads append one byte to three columns of one row; each reader must see all three columns at the same length
+ LOG.info("Starting test testAppendMultiThreads");
+ // run with mixed column families: fam1 keeps 1 version (upsert path), fam2 keeps 3 versions (store.add path)
+ initHRegion(tableName, getName(), new int[] {1,3}, fam1, fam2);
+
+ int numThreads = 100; // numThreads * opsPerThread = 10000 one-byte appends per column (checked at the end)
+ int opsPerThread = 100;
+ AtomicOperation[] all = new AtomicOperation[numThreads]; // AtomicOperation is declared elsewhere in this test; used as a thread via start()/join() below
+ final byte[] val = new byte[]{1};
+
+ AtomicInteger failures = new AtomicInteger(0); // counts worker errors; only IOExceptions reach it (see catch below)
+ // create all threads
+ for (int i = 0; i < numThreads; i++) {
+ all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
+ @Override
+ public void run() {
+ for (int i=0; i<numOps; i++) { // numOps presumably holds the opsPerThread value passed to the constructor - confirm
+ try {
+ Append a = new Append(row);
+ a.add(fam1, qual1, val);
+ a.add(fam1, qual2, val);
+ a.add(fam2, qual3, val);
+ region.append(a, null, true); // one Append touching both families; must become visible atomically
+
+ Get g = new Get(row);
+ Result result = region.get(g, null);
+ assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length); // NOTE(review): an AssertionError here is not caught below and is not counted in failures, so a torn read ends only this worker and the test can still pass
+ assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length); // NOTE(review): same as above - consider catching AssertionError and incrementing failures
+ } catch (IOException e) {
+ e.printStackTrace();
+ failures.incrementAndGet(); // this counter, checked after join(), is what actually reports the error to the test
+ fail(); // NOTE(review): runs on the worker thread, so it cannot fail the test by itself
+ }
+ }
+ }
+ };
+ }
+
+ // run all threads
+ for (int i = 0; i < numThreads; i++) {
+ all[i].start();
+ }
+
+ // wait for all threads to finish
+ for (int i = 0; i < numThreads; i++) {
+ try {
+ all[i].join();
+ } catch (InterruptedException e) { // NOTE(review): interrupt is swallowed; consider restoring the interrupt flag
+ }
+ }
+ assertEquals(0, failures.get());
+ Get g = new Get(row);
+ Result result = region.get(g, null);
+ assertEquals(result.getValue(fam1, qual1).length, 10000); // 10000 == numThreads * opsPerThread; NOTE(review): arguments are (actual, expected), reversed from JUnit's (expected, actual)
+ assertEquals(result.getValue(fam1, qual2).length, 10000);
+ assertEquals(result.getValue(fam2, qual3).length, 10000);
+ }
/**
* Test multi-threaded row mutations.
*/