You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/01/17 20:38:13 UTC

svn commit: r1232551 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/replication/regionserver/ test/java/org/apache/hadoop/hbase/client/ test/java/org/ap...

Author: larsh
Date: Tue Jan 17 19:38:13 2012
New Revision: 1232551

URL: http://svn.apache.org/viewvc?rev=1232551&view=rev
Log:
HBASE-5203 Group atomic put/delete operation into a single WALEdit to handle region server failures. (Lars H)

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java?rev=1232551&r1=1232550&r2=1232551&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java Tue Jan 17 19:38:13 2012
@@ -138,6 +138,35 @@ public class Delete extends Mutation
   }
 
   /**
+   * Advanced use only.
+   * Add an existing delete marker to this Delete object.
+   * @param kv An existing KeyValue of type "delete".
+   * @return this for invocation chaining
+   * @throws IOException
+   */
+  public Delete addDeleteMarker(KeyValue kv) throws IOException {
+    if (!kv.isDelete()) {
+      throw new IOException("The recently added KeyValue is not of type "
+          + "delete. Rowkey: " + Bytes.toStringBinary(this.row));
+    }
+    if (Bytes.compareTo(this.row, 0, row.length, kv.getBuffer(),
+        kv.getRowOffset(), kv.getRowLength()) != 0) {
+      throw new IOException("The row in the recently added KeyValue "
+          + Bytes.toStringBinary(kv.getBuffer(), kv.getRowOffset(),
+              kv.getRowLength()) + " doesn't match the original one "
+          + Bytes.toStringBinary(this.row));
+    }
+    byte [] family = kv.getFamily();
+    List<KeyValue> list = familyMap.get(family);
+    if (list == null) {
+      list = new ArrayList<KeyValue>();
+    }
+    list.add(kv);
+    familyMap.put(family, list);
+    return this;
+  }
+
+  /**
    * Delete all versions of all columns of the specified family.
    * <p>
    * Overrides previous calls to deleteColumn and deleteColumns for the

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1232551&r1=1232550&r2=1232551&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Jan 17 19:38:13 2012
@@ -1686,7 +1686,7 @@ public class HRegion implements HeapSize
       try {
         // All edits for the given row (across all column families) must happen atomically.
         prepareDelete(delete);
-        internalDelete(delete, delete.getClusterId(), writeToWAL, null, null);
+        internalDelete(delete, delete.getClusterId(), writeToWAL);
       } finally {
         if(lockid == null) releaseRowLock(lid);
       }
@@ -1707,26 +1707,77 @@ public class HRegion implements HeapSize
     delete.setFamilyMap(familyMap);
     delete.setClusterId(clusterId);
     delete.setWriteToWAL(writeToWAL);
-    internalDelete(delete, clusterId, writeToWAL, null, null);
+    internalDelete(delete, clusterId, writeToWAL);
+  }
+
+  /**
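+   * Set up a Delete object with correct timestamps.
+   * Caller should hold the row and region locks.
+   * @param delete the Delete whose KeyValue timestamps are updated in place
+   * @param byteNow current-time bytes handed to KeyValue#updateLatestStamp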
+   * @throws IOException
+   */
+  private void prepareDeleteTimestamps(Delete delete, byte[] byteNow)
+      throws IOException {
+    Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
+    for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
+
+      byte[] family = e.getKey();
+      List<KeyValue> kvs = e.getValue();
+      Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+
+      for (KeyValue kv: kvs) {
+        //  Check if time is LATEST, change to time of most recent addition if so
+        //  This is expensive.
+        if (kv.isLatestTimestamp() && kv.isDeleteType()) {
+          byte[] qual = kv.getQualifier();
+          if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
+
+          Integer count = kvCount.get(qual);
+          if (count == null) {
+            kvCount.put(qual, 1);
+          } else {
+            kvCount.put(qual, count + 1);
+          }
+          count = kvCount.get(qual);
+
+          Get get = new Get(kv.getRow());
+          get.setMaxVersions(count);
+          get.addColumn(family, qual);
+
+          List<KeyValue> result = get(get, false);
+
+          if (result.size() < count) {
+            // Nothing to delete
+            kv.updateLatestStamp(byteNow);
+            continue;
+          }
+          if (result.size() > count) {
+            throw new RuntimeException("Unexpected size: " + result.size());
+          }
+          KeyValue getkv = result.get(count - 1);
+          Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
+              getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
+        } else {
+          kv.updateLatestStamp(byteNow);
+        }
+      }
+    }
   }
 
   /**
    * @param delete The Delete command
-   * @param familyMap map of family to edits for the given family.
+   * @param clusterId UUID of the originating cluster (for replication).
    * @param writeToWAL
-   * @param writeEntry Optional mvcc write point to use
-   * @param walEdit Optional walEdit to use. A non-null walEdit indicates
-   * that the coprocessor hooks are run by the caller
    * @throws IOException
    */
   private void internalDelete(Delete delete, UUID clusterId,
-      boolean writeToWAL, MultiVersionConsistencyControl.WriteEntry writeEntry,
-      WALEdit walEdit) throws IOException {
+      boolean writeToWAL) throws IOException {
     Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
-    WALEdit localWalEdit = walEdit == null ? new WALEdit() : walEdit;
+    WALEdit walEdit = new WALEdit();
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
-    if (coprocessorHost != null && walEdit == null) {
-      if (coprocessorHost.preDelete(delete, localWalEdit, writeToWAL)) {
+    if (coprocessorHost != null) {
+      if (coprocessorHost.preDelete(delete, walEdit, writeToWAL)) {
         return;
       }
     }
@@ -1737,49 +1788,7 @@ public class HRegion implements HeapSize
 
     updatesLock.readLock().lock();
     try {
-      for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
-
-        byte[] family = e.getKey();
-        List<KeyValue> kvs = e.getValue();
-        Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-
-        for (KeyValue kv: kvs) {
-          //  Check if time is LATEST, change to time of most recent addition if so
-          //  This is expensive.
-          if (kv.isLatestTimestamp() && kv.isDeleteType()) {
-            byte[] qual = kv.getQualifier();
-            if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
-
-            Integer count = kvCount.get(qual);
-            if (count == null) {
-              kvCount.put(qual, 1);
-            } else {
-              kvCount.put(qual, count + 1);
-            }
-            count = kvCount.get(qual);
-
-            Get get = new Get(kv.getRow());
-            get.setMaxVersions(count);
-            get.addColumn(family, qual);
-
-            List<KeyValue> result = get(get, false);
-
-            if (result.size() < count) {
-              // Nothing to delete
-              kv.updateLatestStamp(byteNow);
-              continue;
-            }
-            if (result.size() > count) {
-              throw new RuntimeException("Unexpected size: " + result.size());
-            }
-            KeyValue getkv = result.get(count - 1);
-            Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
-                getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
-          } else {
-            kv.updateLatestStamp(byteNow);
-          }
-        }
-      }
+      prepareDeleteTimestamps(delete, byteNow);
 
       if (writeToWAL) {
         // write/sync to WAL should happen before we touch memstore.
@@ -1790,21 +1799,21 @@ public class HRegion implements HeapSize
         //
         // bunch up all edits across all column families into a
         // single WALEdit.
-        addFamilyMapToWALEdit(familyMap, localWalEdit);
+        addFamilyMapToWALEdit(familyMap, walEdit);
         this.log.append(regionInfo, this.htableDescriptor.getName(),
-            localWalEdit, clusterId, now, this.htableDescriptor);
+            walEdit, clusterId, now, this.htableDescriptor);
       }
 
       // Now make changes to the memstore.
-      long addedSize = applyFamilyMapToMemstore(familyMap, writeEntry);
+      long addedSize = applyFamilyMapToMemstore(familyMap, null);
       flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
 
     } finally {
       this.updatesLock.readLock().unlock();
     }
     // do after lock
-    if (coprocessorHost != null && walEdit == null) {
-      coprocessorHost.postDelete(delete, localWalEdit, writeToWAL);
+    if (coprocessorHost != null) {
+      coprocessorHost.postDelete(delete, walEdit, writeToWAL);
     }
     final long after = EnvironmentEdgeManager.currentTimeMillis();
     final String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix(
@@ -1876,7 +1885,7 @@ public class HRegion implements HeapSize
 
       try {
         // All edits for the given row (across all column families) must happen atomically.
-        internalPut(put, put.getClusterId(), writeToWAL, null, null);
+        internalPut(put, put.getClusterId(), writeToWAL);
       } finally {
         if(lockid == null) releaseRowLock(lid);
       }
@@ -2305,13 +2314,11 @@ public class HRegion implements HeapSize
           // originating cluster. A slave cluster receives the result as a Put
           // or Delete
           if (isPut) {
-            internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL,
-                null, null);
+            internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
           } else {
             Delete d = (Delete)w;
             prepareDelete(d);
-            internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL, null,
-                null);
+            internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
           }
           return true;
         }
@@ -2406,26 +2413,23 @@ public class HRegion implements HeapSize
     p.setFamilyMap(familyMap);
     p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
     p.setWriteToWAL(true);
-    this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true, null, null);
+    this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true);
   }
 
   /**
    * Add updates first to the hlog (if writeToWal) and then add values to memstore.
    * Warning: Assumption is caller has lock on passed in row.
    * @param put The Put command
+   * @param clusterId UUID of the originating cluster (for replication).
    * @param writeToWAL if true, then we should write to the log
-   * @param writeEntry Optional mvcc write point to use
-   * @param walEdit Optional walEdit to use. A non-null walEdit indicates
-   * that the coprocessor hooks are run by the caller
    * @throws IOException
    */
-  private void internalPut(Put put, UUID clusterId, boolean writeToWAL,
-      MultiVersionConsistencyControl.WriteEntry writeEntry, WALEdit walEdit) throws IOException {
+  private void internalPut(Put put, UUID clusterId, boolean writeToWAL) throws IOException {
     Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
-    WALEdit localWalEdit = walEdit == null ? new WALEdit() : walEdit;
+    WALEdit walEdit = new WALEdit();
     /* run pre put hook outside of lock to avoid deadlock */
-    if (coprocessorHost != null && walEdit == null) {
-      if (coprocessorHost.prePut(put, localWalEdit, writeToWAL)) {
+    if (coprocessorHost != null) {
+      if (coprocessorHost.prePut(put, walEdit, writeToWAL)) {
         return;
       }
     }
@@ -2445,19 +2449,19 @@ public class HRegion implements HeapSize
       // for some reason fail to write/sync to commit log, the memstore
       // will contain uncommitted transactions.
       if (writeToWAL) {
-        addFamilyMapToWALEdit(familyMap, localWalEdit);
+        addFamilyMapToWALEdit(familyMap, walEdit);
         this.log.append(regionInfo, this.htableDescriptor.getName(),
-            localWalEdit, clusterId, now, this.htableDescriptor);
+            walEdit, clusterId, now, this.htableDescriptor);
       }
 
-      long addedSize = applyFamilyMapToMemstore(familyMap, writeEntry);
+      long addedSize = applyFamilyMapToMemstore(familyMap, null);
       flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
     } finally {
       this.updatesLock.readLock().unlock();
     }
 
-    if (coprocessorHost != null && walEdit == null) {
-      coprocessorHost.postPut(put, localWalEdit, writeToWAL);
+    if (coprocessorHost != null) {
+      coprocessorHost.postPut(put, walEdit, writeToWAL);
     }
 
     // do after lock
@@ -4140,92 +4144,107 @@ public class HRegion implements HeapSize
     return results;
   }
 
-  public int mutateRow(RowMutation rm,
+  public void mutateRow(RowMutation rm,
       Integer lockid) throws IOException {
+    boolean flush = false;
 
     startRegionOperation();
-    List<WALEdit> walEdits = new ArrayList<WALEdit>(rm.getMutations().size());
+    Integer lid = null;
+    try {
+      // 1. run all pre-hooks before the atomic operation
+      // if any pre hook indicates "bypass", bypass the entire operation
 
-    // 1. run all pre-hooks before the atomic operation
-    // if any pre hook indicates "bypass", bypass the entire operation
-    // Note that this requires creating the WALEdits here and passing
-    // them to the actual Put/Delete operations.
-    for (Mutation m : rm.getMutations()) {
+      // one WALEdit is used for all edits.
       WALEdit walEdit = new WALEdit();
-      walEdits.add(walEdit);
-      if (coprocessorHost == null) {
-        continue;
-      }
-      if (m instanceof Put) {
-        if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
-          // by pass everything
-          return 0;
-        }
-      } else if (m instanceof Delete) {
-        Delete d = (Delete) m;
-        prepareDelete(d);
-        if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) {
-          // by pass everything
-          return 0;
+      if (coprocessorHost != null) {
+        for (Mutation m : rm.getMutations()) {
+          if (m instanceof Put) {
+            if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
+              // by pass everything
+              return;
+            }
+          } else if (m instanceof Delete) {
+            Delete d = (Delete) m;
+            prepareDelete(d);
+            if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) {
+              // by pass everything
+              return;
+            }
+          }
         }
       }
-    }
 
-    // 2. acquire the row lock
-    Integer lid = getLock(lockid, rm.getRow(), true);
+      // 2. acquire the row lock
+      lid = getLock(lockid, rm.getRow(), true);
 
-    // 3. acquire the region lock
-    this.updatesLock.readLock().lock();
+      // 3. acquire the region lock
+      this.updatesLock.readLock().lock();
 
-    // 4. Get a mvcc write number
-    MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
-    try {
-      int i = 0;
-      // 5. Perform the actual mutations
-      for (Mutation m : rm.getMutations()) {
-        if (m instanceof Put) {
-          internalPut((Put) m, HConstants.DEFAULT_CLUSTER_ID,
-              m.getWriteToWAL(), w, walEdits.get(i));
-        } else if (m instanceof Delete) {
-          Delete d = (Delete) m;
-          prepareDelete(d);
-          internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, d.getWriteToWAL(),
-              w, walEdits.get(i));
-        } else {
-          throw new DoNotRetryIOException(
-              "Action must be Put or Delete. But was: "
-                  + m.getClass().getName());
-        }
-        i++;
-      }
-      return i;
-    } finally {
-      // 6. roll mvcc forward
-      mvcc.completeMemstoreInsert(w);
-      // 7. release region lock
-      this.updatesLock.readLock().unlock();
+      // 4. Get a mvcc write number
+      MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
+
+      long now = EnvironmentEdgeManager.currentTimeMillis();
+      byte[] byteNow = Bytes.toBytes(now);
       try {
-        // 8. run all coprocessor post hooks
-        if (coprocessorHost != null) {
-          int i = 0;
-          for (Mutation m : rm.getMutations()) {
-            if (m instanceof Put) {
-              coprocessorHost.postPut((Put) m, walEdits.get(i),
-                  m.getWriteToWAL());
-            } else if (m instanceof Delete) {
-              coprocessorHost.postDelete((Delete) m, walEdits.get(i),
-                  m.getWriteToWAL());
-            }
-            i++;
+        // 5. Check mutations and apply edits to a single WALEdit
+        for (Mutation m : rm.getMutations()) {
+          if (m instanceof Put) {
+            Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
+            checkFamilies(familyMap.keySet());
+            checkTimestamps(familyMap, now);
+            updateKVTimestamps(familyMap.values(), byteNow);
+          } else if (m instanceof Delete) {
+            Delete d = (Delete) m;
+            prepareDelete(d);
+            prepareDeleteTimestamps(d, byteNow);
+          } else {
+            throw new DoNotRetryIOException(
+                "Action must be Put or Delete. But was: "
+                    + m.getClass().getName());
+          }
+          if (m.getWriteToWAL()) {
+            addFamilyMapToWALEdit(m.getFamilyMap(), walEdit);
           }
         }
+
+        // 6. append/sync all edits at once
+        // TODO: Do batching as in doMiniBatchPut
+        this.log.append(regionInfo, this.htableDescriptor.getName(), walEdit,
+            HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
+
+        // 7. apply to memstore
+        long addedSize = 0;
+        for (Mutation m : rm.getMutations()) {
+          addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
+        }
+        flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
       } finally {
-        if (lid != null) {
-          // 9. release the row lock
-          releaseRowLock(lid);
+        // 8. roll mvcc forward
+        mvcc.completeMemstoreInsert(w);
+
+        // 9. release region lock
+        this.updatesLock.readLock().unlock();
+      }
+      // 10. run all coprocessor post hooks, after region lock is released
+      if (coprocessorHost != null) {
+        for (Mutation m : rm.getMutations()) {
+          if (m instanceof Put) {
+            coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
+          } else if (m instanceof Delete) {
+            coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
+          }
         }
-        closeRegionOperation();
       }
+    } finally {
+      if (lid != null) {
+        // 11. release the row lock
+        releaseRowLock(lid);
+      }
+      if (flush) {
+        // 12. Flush cache if needed. Do it outside update lock.
+        requestFlush();
+      }
+      closeRegionOperation();
     }
   }
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1232551&r1=1232550&r2=1232551&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Tue Jan 17 19:38:13 2012
@@ -23,11 +23,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -94,62 +94,39 @@ public class ReplicationSink {
     // to the same table.
     try {
       long totalReplicated = 0;
-      // Map of table => list of puts, we only want to flushCommits once per
+      // Map of table => list of Rows, we only want to flushCommits once per
       // invocation of this method per table.
-      Map<byte[], List<Put>> puts = new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
+      Map<byte[], List<Row>> rows = new TreeMap<byte[], List<Row>>(Bytes.BYTES_COMPARATOR);
       for (HLog.Entry entry : entries) {
         WALEdit edit = entry.getEdit();
+        byte[] table = entry.getKey().getTablename();
+        Put put = null;
+        Delete del = null;
+        KeyValue lastKV = null;
         List<KeyValue> kvs = edit.getKeyValues();
-        if (kvs.get(0).isDelete()) {
-          Delete delete = new Delete(kvs.get(0).getRow(),
-              kvs.get(0).getTimestamp(), null);
-          delete.setClusterId(entry.getKey().getClusterId());
-          for (KeyValue kv : kvs) {
-            switch (Type.codeToType(kv.getType())) {
-            case DeleteFamily:
-              // family marker
-              delete.deleteFamily(kv.getFamily(), kv.getTimestamp());
-              break;
-            case DeleteColumn:
-              // column marker
-              delete.deleteColumns(kv.getFamily(), kv.getQualifier(),
-                  kv.getTimestamp());
-              break;
-            case Delete:
-              // version marker
-              delete.deleteColumn(kv.getFamily(), kv.getQualifier(),
-                  kv.getTimestamp());
-              break;
-            }
-          }
-          delete(entry.getKey().getTablename(), delete);
-        } else {
-          byte[] table = entry.getKey().getTablename();
-          List<Put> tableList = puts.get(table);
-          if (tableList == null) {
-            tableList = new ArrayList<Put>();
-            puts.put(table, tableList);
-          }
-          // With mini-batching, we need to expect multiple rows per edit
-          byte[] lastKey = kvs.get(0).getRow();
-          Put put = new Put(lastKey, kvs.get(0).getTimestamp());
-          put.setClusterId(entry.getKey().getClusterId());
-          for (KeyValue kv : kvs) {
-            byte[] key = kv.getRow();            
-            if (!Bytes.equals(lastKey, key)) {
-              tableList.add(put);
-              put = new Put(key, kv.getTimestamp());
+        for (KeyValue kv : kvs) {
+          if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
+            if (kv.isDelete()) {
+              del = new Delete(kv.getRow());
+              del.setClusterId(entry.getKey().getClusterId());
+              addToMultiMap(rows, table, del);
+            } else {
+              put = new Put(kv.getRow());
               put.setClusterId(entry.getKey().getClusterId());
+              addToMultiMap(rows, table, put);
             }
+          }
+          if (kv.isDelete()) {
+            del.addDeleteMarker(kv);
+          } else {
             put.add(kv);
-            lastKey = key;
           }
-          tableList.add(put);
+          lastKV = kv;
         }
         totalReplicated++;
       }
-      for(byte [] table : puts.keySet()) {
-        put(table, puts.get(table));
+      for(byte [] table : rows.keySet()) {
+        batch(table, rows.get(table));
       }
       this.metrics.setAgeOfLastAppliedOp(
           entries[entries.length-1].getKey().getWriteTime());
@@ -162,39 +139,40 @@ public class ReplicationSink {
   }
 
   /**
-   * Do the puts and handle the pool
-   * @param tableName table to insert into
-   * @param puts list of puts
-   * @throws IOException
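+   * Simple helper to add a value to a map from key to (a list of) values
+   * TODO: Make a general utility method
+   * @param map the multimap to update
+   * @param key the key whose value list receives the value (list created if absent)
+   * @param value the value to append
+   * @return the list of values now associated with key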
    */
-  private void put(byte[] tableName, List<Put> puts) throws IOException {
-    if (puts.isEmpty()) {
-      return;
-    }
-    HTableInterface table = null;
-    try {
-      table = this.pool.getTable(tableName);
-      table.put(puts);
-      this.metrics.appliedOpsRate.inc(puts.size());
-    } finally {
-      if (table != null) {
-        table.close();
-      }
+  private <K, V> List<V> addToMultiMap(Map<K, List<V>> map, K key, V value) {
+    List<V> values = map.get(key);
+    if (values == null) {
+      values = new ArrayList<V>();
+      map.put(key, values);
     }
+    values.add(value);
+    return values;
   }
 
   /**
-   * Do the delete and handle the pool
-   * @param tableName table to delete in
-   * @param delete the delete to use
+   * Do the changes and handle the pool
+   * @param tableName table to insert into
+   * @param rows list of actions
    * @throws IOException
    */
-  private void delete(byte[] tableName, Delete delete) throws IOException {
+  private void batch(byte[] tableName, List<Row> rows) throws IOException {
+    if (rows.isEmpty()) {
+      return;
+    }
     HTableInterface table = null;
     try {
       table = this.pool.getTable(tableName);
-      table.delete(delete);
-      this.metrics.appliedOpsRate.inc(1);
+      table.batch(rows);
+      this.metrics.appliedOpsRate.inc(rows.size());
+    } catch (InterruptedException ix) {
+      throw new IOException(ix);
     } finally {
       if (table != null) {
         table.close();

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1232551&r1=1232550&r2=1232551&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Tue Jan 17 19:38:13 2012
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertArr
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -4046,7 +4047,6 @@ public class TestFromClientSide {
         Bytes.toBytes("a"), Bytes.toBytes("b")
     };
     RowMutation arm = new RowMutation(ROW);
-    arm.add(new Delete(ROW));
     Put p = new Put(ROW);
     p.add(FAMILY, QUALIFIERS[0], VALUE);
     arm.add(p);
@@ -4054,15 +4054,19 @@ public class TestFromClientSide {
 
     Get g = new Get(ROW);
     Result r = t.get(g);
-    // delete was first, row should exist
     assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
 
     arm = new RowMutation(ROW);
+    p = new Put(ROW);
+    p.add(FAMILY, QUALIFIERS[1], VALUE);
     arm.add(p);
-    arm.add(new Delete(ROW));
+    Delete d = new Delete(ROW);
+    d.deleteColumns(FAMILY, QUALIFIERS[0]);
+    arm.add(d);
     t.batch(Arrays.asList((Row)arm));
     r = t.get(g);
-    assertTrue(r.isEmpty());
+    assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
+    assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
   }
 
   @Test

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java?rev=1232551&r1=1232550&r2=1232551&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java Tue Jan 17 19:38:13 2012
@@ -333,6 +333,8 @@ public class TestAtomicOperation extends
           }
         } catch (IOException e) {
           e.printStackTrace();
+          failures.incrementAndGet();
+          fail();
         }
       }
     }