You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2010/04/08 01:41:58 UTC

svn commit: r931723 - in /hadoop/hbase/branches/0.20: ./ src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/ src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/ src/contrib/transactional/src...

Author: jdcryans
Date: Wed Apr  7 23:41:58 2010
New Revision: 931723

URL: http://svn.apache.org/viewvc?rev=931723&view=rev
Log:
HBASE-2286  [Transactional Contrib] Correctly handle or avoid cases where 
            writes occur in same millisecond (Clint Morgan via J-D)

Modified:
    hadoop/hbase/branches/0.20/CHANGES.txt
    hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java
    hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java
    hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
    hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java

Modified: hadoop/hbase/branches/0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/CHANGES.txt?rev=931723&r1=931722&r2=931723&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.20/CHANGES.txt Wed Apr  7 23:41:58 2010
@@ -114,6 +114,8 @@ Release 0.20.4 - Unreleased
    HBASE-2252  Mapping a very big table kills region servers
    HBASE-2411  Findbugs target
    HBASE-2412  [stargate] PerformanceEvaluation
+   HBASE-2286  [Transactional Contrib] Correctly handle or avoid cases where 
+               writes occur in same millisecond (Clint Morgan via J-D)
 
   NEW FEATURES
    HBASE-2257  [stargate] multiuser mode

Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java?rev=931723&r1=931722&r2=931723&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java Wed Apr  7 23:41:58 2010
@@ -175,6 +175,9 @@ public class IndexedTable extends Transa
         Result row = indexResult[i];
         
         byte[] baseRow = row.getValue(INDEX_BASE_ROW_COLUMN);
+        if (baseRow == null) {
+          throw new IllegalStateException("Missing base row for indexed row: ["+Bytes.toString(row.getRow())+"]");
+        }
         LOG.debug("next index row [" + Bytes.toString(row.getRow())
             + "] -> base row [" + Bytes.toString(baseRow) + "]");
         Result baseResult = null;
@@ -195,7 +198,10 @@ public class IndexedTable extends Transa
         }
         
         if (baseResult != null) {
-          results.addAll(baseResult.list());
+          List<KeyValue> list = baseResult.list();
+          if (list != null) {
+            results.addAll(list);
+          }
         }
         
         result[i] = new Result(results);

Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java?rev=931723&r1=931722&r2=931723&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java Wed Apr  7 23:41:58 2010
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Leases;
 import org.apache.hadoop.hbase.client.Delete;
@@ -141,10 +142,32 @@ class IndexedRegion extends Transactiona
     SortedMap<byte[], byte[]> oldColumnValues = convertToValueMap(oldResult);
     
     for (IndexSpecification indexSpec : indexesToUpdate) {
-      removeOldIndexEntry(indexSpec, put.getRow(), oldColumnValues);
-      updateIndex(indexSpec, put.getRow(), newColumnValues);
+      updateIndex(indexSpec, put, newColumnValues, oldColumnValues);
     }
   }
+
+  // FIXME: This call takes place in an RPC, and requires an RPC. This makes for
+  // a likely deadlock if the number of RPCs we are trying to serve is >= the
+  // number of handler threads.
+  private void updateIndex(IndexSpecification indexSpec, Put put,
+      NavigableMap<byte[], byte[]> newColumnValues,
+      SortedMap<byte[], byte[]> oldColumnValues) throws IOException {
+    Delete indexDelete = makeDeleteToRemoveOldIndexEntry(indexSpec, put.getRow(), oldColumnValues);
+    Put indexPut = makeIndexUpdate(indexSpec, put.getRow(), newColumnValues);
+    
+    HTable indexTable = getIndexTable(indexSpec);
+    if (indexDelete != null && !Bytes.equals(indexDelete.getRow(), indexPut.getRow())) {
+      // Only do the delete if the row changed. This way we avoid the put-after-delete issues in HBASE-2256
+      LOG.debug("Deleting old index row ["+Bytes.toString(indexDelete.getRow())+"]. New row is ["+Bytes.toString(indexPut.getRow())+"].");
+      indexTable.delete(indexDelete);
+    } else if (indexDelete != null){
+      LOG.debug("Skipping deleting index row ["+Bytes.toString(indexDelete.getRow())+"] because it has not changed.");
+    }
+    indexTable.put(indexPut);
+  }
+  
+ 
+  
   
   /** Return the columns needed for the update. */
   private NavigableSet<byte[]> getColumnsForIndexes(Collection<IndexSpecification> indexes) {
@@ -157,7 +180,7 @@ class IndexedRegion extends Transactiona
     return neededColumns;
   }
 
