Posted to commits@hbase.apache.org by la...@apache.org on 2012/11/10 05:28:08 UTC

svn commit: r1407725 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/

Author: larsh
Date: Sat Nov 10 04:28:07 2012
New Revision: 1407725

URL: http://svn.apache.org/viewvc?rev=1407725&view=rev
Log:
HBASE-4583 Integrate RWCC with Append and Increment operations

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
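
In outline, the change makes append and increment run as MVCC transactions so that gets and scans never observe a partially applied operation. A condensed sketch of the pattern applied in HRegion.append() and HRegion.increment() follows; all identifiers are taken from the diff below, and the surrounding row-lock and WAL handling is omitted, so this is an illustration rather than the full method body.

    WriteEntry w = null;
    try {
      // While holding the row lock, wait for all earlier MVCC transactions to
      // finish so the current values read below reflect the latest state.
      mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
      // Start this operation's own MVCC transaction.
      w = mvcc.beginMemstoreInsert();

      // ... compute the new KeyValues from the existing values ...
      newKV.setMemstoreTS(w.getWriteNumber()); // stamp every new KV

      // ... append to the WAL and write the KVs to the memstore ...
    } finally {
      if (w != null) {
        // Completing the transaction makes all stamped KVs visible to
        // readers at once.
        mvcc.completeMemstoreInsert(w);
      }
    }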

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1407725&r1=1407724&r2=1407725&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Nov 10 04:28:07 2012
@@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.ipc.HBase
 import org.apache.hadoop.hbase.ipc.RpcCallContext;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
@@ -4505,11 +4506,7 @@ public class HRegion implements HeapSize
   // TODO: There's a lot of boiler plate code identical
   // to increment... See how to better unify that.
   /**
-   *
    * Perform one or more append operations on a row.
-   * <p>
-   * Appends performed are done under row lock but reads do not take locks out
-   * so this can be seen partially complete by gets and scans.
    *
    * @param append
    * @param lockid
@@ -4519,7 +4516,6 @@ public class HRegion implements HeapSize
    */
   public Result append(Append append, Integer lockid, boolean writeToWAL)
       throws IOException {
-    // TODO: Use MVCC to make this set of appends atomic to reads
     byte[] row = append.getRow();
     checkRow(row, "append");
     boolean flush = false;
@@ -4533,9 +4529,15 @@ public class HRegion implements HeapSize
     // Lock row
     startRegionOperation();
     this.writeRequestsCount.increment();
+    WriteEntry w = null;
     try {
       Integer lid = getLock(lockid, row, true);
       this.updatesLock.readLock().lock();
+      // wait for all prior MVCC transactions to finish - while we hold the row lock
+      // (so that we are guaranteed to see the latest state)
+      mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+      // now start my own transaction
+      w = mvcc.beginMemstoreInsert();
       try {
         long now = EnvironmentEdgeManager.currentTimeMillis();
         // Process each family
@@ -4598,6 +4600,7 @@ public class HRegion implements HeapSize
                 newKV.getBuffer(), newKV.getQualifierOffset(),
                 kv.getQualifierLength());
 
+            newKV.setMemstoreTS(w.getWriteNumber());
             kvs.add(newKV);
 
             // Append update to WAL
@@ -4627,7 +4630,15 @@ public class HRegion implements HeapSize
         //Actually write to Memstore now
         for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
           Store store = entry.getKey();
-          size += store.upsert(entry.getValue());
+          if (store.getFamily().getMaxVersions() == 1) {
+            // upsert if VERSIONS for this CF == 1
+            size += store.upsert(entry.getValue(), getSmallestReadPoint());
+          } else {
+            // otherwise keep older versions around
+            for (KeyValue kv : entry.getValue()) {
+              size += store.add(kv);
+            }
+          }
           allKVs.addAll(entry.getValue());
         }
         size = this.addAndGetGlobalMemstoreSize(size);
@@ -4640,6 +4651,9 @@ public class HRegion implements HeapSize
         syncOrDefer(txid); // sync the transaction log outside the rowlock
       }
     } finally {
+      if (w != null) {
+        mvcc.completeMemstoreInsert(w);
+      }
       closeRegionOperation();
     }
 
@@ -4656,11 +4670,7 @@ public class HRegion implements HeapSize
   }
 
   /**
-   *
    * Perform one or more increment operations on a row.
-   * <p>
-   * Increments performed are done under row lock but reads do not take locks
-   * out so this can be seen partially complete by gets and scans.
    * @param increment
    * @param lockid
    * @param writeToWAL
@@ -4670,7 +4680,6 @@ public class HRegion implements HeapSize
   public Result increment(Increment increment, Integer lockid,
       boolean writeToWAL)
   throws IOException {
-    // TODO: Use MVCC to make this set of increments atomic to reads
     byte [] row = increment.getRow();
     checkRow(row, "increment");
     TimeRange tr = increment.getTimeRange();
@@ -4685,9 +4694,15 @@ public class HRegion implements HeapSize
     // Lock row
     startRegionOperation();
     this.writeRequestsCount.increment();
+    WriteEntry w = null;
     try {
       Integer lid = getLock(lockid, row, true);
       this.updatesLock.readLock().lock();
+      // wait for all prior MVCC transactions to finish - while we hold the row lock
+      // (so that we are guaranteed to see the latest state)
+      mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+      // now start my own transaction
+      w = mvcc.beginMemstoreInsert();
       try {
         long now = EnvironmentEdgeManager.currentTimeMillis();
         // Process each family
@@ -4726,6 +4741,7 @@ public class HRegion implements HeapSize
             // Append new incremented KeyValue to list
             KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(),
                 now, Bytes.toBytes(amount));
+            newKV.setMemstoreTS(w.getWriteNumber());
             kvs.add(newKV);
 
             // Prepare WAL updates
@@ -4754,7 +4770,15 @@ public class HRegion implements HeapSize
         //Actually write to Memstore now
         for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
           Store store = entry.getKey();
-          size += store.upsert(entry.getValue());
+          if (store.getFamily().getMaxVersions() == 1) {
+            // upsert if VERSIONS for this CF == 1
+            size += store.upsert(entry.getValue(), getSmallestReadPoint());
+          } else {
+            // otherwise keep older versions around
+            for (KeyValue kv : entry.getValue()) {
+              size += store.add(kv);
+            }
+          }
           allKVs.addAll(entry.getValue());
         }
         size = this.addAndGetGlobalMemstoreSize(size);
@@ -4767,6 +4791,9 @@ public class HRegion implements HeapSize
         syncOrDefer(txid); // sync the transaction log outside the rowlock
       }
     } finally {
+      if (w != null) {
+        mvcc.completeMemstoreInsert(w);
+      }
       closeRegionOperation();
       this.metricsRegion.updateIncrement();
     }
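
The memstore write step in the hunks above also branches on the family's VERSIONS setting. getSmallestReadPoint(), an HRegion helper already present at this revision, supplies the smallest read point any open scanner may be using; only families configured with a single version take the pruning upsert path. Roughly, with the loop variables abbreviated (kvs stands in for entry.getValue()):

    long smallestReadPoint = getSmallestReadPoint(); // no open scanner reads below this point
    if (store.getFamily().getMaxVersions() == 1) {
      // VERSIONS == 1: upsert, allowing MemStore to drop superseded versions
      // once it can prove no open scanner still needs them (see the
      // MemStore.upsert() hunk further below).
      size += store.upsert(kvs, smallestReadPoint);
    } else {
      // VERSIONS > 1: keep history; just add the new KVs.
      for (KeyValue kv : kvs) {
        size += store.add(kv);
      }
    }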

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1407725&r1=1407724&r2=1407725&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Sat Nov 10 04:28:07 2012
@@ -2021,7 +2021,19 @@ public class HStore implements Store {
     return this.region.regionInfo;
   }
 
-  @Override
+  /**
+   * Used in tests. TODO: Remove
+   *
+   * Updates the value for the given row/family/qualifier. This function will always be seen as
+   * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
+   * control necessary.
+   * @param row row to update
+   * @param f family to update
+   * @param qualifier qualifier to update
+   * @param newValue the new value to set into memstore
+   * @return memstore size delta
+   * @throws IOException
+   */
   public long updateColumnValue(byte [] row, byte [] f,
                                 byte [] qualifier, long newValue)
       throws IOException {
@@ -2042,11 +2054,10 @@ public class HStore implements Store {
   }
 
   @Override
-  public long upsert(Iterable<KeyValue> kvs) throws IOException {
+  public long upsert(Iterable<KeyValue> kvs, long readpoint) throws IOException {
     this.lock.readLock().lock();
     try {
-      // TODO: Make this operation atomic w/ MVCC
-      return this.memstore.upsert(kvs);
+      return this.memstore.upsert(kvs, readpoint);
     } finally {
       this.lock.readLock().unlock();
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1407725&r1=1407724&r2=1407725&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Sat Nov 10 04:28:07 2012
@@ -435,6 +435,8 @@ public class MemStore implements HeapSiz
   }
 
   /**
+   * Only used by tests. TODO: Remove
+   *
    * Given the specs of a column, update it, first by inserting a new record,
    * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
    * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
@@ -449,7 +451,7 @@ public class MemStore implements HeapSiz
    * @param now
    * @return  Timestamp
    */
-  public long updateColumnValue(byte[] row,
+  long updateColumnValue(byte[] row,
                                 byte[] family,
                                 byte[] qualifier,
                                 long newValue,
@@ -497,7 +499,7 @@ public class MemStore implements HeapSiz
       // create or update (upsert) a new KeyValue with
       // 'now' and a 0 memstoreTS == immediately visible
       return upsert(Arrays.asList(
-          new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)))
+          new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue))), 1L
       );
     } finally {
       this.lock.readLock().unlock();
@@ -519,15 +521,15 @@ public class MemStore implements HeapSiz
    * atomically.  Scans will only see each KeyValue update as atomic.
    *
    * @param kvs
+   * @param readpoint readpoint below which we can safely remove duplicate KVs 
    * @return change in memstore size
    */
-  public long upsert(Iterable<KeyValue> kvs) {
+  public long upsert(Iterable<KeyValue> kvs, long readpoint) {
    this.lock.readLock().lock();
     try {
       long size = 0;
       for (KeyValue kv : kvs) {
-        kv.setMemstoreTS(0);
-        size += upsert(kv);
+        size += upsert(kv, readpoint);
       }
       return size;
     } finally {
@@ -549,7 +551,7 @@ public class MemStore implements HeapSiz
    * @param kv
    * @return change in size of MemStore
    */
-  private long upsert(KeyValue kv) {
+  private long upsert(KeyValue kv, long readpoint) {
     // Add the KeyValue to the MemStore
     // Use the internalAdd method here since we (a) already have a lock
     // and (b) cannot safely use the MSLAB here without potentially
@@ -566,6 +568,8 @@ public class MemStore implements HeapSiz
         kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
     SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
     Iterator<KeyValue> it = ss.iterator();
+    // versions visible to oldest scanner
+    int versionsVisible = 0;
     while ( it.hasNext() ) {
       KeyValue cur = it.next();
 
@@ -573,23 +577,23 @@ public class MemStore implements HeapSiz
         // ignore the one just put in
         continue;
       }
-      // if this isn't the row we are interested in, then bail
-      if (!kv.matchingRow(cur)) {
-        break;
-      }
-
-      // if the qualifier matches and it's a put, remove it
-      if (kv.matchingQualifier(cur)) {
-
-        // to be extra safe we only remove Puts that have a memstoreTS==0
-        if (kv.getType() == KeyValue.Type.Put.getCode() &&
-            kv.getMemstoreTS() == 0) {
-          // false means there was a change, so give us the size.
-          addedSize -= heapSizeChange(kv, true);
-          it.remove();
+      // check that this is the row and column we are interested in, otherwise bail
+      if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
+        // only remove Puts that concurrent scanners cannot possibly see
+        if (cur.getType() == KeyValue.Type.Put.getCode() && cur.getMemstoreTS() <= readpoint) {
+          if (versionsVisible > 1) {
+            // if we get here we have seen at least one version visible to the oldest scanner,
+            // which means we can prove that no scanner will see this version
+
+            // false means there was a change, so give us the size.
+            addedSize -= heapSizeChange(cur, true);
+            it.remove();
+          } else {
+            versionsVisible++;
+          }
         }
       } else {
-        // past the column, done
+        // past the row or column, done
         break;
       }
     }
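
The rewritten removal rule in MemStore.upsert() is the core of this change: an existing Put for the same row and qualifier may be dropped only if its memstoreTS is at or below the caller's read point (so even the oldest open scanner can already see it) and enough newer visible versions of that column have already been counted during the walk (the versionsVisible guard), which proves no scanner can be left without a value. A condensed restatement of the loop body above; the two boolean locals are introduced here for readability, all other names are from the diff:

    // Walking existing KVs for the inserted KV's row/column, newest first:
    boolean isPut = cur.getType() == KeyValue.Type.Put.getCode();
    // memstoreTS <= readpoint: even the oldest open scanner can see 'cur'
    boolean visibleToAllScanners = cur.getMemstoreTS() <= readpoint;
    if (isPut && visibleToAllScanners) {
      if (versionsVisible > 1) {
        // Newer versions visible to every scanner have already been kept,
        // so no reader can still need 'cur'; remove it and adjust the size.
        addedSize -= heapSizeChange(cur, true);
        it.remove();
      } else {
        versionsVisible++; // keep this version and count it as visible
      }
    }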

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1407725&r1=1407724&r2=1407725&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Nov 10 04:28:07 2012
@@ -74,20 +74,6 @@ public interface Store extends  HeapSize
       throws IOException;
 
   /**
-   * Updates the value for the given row/family/qualifier. This function will always be seen as
-   * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
-   * control necessary.
-   * @param row row to update
-   * @param f family to update
-   * @param qualifier qualifier to update
-   * @param newValue the new value to set into memstore
-   * @return memstore size delta
-   * @throws IOException
-   */
-  public long updateColumnValue(byte[] row, byte[] f, byte[] qualifier, long newValue)
-      throws IOException;
-
-  /**
    * Adds or replaces the specified KeyValues.
    * <p>
    * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
@@ -96,10 +82,11 @@ public interface Store extends  HeapSize
    * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
    * across all of them.
    * @param kvs
+   * @param readpoint readpoint below which we can safely remove duplicate KVs 
    * @return memstore size delta
    * @throws IOException
    */
-  public long upsert(Iterable<KeyValue> kvs) throws IOException;
+  public long upsert(Iterable<KeyValue> kvs, long readpoint) throws IOException;
 
   /**
    * Adds a value to the memstore

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java?rev=1407725&r1=1407724&r2=1407725&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java Sat Nov 10 04:28:07 2012
@@ -108,7 +108,8 @@ public class TestAtomicOperation extends
   public void testIncrementMultiThreads() throws IOException {
 
     LOG.info("Starting test testIncrementMultiThreads");
-    initHRegion(tableName, getName(), fam1);
+    // run with mixed column families (1 and 3 versions)
+    initHRegion(tableName, getName(), new int[] {1,3}, fam1, fam2);
 
     // create 100 threads, each will increment by its own quantity
     int numThreads = 100;
@@ -135,6 +136,8 @@ public class TestAtomicOperation extends
       }
     }
     assertICV(row, fam1, qual1, expectedTotal);
+    assertICV(row, fam1, qual2, expectedTotal*2);
+    assertICV(row, fam2, qual3, expectedTotal*3);
     LOG.info("testIncrementMultiThreads successfully verified that total is " +
              expectedTotal);
   }
@@ -156,17 +159,20 @@ public class TestAtomicOperation extends
   }
 
   private void initHRegion (byte [] tableName, String callingMethod,
-    byte[] ... families)
-  throws IOException {
-    initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families);
+      byte[] ... families)
+    throws IOException {
+    initHRegion(tableName, callingMethod, null, families);
   }
 
-  private void initHRegion (byte [] tableName, String callingMethod,
-    Configuration conf, byte [] ... families)
-  throws IOException{
+  private void initHRegion (byte [] tableName, String callingMethod, int [] maxVersions,
+    byte[] ... families)
+  throws IOException {
     HTableDescriptor htd = new HTableDescriptor(tableName);
+    int i=0;
     for(byte [] family : families) {
-      htd.addFamily(new HColumnDescriptor(family));
+      HColumnDescriptor hcd = new HColumnDescriptor(family);
+      hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
+      htd.addFamily(hcd);
     }
     HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
     Path path = new Path(DIR + callingMethod);
@@ -175,7 +181,7 @@ public class TestAtomicOperation extends
         throw new IOException("Failed delete of " + path);
       }
     }
-    region = HRegion.createHRegion(info, path, conf, htd);
+    region = HRegion.createHRegion(info, path, HBaseConfiguration.create(), htd);
   }
 
   /**
@@ -184,18 +190,14 @@ public class TestAtomicOperation extends
   public static class Incrementer extends Thread {
 
     private final HRegion region;
-    private final int threadNumber;
     private final int numIncrements;
     private final int amount;
 
-    private int count;
 
     public Incrementer(HRegion region,
         int threadNumber, int amount, int numIncrements) {
       this.region = region;
-      this.threadNumber = threadNumber;
       this.numIncrements = numIncrements;
-      this.count = 0;
       this.amount = amount;
       setDaemon(true);
     }
@@ -206,16 +208,79 @@ public class TestAtomicOperation extends
         try {
           Increment inc = new Increment(row);
           inc.addColumn(fam1, qual1, amount);
-          Result result = region.increment(inc, null, true);
-          // LOG.info("thread:" + threadNumber + " iter:" + i);
+          inc.addColumn(fam1, qual2, amount*2);
+          inc.addColumn(fam2, qual3, amount*3);
+          region.increment(inc, null, true);
+
+          // verify: Make sure we only see completed increments
+          Get g = new Get(row);
+          Result result = region.get(g, null);
+          assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, Bytes.toLong(result.getValue(fam1, qual2))); 
+          assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3, Bytes.toLong(result.getValue(fam2, qual3)));
         } catch (IOException e) {
           e.printStackTrace();
         }
-        count++;
       }
     }
   }
 
+  public void testAppendMultiThreads() throws IOException {
+    LOG.info("Starting test testAppendMultiThreads");
+    // run with mixed column families (1 and 3 versions)
+    initHRegion(tableName, getName(), new int[] {1,3}, fam1, fam2);
+
+    int numThreads = 100;
+    int opsPerThread = 100;
+    AtomicOperation[] all = new AtomicOperation[numThreads];
+    final byte[] val = new byte[]{1};
+
+    AtomicInteger failures = new AtomicInteger(0);
+    // create all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
+        @Override
+        public void run() {
+          for (int i=0; i<numOps; i++) {
+            try {
+              Append a = new Append(row);
+              a.add(fam1, qual1, val);
+              a.add(fam1, qual2, val);
+              a.add(fam2, qual3, val);
+              region.append(a, null, true);
+
+              Get g = new Get(row);
+              Result result = region.get(g, null);
+              assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length); 
+              assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length); 
+            } catch (IOException e) {
+              e.printStackTrace();
+              failures.incrementAndGet();
+              fail();
+            }
+          }
+        }
+      };
+    }
+
+    // run all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i].start();
+    }
+
+    // wait for all threads to finish
+    for (int i = 0; i < numThreads; i++) {
+      try {
+        all[i].join();
+      } catch (InterruptedException e) {
+      }
+    }
+    assertEquals(0, failures.get());
+    Get g = new Get(row);
+    Result result = region.get(g, null);
+    assertEquals(result.getValue(fam1, qual1).length, 10000);
+    assertEquals(result.getValue(fam1, qual2).length, 10000);
+    assertEquals(result.getValue(fam2, qual3).length, 10000);
+  }
   /**
    * Test multi-threaded row mutations.
    */