Posted to commits@hbase.apache.org by st...@apache.org on 2009/07/24 01:46:01 UTC

svn commit: r797262 - in /hadoop/hbase/trunk: ./ src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/ src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/ src/contrib/transactional/src/java/org/apa...

Author: stack
Date: Thu Jul 23 23:46:00 2009
New Revision: 797262

URL: http://svn.apache.org/viewvc?rev=797262&view=rev
Log:
HBASE-1670 transactions / indexing fixes: trx deletes not handled, index scan can't specify stopRow

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableDescriptor.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/JtaXAResource.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexMaintenanceUtils.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
    hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java
    hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=797262&r1=797261&r2=797262&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Thu Jul 23 23:46:00 2009
@@ -282,6 +282,8 @@
                client package
    HBASE-1680  FilterList writable only works for HBaseObjectWritable
                defined types (Clint Morgan via Stack and Jon Gray)
+   HBASE-1670  transactions / indexing fixes: trx deletes not handled, index
+               scan can't specify stopRow (Clint Morgan via Stack)
 
   IMPROVEMENTS
    HBASE-1089  Add count of regions on filesystem to master UI; add percentage

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java?rev=797262&r1=797261&r2=797262&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java Thu Jul 23 23:46:00 2009
@@ -30,7 +30,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
@@ -39,14 +38,12 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
 import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.HbaseMapWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /** HTable extended with indexed support. */
 public class IndexedTable extends TransactionalTable {
 
-  // FIXME, these belong elsewhere
+  // TODO move these schema constants elsewhere
   public static final byte[] INDEX_COL_FAMILY_NAME = Bytes.toBytes("__INDEX__");
   public static final byte[] INDEX_COL_FAMILY = Bytes.add(
       INDEX_COL_FAMILY_NAME, new byte[] { HStoreKey.COLUMN_FAMILY_DELIMITER });
@@ -79,6 +76,7 @@
    * 
    * @param indexId the id of the index to use
    * @param indexStartRow (created from the IndexKeyGenerator)
+   * @param indexStopRow (created from the IndexKeyGenerator)
    * @param indexColumns in the index table
   * @param indexFilter filter to run on the indexed table. This can only use
    * columns that have been added to the index.
@@ -87,7 +85,7 @@
    * @throws IOException
    * @throws IndexNotFoundException
    */
-  public ResultScanner getIndexedScanner(String indexId, final byte[] indexStartRow,
+  public ResultScanner getIndexedScanner(String indexId, final byte[] indexStartRow,  final byte[] indexStopRow,
       byte[][] indexColumns, final Filter indexFilter,
       final byte[][] baseColumns) throws IOException, IndexNotFoundException {
     IndexSpecification indexSpec = this.indexedTableDescriptor.getIndex(indexId);
@@ -114,9 +112,15 @@
       allIndexColumns[allColumns.length] = INDEX_BASE_ROW_COLUMN;
     }
 
-    Scan indexScan = new Scan(indexStartRow);
-    //indexScan.setFilter(filter); // FIXME
+    Scan indexScan = new Scan();
+    indexScan.setFilter(indexFilter);
     indexScan.addColumns(allIndexColumns);
+    if (indexStartRow != null) {
+      indexScan.setStartRow(indexStartRow);
+    }
+    if (indexStopRow != null) {
+      indexScan.setStopRow(indexStopRow);
+    }
     ResultScanner indexScanner = indexTable.getScanner(indexScan);
 
     return new ScannerWrapper(indexScanner, baseColumns);
@@ -173,8 +177,6 @@
         byte[] baseRow = row.getValue(INDEX_BASE_ROW_COLUMN);
         LOG.debug("next index row [" + Bytes.toString(row.getRow())
             + "] -> base row [" + Bytes.toString(baseRow) + "]");
-        HbaseMapWritable<byte[], Cell> colValues =
-          new HbaseMapWritable<byte[], Cell>();
         Result baseResult = null;
         if (columns != null && columns.length > 0) {
           LOG.debug("Going to base table for remaining columns");

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableDescriptor.java?rev=797262&r1=797261&r2=797262&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableDescriptor.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableDescriptor.java Thu Jul 23 23:46:00 2009
@@ -28,11 +28,8 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 
 public class IndexedTableDescriptor {

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/JtaXAResource.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/JtaXAResource.java?rev=797262&r1=797261&r2=797262&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/JtaXAResource.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/JtaXAResource.java Thu Jul 23 23:46:00 2009
@@ -65,7 +65,9 @@
     } catch (CommitUnsuccessfulException e) {
       throw new XAException(XAException.XA_RBROLLBACK);
     } catch (IOException e) {
-      throw new XAException(XAException.XA_RBPROTO); // FIXME correct code?
+      XAException xae = new XAException(XAException.XAER_RMERR);
+      xae.initCause(e);
+      throw xae;
     } finally {
       threadLocalTransactionState.remove();
     }
@@ -85,7 +87,9 @@
       try {
         transactionManager.abort(state);
       } catch (IOException e) {
-        throw new RuntimeException(e); // FIXME, should be an XAException?
+        XAException xae = new XAException(XAException.XAER_RMERR);
+        xae.initCause(e);
+        throw xae;
       }
     }
   }
@@ -108,9 +112,13 @@
     try {
       status = this.transactionManager.prepareCommit(state);
     } catch (CommitUnsuccessfulException e) {
-      throw new XAException(XAException.XA_HEURRB); // FIXME correct code?
+      XAException xae = new XAException(XAException.XA_HEURRB);
+      xae.initCause(e);
+      throw xae;
     } catch (IOException e) {
-      throw new XAException(XAException.XA_RBPROTO); // FIXME correct code?
+      XAException xae = new XAException(XAException.XAER_RMERR);
+      xae.initCause(e);
+      throw xae;
     }
 
     switch (status) {
@@ -119,7 +127,7 @@
     case TransactionalRegionInterface.COMMIT_OK_READ_ONLY:
       return XAResource.XA_RDONLY;
     default:
-      throw new XAException(XAException.XA_RBPROTO); // FIXME correct code?
+      throw new XAException(XAException.XA_RBPROTO); 
     }
   }
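
The recurring pattern above exists because XAException has no
(errorCode, cause) constructor, so the underlying exception has to be
attached in a second step. A minimal sketch of the idiom:

    import javax.transaction.xa.XAException;

    // XAException(int) takes only an error code; chain the cause manually so
    // the original exception stays visible to whoever unwinds the XA call.
    static XAException wrap(final int errorCode, final Throwable cause) {
      XAException xae = new XAException(errorCode);
      xae.initCause(cause);
      return xae;
    }

XAER_RMERR ("resource manager error") also fits an unexpected IOException
better than XA_RBPROTO, which claims the branch was rolled back because of a
protocol error.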
 

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java?rev=797262&r1=797261&r2=797262&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java Thu Jul 23 23:46:00 2009
@@ -36,6 +36,10 @@
  * 
  */
 public class TransactionManager {
+  static {
+    TransactionalRPC.initialize();
+  }
+
   static final Log LOG = LogFactory.getLog(TransactionManager.class);
 
   private final HConnection connection;
@@ -123,7 +127,6 @@
       LOG.debug("Commit of transaction [" + transactionState.getTransactionId()
           + "] was unsucsessful", e);
       // This happens on a NSRE that is triggered by a split
-      // FIXME, but then abort fails
       try {
         abort(transactionState);
       } catch (Exception abortException) {
@@ -177,7 +180,6 @@
       LOG.debug("Commit of transaction [" + transactionState.getTransactionId()
           + "] was unsucsessful", e);
       // This happens on a NSRE that is triggered by a split
-      // FIXME, but then abort fails
       try {
         abort(transactionState);
       } catch (Exception abortException) {
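
TransactionalRPC itself is not in this excerpt; judging from the
HBaseRPC.addToMap() call it replaces in TransactionalTable below, a plausible
shape is a small idempotent registrar (a sketch, not the committed class):

    import org.apache.hadoop.hbase.ipc.HBaseRPC;
    import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;

    public class TransactionalRPC {
      private static final byte RPC_CODE = 100;
      private static boolean initialized = false;

      // Callable from every static initializer that needs the transactional
      // RPC code registered; the guard keeps registration to exactly once.
      public static synchronized void initialize() {
        if (initialized) {
          return;
        }
        HBaseRPC.addToMap(TransactionalRegionInterface.class, RPC_CODE);
        initialized = true;
      }
    }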

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java?rev=797262&r1=797261&r2=797262&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java Thu Jul 23 23:46:00 2009
@@ -41,10 +41,9 @@
  * 
  */
 public class TransactionalTable extends HTable {
-
-  private static final byte RPC_CODE = 100;
+  
   static {
-    HBaseRPC.addToMap(TransactionalRegionInterface.class, RPC_CODE);
+    TransactionalRPC.initialize();
   }
   
   /**

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexMaintenanceUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexMaintenanceUtils.java?rev=797262&r1=797261&r2=797262&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexMaintenanceUtils.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexMaintenanceUtils.java Thu Jul 23 23:46:00 2009
@@ -28,7 +28,6 @@
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
 import org.apache.hadoop.hbase.client.tableindexed.IndexedTable;
-import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java?rev=797262&r1=797261&r2=797262&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java Thu Jul 23 23:46:00 2009
@@ -31,7 +31,6 @@
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,7 +48,6 @@
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
 import org.apache.hadoop.hbase.client.tableindexed.IndexedTableDescriptor;
-import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HLog;
 import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegion;
@@ -148,24 +146,6 @@
     }
   }
   
-  private void updateIndexes(Delete delete) {
-    // FIXME
-    // Handle delete batch updates. Go back and get the next older values
-//    for (BatchOperation op : batchUpdate) {
-//      if (!op.isPut()) { 
-//        Cell current = oldColumnCells.get(op.getColumn());
-//        if (current != null) {
-//          // TODO: Fix this profligacy!!! St.Ack
-//          Cell [] older = Cell.createSingleCellArray(super.get(batchUpdate.getRow(),
-//              op.getColumn(), current.getTimestamp(), 1));
-//          if (older != null && older.length > 0) {
-//            newColumnValues.put(op.getColumn(), older[0].getValue());
-//          }
-//        }
-//      }
-//    }
-  }
-
   /** Return the columns needed for the update. */
   private NavigableSet<byte[]> getColumnsForIndexes(Collection<IndexSpecification> indexes) {
     NavigableSet<byte[]> neededColumns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java?rev=797262&r1=797261&r2=797262&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java Thu Jul 23 23:46:00 2009
@@ -70,7 +70,7 @@
 
   public void writeDeleteToLog(HRegionInfo regionInfo, final long transactionId, final Delete delete)
       throws IOException {
-    // FIXME
+    this.append(regionInfo, delete, transactionId);
   }
 
   public void writeCommitToLog(HRegionInfo regionInfo, final long transactionId) throws IOException {
@@ -112,7 +112,7 @@
   public void append(HRegionInfo regionInfo, Put update, long transactionId)
       throws IOException {
 
-    long commitTime = System.currentTimeMillis(); // FIXME ?
+    long commitTime = System.currentTimeMillis(); 
 
     THLogKey key = new THLogKey(regionInfo.getRegionName(), regionInfo
         .getTableDesc().getName(), -1, commitTime, THLogKey.TrxOp.OP,
@@ -123,6 +123,30 @@
     }
   }
 
+  /**
+   * Write a transactional delete to the log.
+   * 
+   * @param regionInfo
+   * @param delete
+   *          the delete whose edits are appended to the log
+   * @param transactionId
+   * @throws IOException
+   */
+  public void append(HRegionInfo regionInfo, Delete delete, long transactionId)
+      throws IOException {
+
+    long commitTime = System.currentTimeMillis(); 
+
+    THLogKey key = new THLogKey(regionInfo.getRegionName(), regionInfo
+        .getTableDesc().getName(), -1, commitTime, THLogKey.TrxOp.OP,
+        transactionId);
+
+    for (KeyValue value : convertToKeyValues(delete)) {
+      super.append(regionInfo, key, value);
+    }
+  }
+
+  
   private List<KeyValue> convertToKeyValues(Put update) {
     List<KeyValue> edits = new ArrayList<KeyValue>();
 
@@ -133,4 +157,15 @@
     }
     return edits;
   }
+  
+  private List<KeyValue> convertToKeyValues(Delete delete) {
+    List<KeyValue> edits = new ArrayList<KeyValue>();
+
+    for (List<KeyValue> kvs : delete.getFamilyMap().values()) {
+      for (KeyValue kv : kvs) {
+        edits.add(kv);
+      }
+    }
+    return edits;
+  }
 }
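
A transactional delete is now logged the same way as a put: every KeyValue in
the Delete's family map becomes one THLog edit keyed by the transaction id.
Roughly, assuming a THLog handle, the region's HRegionInfo, and a transaction
id in scope (row and column names are illustrative):

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.util.Bytes;

    Delete delete = new Delete(Bytes.toBytes("row1"));
    delete.deleteColumns(Bytes.toBytes("family"), Bytes.toBytes("qualA"));
    delete.deleteColumns(Bytes.toBytes("family"), Bytes.toBytes("qualB"));

    // Appends one edit per KeyValue above under a THLogKey carrying
    // TrxOp.OP and the transaction id; formerly a FIXME no-op.
    hlog.writeDeleteToLog(regionInfo, transactionId, delete);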

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java?rev=797262&r1=797261&r2=797262&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java Thu Jul 23 23:46:00 2009
@@ -19,7 +19,9 @@
  */
 package org.apache.hadoop.hbase.regionserver.transactional;
 
+import java.io.IOException;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.NavigableSet;
@@ -30,6 +32,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
@@ -37,10 +40,14 @@
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Holds the state of a transaction.
+ * Holds the state of a transaction. This includes a buffer of all writes, a
+ * record of all reads / scans, and information about which other transactions
+ * we need to check against.
  */
 class TransactionState {
 
@@ -70,8 +77,8 @@
     protected byte[] endRow;
 
     public ScanRange(byte[] startRow, byte[] endRow) {
-      this.startRow = startRow;
-      this.endRow = endRow;
+      this.startRow = startRow == HConstants.EMPTY_START_ROW ? null : startRow;
+      this.endRow = endRow == HConstants.EMPTY_END_ROW ? null : endRow;
     }
 
     /**
@@ -104,8 +111,9 @@
   private Status status;
   private SortedSet<byte[]> readSet = new TreeSet<byte[]>(
       Bytes.BYTES_COMPARATOR);
-  private List<Put> writeSet = new LinkedList<Put>();
-  private List<ScanRange> scanSet = new LinkedList<ScanRange>();
+  private List<Put> puts = new LinkedList<Put>();
+  private List<ScanRange> scans = new LinkedList<ScanRange>();
+  private List<Delete> deletes = new LinkedList<Delete>();
   private Set<TransactionState> transactionsToCheck = new HashSet<TransactionState>();
   private int startSequenceNumber;
   private Integer sequenceNumber;
@@ -128,15 +136,19 @@
   }
 
   void addWrite(final Put write) {
-    writeSet.add(write);
+    puts.add(write);
+  }
+  
+  boolean hasWrite() {
+    return puts.size() > 0 || deletes.size() > 0;
   }
 
-  List<Put> getWriteSet() {
-    return writeSet;
+  List<Put> getPuts() {
+    return puts;
   }
   
   void addDelete(final Delete delete) {
-    throw new UnsupportedOperationException("NYI");
+    deletes.add(delete);
   }
 
   /**
@@ -149,9 +161,11 @@
    */
   Result localGet(Get get) {
     
+    // TODO take deletes into account as well
+    
     List<KeyValue> localKVs = new LinkedList<KeyValue>();
     
-    for (Put put : writeSet) {
+    for (Put put : puts) {
       if (!Bytes.equals(get.getRow(), put.getRow())) {
         continue;
       }
@@ -203,7 +217,7 @@
       return false; // Cannot conflict with aborted transactions
     }
 
-    for (Put otherUpdate : checkAgainst.getWriteSet()) {
+    for (Put otherUpdate : checkAgainst.getPuts()) {
       if (this.getReadSet().contains(otherUpdate.getRow())) {
         LOG.debug("Transaction [" + this.toString()
             + "] has read which conflicts with [" + checkAgainst.toString()
@@ -211,7 +225,7 @@
             + Bytes.toString(otherUpdate.getRow()) + "]");
         return true;
       }
-      for (ScanRange scanRange : this.scanSet) {
+      for (ScanRange scanRange : this.scans) {
         if (scanRange.contains(otherUpdate.getRow())) {
           LOG.debug("Transaction [" + this.toString()
               + "] has scan which conflicts with [" + checkAgainst.toString()
@@ -289,9 +303,9 @@
     result.append(" read Size: ");
     result.append(readSet.size());
     result.append(" scan Size: ");
-    result.append(scanSet.size());
+    result.append(scans.size());
     result.append(" write Size: ");
-    result.append(writeSet.size());
+    result.append(puts.size());
     result.append(" startSQ: ");
     result.append(startSequenceNumber);
     if (sequenceNumber != null) {
@@ -328,7 +342,7 @@
         scanRange.startRow == null ? "null" : Bytes
             .toString(scanRange.startRow), scanRange.endRow == null ? "null"
             : Bytes.toString(scanRange.endRow)));
-    scanSet.add(scanRange);
+    scans.add(scanRange);
   }
   
   int getCommitPendingWaits() {
@@ -338,4 +352,106 @@
   void incrementCommitPendingWaits() {
     this.commitPendingWaits++;
   }
+
+  /** Get deleteSet.
+   * @return deleteSet
+   */
+   List<Delete> getDeleteSet() {
+    return deletes;
+  }
+
+  /** Set deleteSet.
+   * @param deleteSet the deleteSet to set
+   */
+   void setDeleteSet(List<Delete> deleteSet) {
+    this.deletes = deleteSet;
+  }
+
+   /** Get a scanner to go through the puts from this transaction. Used to weave together the local trx puts with the global state.
+    * 
+    * @return scanner
+    */
+   KeyValueScanner getScanner() {
+     return new PutScanner();
+   }
+   
+   /** Scanner of the puts that occur during this transaction.
+    * 
+    * @author clint.morgan
+    *
+    */
+   private class PutScanner implements KeyValueScanner, InternalScanner {
+
+     private NavigableSet<KeyValue> kvSet;
+     private Iterator<KeyValue> iterator;
+     private boolean didHasNext = false;
+     private KeyValue next = null;
+     
+     
+     PutScanner() {
+       kvSet = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+       for (Put put : puts) {
+         for (List<KeyValue> putKVs : put.getFamilyMap().values()) {
+           kvSet.addAll(putKVs);
+         }
+       }
+       iterator = kvSet.iterator();
+     }
+     
+    public void close() {
+      // Nothing to close
+    }
+
+    public KeyValue next() {
+      getNext();
+      didHasNext = false;
+      return next;
+    }
+
+    public KeyValue peek() {
+      getNext();
+      return next;
+    }
+
+    public boolean seek(KeyValue key) {
+      iterator = kvSet.tailSet(key).iterator(); // first KV at or after key
+      didHasNext = false; // drop any lookahead cached by peek()
+      getNext();
+      return next != null;
+    }
+     
+    private KeyValue getNext() {
+      if (didHasNext) {
+        return next;
+      }
+      didHasNext = true;
+      if (iterator.hasNext()) {
+        next = iterator.next();
+      } else {
+        next = null;
+      }
+      return next;
+    }
+
+    public boolean next(List<KeyValue> results) throws IOException {
+      KeyValue peek = this.peek();
+      if (peek == null) {
+        return false;
+      }
+      byte [] row = peek.getRow();
+      results.add(peek);
+      // Gather every remaining KeyValue that belongs to the same row.
+      while (true) {
+        if (this.peek() == null) {
+          break;
+        }
+        if (!Bytes.equals(row, this.peek().getRow())) {
+          break;
+        }
+        results.add(this.next());
+      }
+      return true;
+    }
+    
+   }
 }
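
PutScanner is what lets a transaction read its own writes: the constructor
flattens every pending Put into one KeyValue.COMPARATOR-ordered set, and
peek()/next() expose it with the one-KeyValue lookahead that KeyValueScanner
consumers expect. How a caller sees the contract (a sketch, assuming a
TransactionState with buffered puts):

    KeyValueScanner scanner = state.getScanner();
    KeyValue upcoming = scanner.peek();  // reports without consuming; repeated
                                         // peek()s reuse the cached lookahead
    KeyValue consumed = scanner.next();  // consumes that same KeyValue
    assert upcoming == consumed;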

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java?rev=797262&r1=797261&r2=797262&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java Thu Jul 23 23:46:00 2009
@@ -57,6 +57,7 @@
 import org.apache.hadoop.hbase.regionserver.HLog;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status;
 import org.apache.hadoop.util.Progressable;
@@ -201,13 +202,12 @@
     TransactionState state = new TransactionState(transactionId, super.getLog()
         .getSequenceNumber(), super.getRegionInfo());
 
-    // Order is important here ...
-    List<TransactionState> commitPendingCopy = new LinkedList<TransactionState>(
+    state.setStartSequenceNumber(nextSequenceId.get());
+    List<TransactionState> commitPendingCopy = new ArrayList<TransactionState>(
         commitPendingTransactions);
     for (TransactionState commitPending : commitPendingCopy) {
       state.addTransactionToCheck(commitPending);
     }
-    state.setStartSequenceNumber(nextSequenceId.get());
 
     synchronized (transactionsById) {
       transactionsById.put(key, state);
@@ -271,7 +271,9 @@
 
     TransactionState state = getTransactionState(transactionId);
     state.addScan(scan);
-    return new ScannerWrapper(transactionId, super.getScanner(scan));
+    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(1);
+    scanners.add(state.getScanner());
+    return super.getScanner(scan, scanners);
   }
 
   /**
@@ -310,7 +312,6 @@
 
   /**
    * Add a delete to the transaction. Does not get applied until commit process.
-   * FIXME, not sure about this approach
    * 
    * @param transactionId
    * @param delete
@@ -350,7 +351,7 @@
           + ". Voting for commit");
 
      // If there are writes we must keep a record of the transaction
-      if (state.getWriteSet().size() > 0) {
+      if (state.hasWrite()) {
         // Order is important
         state.setStatus(Status.COMMIT_PENDING);
         commitPendingTransactions.add(state);
@@ -403,20 +404,19 @@
    * @throws IOException
    */
   public void commit(final long transactionId) throws IOException {
-    // Not checking closing...
     TransactionState state;
     try {
       state = getTransactionState(transactionId);
     } catch (UnknownTransactionException e) {
       LOG.fatal("Asked to commit unknown transaction: " + transactionId
           + " in region " + super.getRegionInfo().getRegionNameAsString());
-      // FIXME Write to the transaction log that this transaction was corrupted
+      // TODO. Anything to handle here?
       throw e;
     }
 
     if (!state.getStatus().equals(Status.COMMIT_PENDING)) {
       LOG.fatal("Asked to commit a non pending transaction");
-      // FIXME Write to the transaction log that this transaction was corrupted
+      // TODO. Anything to handle here?
       throw new IOException("commit failure");
     }
 
@@ -461,17 +461,21 @@
 
     this.hlog.writeCommitToLog(super.getRegionInfo(), state.getTransactionId());
 
-    for (Put update : state.getWriteSet()) {
+    for (Put update : state.getPuts()) {
       this.put(update, false); // Don't need to WAL these
-      // FIME, maybe should be walled so we don't need to look so far back.
+    }
+    
+    for (Delete delete : state.getDeleteSet()) {
+      this.delete(delete, null, false);
     }
 
     state.setStatus(Status.COMMITED);
-    if (state.getWriteSet().size() > 0
+    if (state.hasWrite()
         && !commitPendingTransactions.remove(state)) {
       LOG
           .fatal("Commiting a non-query transaction that is not in commitPendingTransactions");
-      throw new IOException("commit failure"); // FIXME, how to handle?
+      // Something has gone really wrong.
+      throw new IOException("commit failure"); 
     }
     retireTransaction(state);
   }
@@ -480,11 +484,11 @@
   public List<StoreFile> close(boolean abort) throws IOException {
     prepareToClose();
     if (!commitPendingTransactions.isEmpty()) {
-      // FIXME, better way to handle?
       LOG.warn("Closing transactional region ["
           + getRegionInfo().getRegionNameAsString() + "], but still have ["
           + commitPendingTransactions.size()
-          + "] transactions  that are pending commit");
+          + "] transactions  that are pending commit.");
+      // TODO resolve from the Global Trx Log.
     }
     return super.close(abort);
   }
@@ -495,7 +499,7 @@
   }
 
   boolean closing = false;
-
+  private static final int CLOSE_WAIT_ON_COMMIT_PENDING = 1000;
   /**
    * Get ready to close.
    * 
@@ -511,10 +515,10 @@
           + commitPendingTransactions.size()
           + "] transactions that are pending commit. Sleeping");
       for (TransactionState s : commitPendingTransactions) {
-        LOG.info(s.toString());
+        LOG.info("commit pending: " + s.toString());
       }
       try {
-        Thread.sleep(200);
+        Thread.sleep(CLOSE_WAIT_ON_COMMIT_PENDING);
       } catch (InterruptedException e) {
         throw new RuntimeException(e);
       }
@@ -700,33 +704,4 @@
       }
     }
   }
-
-  /** Wrapper which keeps track of rows returned by scanner. */
-  private class ScannerWrapper implements InternalScanner {
-    private long transactionId;
-    private InternalScanner scanner;
-
-    /**
-     * @param transactionId
-     * @param scanner
-     * @throws UnknownTransactionException
-     */
-    public ScannerWrapper(final long transactionId,
-        final InternalScanner scanner) throws UnknownTransactionException {
-
-      this.transactionId = transactionId;
-      this.scanner = scanner;
-    }
-
-    public void close() throws IOException {
-      scanner.close();
-    }
-
-    public boolean next(List<KeyValue> results) throws IOException {
-      boolean result = scanner.next(results);
-      TransactionState state = getTransactionState(transactionId);
-      // FIXME need to weave in new stuff from this transaction too.
-      return result;
-    }
-  }
 }
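
The net effect of dropping ScannerWrapper: instead of decorating the region
scanner after the fact, the transaction's PutScanner now enters the same
KeyValueHeap as the store scanners, via the new protected
HRegion.getScanner(Scan, List<KeyValueScanner>) overload at the bottom of
this commit. Annotated, the new getScanner() body reads:

    TransactionState state = getTransactionState(transactionId);
    state.addScan(scan);                     // record range for conflict checks
    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(1);
    scanners.add(state.getScanner());        // this transaction's pending puts
    return super.getScanner(scan, scanners); // heap-merged in proper sort order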

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java?rev=797262&r1=797261&r2=797262&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java Thu Jul 23 23:46:00 2009
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.transactional.TransactionalRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
 import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
 import org.apache.hadoop.hbase.regionserver.HLog;
@@ -53,6 +54,11 @@
  */
 public class TransactionalRegionServer extends HRegionServer implements
     TransactionalRegionInterface {
+  
+  static {
+    TransactionalRPC.initialize();
+  }
+
   private static final String LEASE_TIME = "hbase.transaction.leasetime";
   private static final int DEFAULT_LEASE_TIME = 60 * 1000;
   private static final int LEASE_CHECK_FREQUENCY = 1000;

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java?rev=797262&r1=797261&r2=797262&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java Thu Jul 23 23:46:00 2009
@@ -73,8 +73,7 @@
 
     IndexedTableDescriptor indexDesc = new IndexedTableDescriptor(desc);
     // Create a new index that does lexicographic ordering on COL_A
-    IndexSpecification colAIndex = new IndexSpecification(INDEX_COL_A,
-        COL_A);
+    IndexSpecification colAIndex = new IndexSpecification(INDEX_COL_A, COL_A);
     indexDesc.addIndex(colAIndex);
 
     admin = new IndexedTableAdmin(conf);
@@ -98,10 +97,11 @@
     writeInitalRows();
     assertRowsInOrder(NUM_ROWS);
   }
-  
-  private void assertRowsInOrder(int numRowsExpected) throws IndexNotFoundException, IOException {
-    ResultScanner scanner = table.getIndexedScanner(INDEX_COL_A,
-        HConstants.EMPTY_START_ROW, null, null, null);
+
+  private void assertRowsInOrder(int numRowsExpected)
+      throws IndexNotFoundException, IOException {
+    ResultScanner scanner = table.getIndexedScanner(INDEX_COL_A, null, null,
+        null, null, null);
     int numRows = 0;
     byte[] lastColA = null;
     for (Result rowResult : scanner) {
@@ -115,7 +115,7 @@
       numRows++;
     }
     scanner.close();
-    Assert.assertEquals(numRowsExpected, numRows);  
+    Assert.assertEquals(numRowsExpected, numRows);
   }
 
   public void testMultipleWrites() throws IOException {

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java?rev=797262&r1=797261&r2=797262&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java Thu Jul 23 23:46:00 2009
@@ -21,6 +21,8 @@
 
 import java.io.IOException;
 
+import junit.framework.Assert;
+
 import org.apache.hadoop.hbase.HBaseClusterTestCase;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -29,6 +31,8 @@
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
 import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -95,7 +99,7 @@
     transactionManager.tryCommit(transactionState2);
   }
 
-  public void TestTwoTransactionsWithConflict() throws IOException,
+  public void testTwoTransactionsWithConflict() throws IOException,
       CommitUnsuccessfulException {
     TransactionState transactionState1 = makeTransaction1();
     TransactionState transactionState2 = makeTransaction2();
@@ -110,14 +114,73 @@
     }
   }
 
+  public void testGetAfterPut() throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    int originalValue = Bytes.toInt(table.get(transactionState,
+        new Get(ROW1).addColumn(COL_A)).value());
+    int newValue = originalValue + 1;
+
+    table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes
+        .toBytes(newValue)));
+
+    Result row1_A = table.get(transactionState, new Get(ROW1).addColumn(COL_A));
+    Assert.assertEquals(newValue, Bytes.toInt(row1_A.value()));
+  }
+
+  public void testScanAfterUpdatePut() throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    int originalValue = Bytes.toInt(table.get(transactionState,
+        new Get(ROW1).addColumn(COL_A)).value());
+    int newValue = originalValue + 1;
+    table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes
+        .toBytes(newValue)));
+
+    ResultScanner scanner = table.getScanner(transactionState, new Scan()
+        .addFamily(FAMILY));
+
+    Result result = scanner.next();
+    Assert.assertNotNull(result);
+
+    Assert.assertEquals(Bytes.toString(ROW1), Bytes.toString(result.getRow()));
+    Assert.assertEquals(newValue, Bytes.toInt(result.value()));
+
+    result = scanner.next();
+    Assert.assertNull(result);
+
+  }
+
+  public void testScanAfterNewPut() throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    int row2Value = 199;
+    table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, Bytes
+        .toBytes(row2Value)));
+
+    ResultScanner scanner = table.getScanner(transactionState, new Scan()
+        .addFamily(FAMILY));
+
+    Result result = scanner.next();
+    Assert.assertNotNull(result);
+    Assert.assertEquals(Bytes.toString(ROW1), Bytes.toString(result.getRow()));
+
+    result = scanner.next();
+    Assert.assertNotNull(result);
+    Assert.assertEquals(Bytes.toString(ROW2), Bytes.toString(result.getRow()));
+    Assert.assertEquals(row2Value, Bytes.toInt(result.value()));
+  }
+
   // Read from ROW1,COL_A and put it in ROW2_COLA and ROW3_COLA
   private TransactionState makeTransaction1() throws IOException {
     TransactionState transactionState = transactionManager.beginTransaction();
 
     Result row1_A = table.get(transactionState, new Get(ROW1).addColumn(COL_A));
 
-    table.put(new Put(ROW2).add(FAMILY, QUAL_A, row1_A.getValue(COL_A)));
-    table.put(new Put(ROW3).add(FAMILY, QUAL_A, row1_A.getValue(COL_A)));
+    table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, row1_A
+        .getValue(COL_A)));
+    table.put(transactionState, new Put(ROW3).add(FAMILY, QUAL_A, row1_A
+        .getValue(COL_A)));
 
     return transactionState;
   }
@@ -130,7 +193,8 @@
 
     int value = Bytes.toInt(row1_A.getValue(COL_A));
 
-    table.put(new Put(ROW1).add(FAMILY, QUAL_A, Bytes.toBytes(value + 1)));
+    table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes
+        .toBytes(value + 1)));
 
     return transactionState;
   }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java?rev=797262&r1=797261&r2=797262&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java Thu Jul 23 23:46:00 2009
@@ -620,6 +620,9 @@
   //---------------------------------------------------------------------------
   
   public String toString() {
+    if (this.bytes == null || this.bytes.length == 0) {
+      return "empty";
+    }
     return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) +
       "/vlen=" + getValueLength();
   }
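
The guard matters for logging: a KeyValue with no backing buffer yet, e.g.
one built by the no-arg constructor HBase keeps for Writable deserialization,
could previously hit a NullPointerException inside keyToString(). Illustration:

    KeyValue kv = new KeyValue(); // bytes stays null until readFields() runs
    String s = kv.toString();     // now returns "empty" instead of throwing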

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=797262&r1=797261&r2=797262&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Jul 23 23:46:00 2009
@@ -1063,6 +1063,10 @@
    */
   public InternalScanner getScanner(Scan scan)
   throws IOException {
+   return getScanner(scan, null);
+  }
+  
+  protected InternalScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
     newScannerLock.readLock().lock();
     try {
       if (this.closed.get()) {
@@ -1078,7 +1082,7 @@
           scan.addFamily(family);
         }
       }
-      return new RegionScanner(scan);
+      return new RegionScanner(scan, additionalScanners);
       
     } finally {
       newScannerLock.readLock().unlock();
@@ -1677,8 +1681,8 @@
   class RegionScanner implements InternalScanner {
     private final KeyValueHeap storeHeap;
     private final byte [] stopRow;
-    
-    RegionScanner(Scan scan) {
+
+    RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
         this.stopRow = null;
       } else {
@@ -1686,6 +1690,9 @@
       }
       
       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
+      if (additionalScanners != null) {
+        scanners.addAll(additionalScanners);
+      }
       for (Map.Entry<byte[], NavigableSet<byte[]>> entry : 
           scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
@@ -1694,6 +1701,10 @@
       this.storeHeap = 
         new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator);
     }
+    
+    RegionScanner(Scan scan) {
+      this(scan, null);
+    }
 
     /**
      * Get the next row of results from this region.