-  private void removeOldIndexEntry(IndexSpecification indexSpec, byte[] row,
+  private Delete makeDeleteToRemoveOldIndexEntry(IndexSpecification indexSpec, byte[] row,
       SortedMap<byte[], byte[]> oldColumnValues) throws IOException {
     for (byte[] indexedCol : indexSpec.getIndexedColumns()) {
       if (!oldColumnValues.containsKey(indexedCol)) {
@@ -165,7 +188,7 @@ class IndexedRegion extends Transactiona
             + "] not trying to remove old entry for row ["
             + Bytes.toString(row) + "] because col ["
             + Bytes.toString(indexedCol) + "] is missing");
-        return;
+        return null;
       }
     }
 
@@ -173,7 +196,7 @@ class IndexedRegion extends Transactiona
         oldColumnValues);
     LOG.debug("Index [" + indexSpec.getIndexId() + "] removing old entry ["
         + Bytes.toString(oldIndexRow) + "]");
-    getIndexTable(indexSpec).delete(new Delete(oldIndexRow));
+    return new Delete(oldIndexRow);
   }
   
   private NavigableMap<byte[], byte[]> getColumnsFromPut(Put put) {
@@ -204,23 +227,23 @@ class IndexedRegion extends Transactiona
     return false;
   }
 
-  // FIXME: This call takes place in an RPC, and requires an RPC. This makes for
-  // a likely deadlock if the number of RPCs we are trying to serve is >= the
-  // number of handler threads.
-  private void updateIndex(IndexSpecification indexSpec, byte[] row,
+  private Put makeIndexUpdate(IndexSpecification indexSpec, byte[] row,
       SortedMap<byte[], byte[]> columnValues) throws IOException {
     Put indexUpdate = IndexMaintenanceUtils.createIndexUpdate(indexSpec, row, columnValues);
-    getIndexTable(indexSpec).put(indexUpdate); 
     LOG.debug("Index [" + indexSpec.getIndexId() + "] adding new entry ["
         + Bytes.toString(indexUpdate.getRow()) + "] for row ["
         + Bytes.toString(row) + "]");
 
+    return indexUpdate; 
+
   }
 
+  // FIXME we can be smarter about this and avoid the base gets and index maintenance in many cases.
   @Override
   public void delete(Delete delete, final Integer lockid, boolean writeToWAL)
       throws IOException {
-    // First remove the existing indexes.
+    // First look at the current (to be the old) state.
+    SortedMap<byte[], byte[]> oldColumnValues = null;
     if (!getIndexes().isEmpty()) {
       // Need all columns
       NavigableSet<byte[]> neededColumns = getColumnsForIndexes(getIndexes());
@@ -231,12 +254,7 @@ class IndexedRegion extends Transactiona
       }
       
       Result oldRow = super.get(get, lockid);
-      SortedMap<byte[], byte[]> oldColumnValues = convertToValueMap(oldRow);
-      
-      
-      for (IndexSpecification indexSpec : getIndexes()) {
-        removeOldIndexEntry(indexSpec, delete.getRow(), oldColumnValues);
-      }
+      oldColumnValues = convertToValueMap(oldRow);
     }
     
     super.delete(delete, lockid, writeToWAL);
@@ -246,17 +264,64 @@ class IndexedRegion extends Transactiona
       
       // Rebuild index if there is still a version visible.
       Result currentRow = super.get(get, lockid);
-      if (!currentRow.isEmpty()) {
         SortedMap<byte[], byte[]> currentColumnValues = convertToValueMap(currentRow);
-        for (IndexSpecification indexSpec : getIndexes()) {
-          if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec, currentColumnValues)) {
-            updateIndex(indexSpec, delete.getRow(), currentColumnValues);
+
+      for (IndexSpecification indexSpec : getIndexes()) {
+        Delete indexDelete = null;
+        if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec, oldColumnValues)) {
+          indexDelete = makeDeleteToRemoveOldIndexEntry(indexSpec, delete
+              .getRow(), oldColumnValues);
+        }
+        Put indexPut = null;
+        if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec,
+            currentColumnValues)) {
+          indexPut = makeIndexUpdate(indexSpec, delete.getRow(),
+              currentColumnValues);
+        }
+        if (indexPut == null && indexDelete == null) {
+          continue;
+        }
+
+        HTable indexTable = getIndexTable(indexSpec);
+        if (indexDelete != null
+            && (indexPut == null || !Bytes.equals(indexDelete.getRow(),
+                indexPut.getRow()))) {
+          // Only do the delete if the row changed. This way we avoid the
+          // put-after-delete issues in HBASE-2256
+          LOG.debug("Deleting old index row ["
+              + Bytes.toString(indexDelete.getRow()) + "].");
+          indexTable.delete(indexDelete);
+        } else if (indexDelete != null) {
+          LOG.debug("Skipping deleting index row ["
+              + Bytes.toString(indexDelete.getRow())
+              + "] because it has not changed.");
+          
+          for (byte [] indexCol : indexSpec.getAdditionalColumns()) {
+              byte[][] parsed = HStoreKey.parseColumn(indexCol);
+              List<KeyValue> famDeletes = delete.getFamilyMap().get(parsed[0]);
+              if (famDeletes != null) {
+                for (KeyValue kv : famDeletes) {
+                  if (Bytes.equals(indexCol, kv.getColumn())) {
+                    LOG.debug("Need to delete this specific column: "+Bytes.toString(kv.getColumn()));
+                    Delete columnDelete = new Delete(indexDelete.getRow());
+                    columnDelete.deleteColumns(indexCol);
+                    indexTable.delete(columnDelete);
+                  }
+                }
+                
+              }
           }
         }
+
+        if (indexPut != null) {
+          getIndexTable(indexSpec).put(indexPut);
+        }
       }
+
     }
-   
   }
+   
+  
 
   private SortedMap<byte[], byte[]> convertToValueMap(Result result) {
     SortedMap<byte[], byte[]> currentColumnValues = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);

Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java?rev=931723&r1=931722&r2=931723&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java Wed Apr  7 23:41:58 2010
@@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.regionse
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -138,17 +140,25 @@ class TransactionState {
   }
 
   void addWrite(final Put write) {
+    updateLatestTimestamp(write.getFamilyMap().values());
+    puts.add(write);
+  }
+  
+  
+  // FIXME REVIEW not sure about this. Needed for log recovery? but broke other tests.
+  private void updateLatestTimestamp(Collection<List<KeyValue>> kvsCollection) {
     byte [] now = Bytes.toBytes(System.currentTimeMillis());
-      // HAVE to manually set the KV timestamps
-      for (List<KeyValue> kvs : write.getFamilyMap().values()) {
-          for (KeyValue kv : kvs) {
+    // HAVE to manually set the KV timestamps
+    for (List<KeyValue> kvs : kvsCollection) {
+        for (KeyValue kv : kvs) {
+          if (kv.isLatestTimestamp()) {
             kv.updateLatestStamp(now);
           }
-      }
-
-    puts.add(write);
+        }
+    }
   }
   
+  
   boolean hasWrite() {
     return puts.size() > 0 || deletes.size() > 0;
   }
@@ -158,6 +168,7 @@ class TransactionState {
   }
   
   void addDelete(final Delete delete) {
+    //updateLatestTimestamp(delete.getFamilyMap().values());
     deletes.add(delete);
   }
 
@@ -371,13 +382,6 @@ class TransactionState {
     return deletes;
   }
 
-  /** Set deleteSet.
-   * @param deleteSet the deleteSet to set
-   */
-   void setDeleteSet(List<Delete> deleteSet) {
-    this.deletes = deleteSet;
-  }
-
    /** Get a scanner to go through the puts from this transaction. Used to weave together the local trx puts with the global state.
     * 
     * @return scanner
@@ -393,20 +397,56 @@ class TransactionState {
     */
    private class PutScanner implements KeyValueScanner, InternalScanner {
 
-     private NavigableSet<KeyValue> kvSet;
+     private List<KeyValue> kvList;
      private Iterator<KeyValue> iterator;
      private boolean didHasNext = false;
      private KeyValue next = null;
      
      
      PutScanner() {
-       kvSet = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+       kvList = new ArrayList<KeyValue>();
+         
        for (Put put : puts) {
          for (List<KeyValue> putKVs : put.getFamilyMap().values()) {
-           kvSet.addAll(putKVs);
+           kvList.addAll(putKVs);
+         }
+       }
+       
+       Collections.sort(kvList, new Comparator<KeyValue>() {
+         
+        /** We want to honor the order of the puts in the case where multiple have the same timestamp.
+         * 
+         * @param o1
+         * @param o2
+         * @return
+         */
+        public int compare(KeyValue o1, KeyValue o2) {
+          int result = KeyValue.COMPARATOR.compare(o1, o2);
+          if (result != 0) {
+            return result;
+          }
+          if (o1 == o2) {
+            return 0;
+          }
+          int put1Number = getPutNumber(o1);
+          int put2Number = getPutNumber(o2);
+          return put2Number - put1Number;
+        }
+      });
+       
+       iterator = kvList.iterator();
+     }
+     
+     private int getPutNumber(KeyValue kv) {
+       for (int i=0; i < puts.size(); i++) {
+         for (List<KeyValue> putKVs : puts.get(i).getFamilyMap().values()) {
+           for (KeyValue putKV : putKVs)
+           if (putKV == kv) {
+             return i;
+           }
          }
        }
-       iterator = kvSet.iterator();
+       throw new IllegalStateException("Can not fine put KV in puts");
      }
      
     public void close() {
@@ -424,8 +464,18 @@ class TransactionState {
       return next;
     }
 
+    private void iteratorFrom(KeyValue key) {
+      iterator = kvList.iterator();
+      while (iterator.hasNext()) {
+        KeyValue next = iterator.next();
+        if (KeyValue.COMPARATOR.compare(next, key) >= 0) {
+          break;
+        }
+      }
+    }
+    
     public boolean seek(KeyValue key) {
-      iterator = kvSet.headSet(key).iterator();
+      iteratorFrom(key);
 
       getNext();
       return next != null;

Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java?rev=931723&r1=931722&r2=931723&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java Wed Apr  7 23:41:58 2010
@@ -128,6 +128,25 @@ public class TestTransactions extends HB
     Assert.assertEquals(newValue, Bytes.toInt(row1_A.value()));
   }
 
+  public void testGetAfterPutPut() throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    int originalValue = Bytes.toInt(table.get(transactionState,
+        new Get(ROW1).addColumn(COL_A)).value());
+    int newValue = originalValue + 1;
+
+    table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes
+        .toBytes(newValue)));
+    
+    newValue = newValue + 1;
+    
+    table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes
+        .toBytes(newValue)));
+
+    Result row1_A = table.get(transactionState, new Get(ROW1).addColumn(COL_A));
+    Assert.assertEquals(newValue, Bytes.toInt(row1_A.value()));
+  }
+  
   public void testScanAfterUpdatePut() throws IOException {
     TransactionState transactionState = transactionManager.beginTransaction();
 
@@ -174,14 +193,10 @@ public class TestTransactions extends HB
   public void testPutPutScan() throws IOException {
     TransactionState transactionState = transactionManager.beginTransaction();
 
+
     int row2Value = 199;
     table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, Bytes
         .toBytes(row2Value)));
-    try {
-      Thread.sleep(500);
-    } catch (InterruptedException ex) {
-      // just ignore
-    }
     
     row2Value = 299;
     table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, Bytes
@@ -198,8 +213,18 @@ public class TestTransactions extends HB
     Assert.assertNotNull(result);
     Assert.assertEquals(Bytes.toString(ROW2), Bytes.toString(result.getRow()));
     Assert.assertEquals(row2Value, Bytes.toInt(result.value()));
+    
+    // TODO commit and verify that we see the second put.
+  }
+  
+  public void testPutPutScanOverAndOver() throws IOException {
+    // Do this test many times to try and hit two puts in the same millisecond
+    for (int i=0 ; i < 100; i++) {
+      testPutPutScan();
+    }
   }
 
+
   // Read from ROW1,COL_A and put it in ROW2_COLA and ROW3_COLA
   private TransactionState makeTransaction1() throws IOException {
     TransactionState transactionState = transactionManager.beginTransaction();