You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/05/02 22:52:04 UTC

svn commit: r1333196 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hb...

Author: mbautin
Date: Wed May  2 20:52:03 2012
New Revision: 1333196

URL: http://svn.apache.org/viewvc?rev=1333196&view=rev
Log:
[89-fb] Allow atomic put/delete in one call: Port HBASE-3584 and HBASE-5203 to 0.89

Author: aaiyer

Summary:
Initial Patch. Needs clean up.

Out for some initial thoughts on a few issues.

Not ready for thorough review.

Test Plan:
mr test. -- Test Failures on ext-89:
TestRegionStateOnMasterFailure, TestRegionFavoredNodes

Reviewers: kannan, kranganathan

Reviewed By: kannan

CC: HBase Diffs Facebook Group, aaiyer, Kannan, madhuvaidya

Differential Revision: https://reviews.facebook.net/D1713

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Attributes.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Put.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestOperation.java

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Attributes.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Attributes.java?rev=1333196&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Attributes.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Attributes.java Wed May  2 20:52:03 2012
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.util.Map;
+
+public interface Attributes {
+  /**
+   * Sets an attribute.
+   * If value is <tt>null</tt>, the attribute is removed from the attributes map.
+   * Attribute names starting with _ indicate system attributes.
+   * @param name attribute name
+   * @param value attribute value, or <tt>null</tt> to remove the attribute
+   */
+  public void setAttribute(String name, byte[] value);
+
+  /**
+   * Gets the value of a single attribute.
+   * @param name attribute name
+   * @return attribute value if attribute is set, <tt>null</tt> otherwise
+   */
+  public byte[] getAttribute(String name);
+
+  /**
+   * Gets all attributes.
+   * @return unmodifiable map of all attributes
+   */
+  public Map<String, byte[]> getAttributesMap();
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java Wed May  2 20:52:03 2012
@@ -66,16 +66,9 @@ import java.util.TreeMap;
  * deleteFamily -- then you need to use the method overrides that take a
  * timestamp.  The constructor timestamp is not referenced.
  */
-public class Delete extends Operation
+public class Delete extends Mutation
   implements Writable, Row, Comparable<Row> {
-  private static final byte DELETE_VERSION = (byte)1;
-
-  private byte [] row = null;
-  // This ts is only used when doing a deleteRow.  Anything less,
-  private long ts;
-  private long lockId = -1L;
-  private final Map<byte [], List<KeyValue>> familyMap =
-    new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
+  private static final byte DELETE_VERSION = (byte)3;
 
   /** Constructor for Writable.  DO NOT USE */
   public Delete() {
@@ -124,19 +117,10 @@ public class Delete extends Operation
     this.ts = d.getTimeStamp();
     this.lockId = d.getLockId();
     this.familyMap.putAll(d.getFamilyMap());
+    this.writeToWAL = d.writeToWAL;
   }
 
-  public int compareTo(final Row d) {
-    return Bytes.compareTo(this.getRow(), d.getRow());
-  }
 
-  /**
-   * Method to check if the familyMap is empty
-   * @return true if empty, false otherwise
-   */
-  public boolean isEmpty() {
-    return familyMap.isEmpty();
-  }
 
   /**
    * Delete all versions of all columns of the specified family.
@@ -243,102 +227,6 @@ public class Delete extends Operation
     return this.familyMap;
   }
 
-  /**
-   *  Method for retrieving the delete's row
-   * @return row
-   */
-  public byte [] getRow() {
-    return this.row;
-  }
-
-  /**
-   * Method for retrieving the delete's RowLock
-   * @return RowLock
-   */
-  public RowLock getRowLock() {
-    return new RowLock(this.row, this.lockId);
-  }
-
-  /**
-   * Method for retrieving the delete's lock ID.
-   *
-   * @return The lock ID.
-   */
-  public long getLockId() {
-	return this.lockId;
-  }
-
-  /**
-   * Method for retrieving the delete's timestamp
-   * @return timestamp
-   */
-  public long getTimeStamp() {
-    return this.ts;
-  }
-
-  /**
-   * Compile the column family (i.e. schema) information
-   * into a Map. Useful for parsing and aggregation by debugging,
-   * logging, and administration tools.
-   * @return Map
-   */
-  @Override
-  public Map<String, Object> getFingerprint() {
-    Map<String, Object> map = new HashMap<String, Object>();
-    List<String> families = new ArrayList<String>();
-    // ideally, we would also include table information, but that information
-    // is not stored in each Operation instance.
-    map.put("families", families);
-    for (Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) {
-      families.add(Bytes.toStringBinary(entry.getKey()));
-    }
-    return map;
-  }
-
-  /**
-   * Compile the details beyond the scope of getFingerprint (row, columns,
-   * timestamps, etc.) into a Map along with the fingerprinted information.
-   * Useful for debugging, logging, and administration tools.
-   * @param maxCols a limit on the number of columns output prior to truncation
-   * @return Map
-   */
-  @Override
-  public Map<String, Object> toMap(int maxCols) {
-    // we start with the fingerprint map and build on top of it.
-    Map<String, Object> map = getFingerprint();
-    // replace the fingerprint's simple list of families with a
-    // map from column families to lists of qualifiers and kv details
-    Map<String, List<Map<String, Object>>> columns =
-      new HashMap<String, List<Map<String, Object>>>();
-    map.put("families", columns);
-    map.put("row", Bytes.toStringBinary(this.row));
-    int colCount = 0;
-    // iterate through all column families affected by this Delete
-    for (Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) {
-      // map from this family to details for each kv affected within the family
-      List<Map<String, Object>> qualifierDetails =
-        new ArrayList<Map<String, Object>>();
-      columns.put(Bytes.toStringBinary(entry.getKey()), qualifierDetails);
-      colCount += entry.getValue().size();
-      if (maxCols <= 0) {
-        continue;
-      }
-      // add details for each kv
-      for (KeyValue kv : entry.getValue()) {
-        if (--maxCols <= 0 ) {
-          continue;
-        }
-        Map<String, Object> kvMap = kv.toStringMap();
-        // row and family information are already available in the bigger map
-        kvMap.remove("row");
-        kvMap.remove("family");
-        qualifierDetails.add(kvMap);
-      }
-    }
-    map.put("totalColumns", colCount);
-    return map;
-  }
-
   //Writable
   public void readFields(final DataInput in) throws IOException {
     int version = in.readByte();
@@ -348,6 +236,9 @@ public class Delete extends Operation
     this.row = Bytes.readByteArray(in);
     this.ts = in.readLong();
     this.lockId = in.readLong();
+    if (version > 2) {
+      this.writeToWAL = in.readBoolean();
+    }
     this.familyMap.clear();
     int numFamilies = in.readInt();
     for(int i=0;i<numFamilies;i++) {
@@ -361,6 +252,9 @@ public class Delete extends Operation
       }
       this.familyMap.put(family, list);
     }
+    if (version > 1) {
+      readAttributes(in);
+    }
   }
 
   public void write(final DataOutput out) throws IOException {
@@ -368,6 +262,7 @@ public class Delete extends Operation
     Bytes.writeByteArray(out, this.row);
     out.writeLong(this.ts);
     out.writeLong(this.lockId);
+    out.writeBoolean(this.writeToWAL);
     out.writeInt(familyMap.size());
     for(Map.Entry<byte [], List<KeyValue>> entry : familyMap.entrySet()) {
       Bytes.writeByteArray(out, entry.getKey());
@@ -377,6 +272,7 @@ public class Delete extends Operation
         kv.write(out);
       }
     }
+    writeAttributes(out);
   }
 
   /**
@@ -408,4 +304,17 @@ public class Delete extends Operation
   }
 
 
+  /**
+   * Returns the fingerprint map built by the superclass with an extra
+   * "operation" entry whose value is "Delete". Useful for parsing and
+   * aggregation by debugging, logging, and administration tools.
+   * @return the superclass fingerprint map plus the "operation" entry
+   */
+  @Override
+  public Map<String, Object> getFingerprint() {
+    Map<String, Object> map = super.getFingerprint();
+    map.put("operation", "Delete");
+    return map;
+  }
+
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java Wed May  2 20:52:03 2012
@@ -66,7 +66,8 @@ import java.util.TreeSet;
  * <p>
  * To add a filter, execute {@link #setFilter(Filter) setFilter}.
  */
-public class Get extends Operation implements Writable, Row, Comparable<Row> {
+public class Get extends OperationWithAttributes
+  implements Writable, Row, Comparable<Row> {
   private static final byte GET_VERSION = (byte)3;
 
   private byte [] row = null;

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Wed May  2 20:52:03 2012
@@ -195,7 +195,7 @@ public interface HConnection extends Clo
    * @throws IOException if a remote or network exception occurs
    * @throws RuntimeException other unspecified error
    */
-  public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable) 
+  public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
   throws IOException, RuntimeException;
 
   /**
@@ -236,6 +236,10 @@ public interface HConnection extends Clo
   public void processBatchOfPuts(List<Put> list, final byte[] tableName)
   throws IOException;
 
+    public int processBatchOfRowMutations(final List<RowMutations> list,
+      final byte[] tableName)
+    throws IOException;
+
   /**
    * Enable or disable region cache prefetch for the table. It will be
    * applied for the given table's all HTable instances within this

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed May  2 20:52:03 2012
@@ -1525,6 +1525,31 @@ public class HConnectionManager {
       return results;
     }
 
+    public int processBatchOfRowMutations(final List<RowMutations> list,
+      final byte[] tableName)
+    throws IOException {
+      if (list.isEmpty()) return 0; // nothing to send
+      Batch<Object> b = new Batch<Object>(this) {
+        @SuppressWarnings("unchecked")
+        @Override
+        int doCall(final List<? extends Row> currentList, final byte [] row,
+            final byte[] tableName, Object whatevs) // whatevs is never read in this body
+        throws IOException, RuntimeException {
+          final List<RowMutations> mutations = (List<RowMutations>)currentList;
+          getRegionServerWithRetries(new ServerCallable<Void>(this.c,
+                tableName, row) {
+              public Void call() throws IOException {
+                server.mutateRow(location.getRegionInfo().getRegionName(),
+                  mutations);
+                return null;
+              }
+            });
+            return -1; // NOTE(review): presumably means "no failed index" to Batch - confirm
+          }
+        };
+      return b.process(list, tableName, new Object()); // doCall above ignores this Object
+      }
+
     public int processBatchOfDeletes(final List<Delete> list,
       final byte[] tableName)
     throws IOException {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed May  2 20:52:03 2012
@@ -668,6 +668,28 @@ public class HTable implements HTableInt
   }
 
   /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void mutateRow(final RowMutations arm) throws IOException {
+    connection.getRegionServerWithRetries(
+	    new ServerCallable<Void>(connection, tableName, arm.getRow()) {
+	      public Void call() throws IOException {
+	        server.mutateRow(location.getRegionInfo().getRegionName(), arm);
+	        return null;
+	      }
+	    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void mutateRow(final List<RowMutations> armList) throws IOException {
+	  connection.processBatchOfRowMutations(armList, this.tableName);
+  }
+
+  /**
    * Test for the existence of columns in the table, as specified in the Get.<p>
    *
    * This will return true if the Get matches one or more keys, false if not.<p>

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Wed May  2 20:52:03 2012
@@ -265,6 +265,18 @@ public interface HTableInterface {
       long amount, boolean writeToWAL) throws IOException;
 
   /**
+   * Performs multiple mutations atomically on a single row. Currently
+   * {@link Put} and {@link Delete} are supported.
+   *
+   * @param arm object that specifies the set of mutations to perform
+   * atomically
+   * @throws IOException
+   */
+  public void mutateRow(final RowMutations arm) throws IOException;
+
+  public void mutateRow(List<RowMutations> armList) throws IOException;
+  
+  /**
    * Tells whether or not 'auto-flush' is turned on.
    *
    * @return {@code true} if 'auto-flush' is enabled (default), meaning
@@ -310,4 +322,5 @@ public interface HTableInterface {
    * @see #unlockRow
    */
   void unlockRow(RowLock rl) throws IOException;
+
 }

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java?rev=1333196&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java Wed May  2 20:52:03 2012
@@ -0,0 +1,201 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public abstract class Mutation extends OperationWithAttributes {
+  // Attribute naming the originating cluster; not read or written in this class yet.
+  private static final String CLUSTER_ID_ATTR = "_c.id_";
+
+  protected byte [] row = null;
+  protected long ts = HConstants.LATEST_TIMESTAMP;
+  protected long lockId = -1L;
+  protected boolean writeToWAL = true;
+  protected Map<byte [], List<KeyValue>> familyMap =
+      new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
+
+  /**
+   * Compile the column family (i.e. schema) information into a Map.
+   * Useful for parsing and aggregation by debugging, logging, and
+   * administration tools.
+   * @return a new map whose "families" entry lists the family names
+   */
+  @Override
+  public Map<String, Object> getFingerprint() {
+    Map<String, Object> map = new HashMap<String, Object>();
+    List<String> families = new ArrayList<String>();
+    // ideally, we would also include table information, but that information
+    // is not stored in each Operation instance.
+    map.put("families", families);
+    for (Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) {
+      families.add(Bytes.toStringBinary(entry.getKey()));
+    }
+    return map;
+  }
+
+  /**
+   * Compile the details beyond the scope of getFingerprint (row, columns,
+   * timestamps, etc.) into a Map along with the fingerprinted information.
+   * Useful for debugging, logging, and administration tools.
+   * @param maxCols a limit on the number of columns output prior to truncation
+   * @return the fingerprint map plus "row", per-family kv details, "totalColumns"
+   */
+  @Override
+  public Map<String, Object> toMap(int maxCols) {
+    // we start with the fingerprint map and build on top of it.
+    Map<String, Object> map = getFingerprint();
+    // replace the fingerprint's simple list of families with a
+    // map from column families to lists of qualifiers and kv details
+    Map<String, List<Map<String, Object>>> columns =
+      new HashMap<String, List<Map<String, Object>>>();
+    map.put("families", columns);
+    map.put("row", Bytes.toStringBinary(this.row));
+    int colCount = 0;
+    // iterate through all column families affected
+    for (Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) {
+      // map from this family to details for each kv affected within the family
+      List<Map<String, Object>> qualifierDetails =
+        new ArrayList<Map<String, Object>>();
+      columns.put(Bytes.toStringBinary(entry.getKey()), qualifierDetails);
+      colCount += entry.getValue().size();
+      if (maxCols <= 0) {
+        continue;
+      }
+      // add details for each kv (NOTE(review): at most maxCols - 1 get listed; intended?)
+      for (KeyValue kv : entry.getValue()) {
+        if (--maxCols <= 0 ) {
+          continue;
+        }
+        Map<String, Object> kvMap = kv.toStringMap();
+        // row and family information are already available in the bigger map
+        kvMap.remove("row");
+        kvMap.remove("family");
+        qualifierDetails.add(kvMap);
+      }
+    }
+    map.put("totalColumns", colCount);
+    return map;
+  }
+
+  /**
+   * @return true if edits should be applied to WAL, false if not
+   */
+  public boolean getWriteToWAL() {
+    return this.writeToWAL;
+  }
+
+  /**
+   * Set whether this mutation should be written to the WAL or not.
+   * Not writing the WAL means you may lose edits on server crash.
+   * @param write true if edits should be written to WAL, false if not
+   */
+  public void setWriteToWAL(boolean write) {
+    this.writeToWAL = write;
+  }
+
+  /**
+   * Method for retrieving this mutation's familyMap
+   * @return the familyMap held by this mutation (the same instance, not a copy)
+   */
+  public Map<byte [], List<KeyValue>> getFamilyMap() {
+    return this.familyMap;
+  }
+
+  /**
+   * Method for setting this mutation's familyMap; the given map is stored as is
+   */
+  public void setFamilyMap(Map<byte [], List<KeyValue>> map) {
+    this.familyMap = map;
+  }
+
+  /**
+   * Method to check if the familyMap is empty
+   * @return true if empty, false otherwise
+   */
+  public boolean isEmpty() {
+    return familyMap.isEmpty();
+  }
+
+  /**
+   * Method for retrieving this mutation's row
+   * @return row
+   */
+  public byte [] getRow() {
+    return this.row;
+  }
+
+  public int compareTo(final Row d) {
+    return Bytes.compareTo(this.getRow(), d.getRow());
+  }
+
+  /**
+   * Method for retrieving this mutation's RowLock
+   * @return a new RowLock built from this mutation's row and lock ID
+   */
+  public RowLock getRowLock() {
+    return new RowLock(this.row, this.lockId);
+  }
+
+  /**
+   * Method for retrieving this mutation's lock ID.
+   *
+   * @return The lock ID.
+   */
+  public long getLockId() {
+  return this.lockId;
+  }
+
+  /**
+   * Method for retrieving the timestamp
+   * @return timestamp
+   */
+  public long getTimeStamp() {
+    return this.ts;
+  }
+
+  /**
+   * @return the total number of KeyValues
+   */
+  public int size() {
+    int size = 0;
+    for(List<KeyValue> kvList : this.familyMap.values()) {
+      size += kvList.size();
+    }
+    return size;
+  }
+
+  /**
+   * @return the number of different families
+   */
+  public int numFamilies() {
+    return familyMap.size();
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java?rev=1333196&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java Wed May  2 20:52:03 2012
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Base class for operations that carry an optional map of named,
+ * opaque byte[] attributes, plus helpers to size and (de)serialize it.
+ */
+public abstract class OperationWithAttributes extends Operation implements Attributes {
+  // an opaque blob of attributes; null whenever no attribute is set
+  private Map<String, byte[]> attributes;
+
+  /**
+   * Sets an attribute, or removes it when value is null. The backing map
+   * is created on first use and dropped again once it becomes empty.
+   * @param name attribute name
+   * @param value attribute value, or null to remove the attribute
+   */
+  public void setAttribute(String name, byte[] value) {
+    if (attributes == null && value == null) {
+      return;
+    }
+
+    if (attributes == null) {
+      attributes = new HashMap<String, byte[]>();
+    }
+
+    if (value == null) {
+      attributes.remove(name);
+      if (attributes.isEmpty()) {
+        this.attributes = null;
+      }
+    } else {
+      attributes.put(name, value);
+    }
+  }
+
+  /**
+   * Gets an attribute.
+   * @param name attribute name
+   * @return attribute value if the attribute is set, null otherwise
+   */
+  public byte[] getAttribute(String name) {
+    if (attributes == null) {
+      return null;
+    }
+
+    return attributes.get(name);
+  }
+
+  /**
+   * Gets all attributes.
+   * @return unmodifiable view of the attributes, or an empty map if none is set
+   */
+  public Map<String, byte[]> getAttributesMap() {
+    if (attributes == null) {
+      return Collections.emptyMap();
+    }
+    return Collections.unmodifiableMap(attributes);
+  }
+
+  /**
+   * Estimates the size of the attributes from the ClassSize constants.
+   * @return the aligned size estimate, 0 when no attribute is set
+   */
+  protected long getAttributeSize() {
+    long size = 0;
+    if (attributes != null) {
+      size += ClassSize.align(this.attributes.size() * ClassSize.MAP_ENTRY);
+      for(Map.Entry<String, byte[]> entry : this.attributes.entrySet()) {
+        size += ClassSize.align(ClassSize.STRING + entry.getKey().length());
+        size += ClassSize.align(ClassSize.ARRAY + entry.getValue().length);
+      }
+    }
+    return size;
+  }
+
+  /**
+   * Writes the attribute count followed by each name/value pair; a count
+   * of 0 is written when no attribute is set.
+   * @param out stream to write to
+   * @throws IOException if writing to the stream fails
+   */
+  protected void writeAttributes(final DataOutput out) throws IOException {
+    if (this.attributes == null) {
+      out.writeInt(0);
+    } else {
+      out.writeInt(this.attributes.size());
+      for (Map.Entry<String, byte[]> attr : this.attributes.entrySet()) {
+        WritableUtils.writeString(out, attr.getKey());
+        Bytes.writeByteArray(out, attr.getValue());
+      }
+    }
+  }
+
+  /**
+   * Reads the attributes in the format produced by writeAttributes,
+   * replacing any attributes this instance already holds.
+   * @param in stream to read from
+   * @throws IOException if reading from the stream fails
+   */
+  protected void readAttributes(final DataInput in) throws IOException {
+    int numAttributes = in.readInt();
+    // Start from a clean slate: without this, an instance that is reused
+    // for deserialization keeps stale attributes when the count is 0.
+    this.attributes = null;
+    if (numAttributes > 0) {
+      this.attributes = new HashMap<String, byte[]>();
+      for(int i=0; i<numAttributes; i++) {
+        String name = WritableUtils.readString(in);
+        byte[] value = Bytes.readByteArray(in);
+        this.attributes.put(name, value);
+      }
+    }
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Put.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Put.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Put.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Put.java Wed May  2 20:52:03 2012
@@ -45,20 +45,12 @@ import java.util.TreeMap;
  * for each column to be inserted, execute {@link #add(byte[], byte[], byte[]) add} or
  * {@link #add(byte[], byte[], long, byte[]) add} if setting the timestamp.
  */
-public class Put extends Operation
+public class Put extends Mutation
   implements HeapSize, Writable, Row, Comparable<Row> {
-  private static final byte PUT_VERSION = (byte)1;
-
-  private byte [] row = null;
-  private long timestamp = HConstants.LATEST_TIMESTAMP;
-  private long lockId = -1L;
-  private boolean writeToWAL = true;
-
-  private Map<byte [], List<KeyValue>> familyMap =
-    new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
+  private static final byte PUT_VERSION = (byte)2;
 
   private static final long OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + ClassSize.REFERENCE +
+      ClassSize.OBJECT + 2 * ClassSize.REFERENCE +
       2 * Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN +
       ClassSize.REFERENCE + ClassSize.TREEMAP);
 
@@ -103,7 +95,7 @@ public class Put extends Operation
       throw new IllegalArgumentException("Row key is invalid");
     }
     this.row = Arrays.copyOf(row, row.length);
-    this.timestamp = ts;
+    this.ts = ts;
     if(rowLock != null) {
       this.lockId = rowLock.getLockId();
     }
@@ -114,7 +106,7 @@ public class Put extends Operation
    * @param putToCopy put to copy
    */
   public Put(Put putToCopy) {
-    this(putToCopy.getRow(), putToCopy.timestamp, putToCopy.getRowLock());
+    this(putToCopy.getRow(), putToCopy.ts, putToCopy.getRowLock());
     this.familyMap =
       new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
     for(Map.Entry<byte [], List<KeyValue>> entry :
@@ -132,7 +124,7 @@ public class Put extends Operation
    * @return this
    */
   public Put add(byte [] family, byte [] qualifier, byte [] value) {
-    return add(family, qualifier, this.timestamp, value);
+    return add(family, qualifier, this.ts, value);
   }
 
   /**
@@ -199,7 +191,7 @@ public class Put extends Operation
    * existing KeyValue object in the family map.
    */
   public boolean has(byte [] family, byte [] qualifier) {
-  return has(family, qualifier, this.timestamp, new byte[0], true, true);
+  return has(family, qualifier, this.ts, new byte[0], true, true);
   }
 
   /**
@@ -229,7 +221,7 @@ public class Put extends Operation
    * existing KeyValue object in the family map.
    */
   public boolean has(byte [] family, byte [] qualifier, byte [] value) {
-    return has(family, qualifier, this.timestamp, value, true, false);
+    return has(family, qualifier, this.ts, value, true, false);
   }
 
   /**
@@ -328,154 +320,6 @@ public class Put extends Operation
     return list;
   }
 
-  /**
-   * Method for retrieving the put's familyMap
-   * @return familyMap
-   */
-  public Map<byte [], List<KeyValue>> getFamilyMap() {
-    return this.familyMap;
-  }
-
-  /**
-   * Method for retrieving the put's row
-   * @return row
-   */
-  public byte [] getRow() {
-    return this.row;
-  }
-
-  /**
-   * Method for retrieving the put's RowLock
-   * @return RowLock
-   */
-  public RowLock getRowLock() {
-    return new RowLock(this.row, this.lockId);
-  }
-
-  /**
-   * Method for retrieving the put's lockId
-   * @return lockId
-   */
-  public long getLockId() {
-  	return this.lockId;
-  }
-
-  /**
-   * Method to check if the familyMap is empty
-   * @return true if empty, false otherwise
-   */
-  public boolean isEmpty() {
-    return familyMap.isEmpty();
-  }
-
-  /**
-   * @return Timestamp
-   */
-  public long getTimeStamp() {
-    return this.timestamp;
-  }
-
-  /**
-   * @return the number of different families included in this put
-   */
-  public int numFamilies() {
-    return familyMap.size();
-  }
-
-  /**
-   * @return the total number of KeyValues that will be added with this put
-   */
-  public int size() {
-    int size = 0;
-    for(List<KeyValue> kvList : this.familyMap.values()) {
-      size += kvList.size();
-    }
-    return size;
-  }
-
-  /**
-   * @return true if edits should be applied to WAL, false if not
-   */
-  public boolean getWriteToWAL() {
-    return this.writeToWAL;
-  }
-
-  /**
-   * Set whether this Put should be written to the WAL or not.
-   * Not writing the WAL means you may lose edits on server crash.
-   * @param write true if edits should be written to WAL, false if not
-   */
-  public void setWriteToWAL(boolean write) {
-    this.writeToWAL = write;
-  }
-
-  /**
-   * Compile the column family (i.e. schema) information
-   * into a Map. Useful for parsing and aggregation by debugging,
-   * logging, and administration tools.
-   * @return Map
-   */
-  @Override
-  public Map<String, Object> getFingerprint() {
-    Map<String, Object> map = new HashMap<String, Object>();
-    List<String> families = new ArrayList<String>();
-    // ideally, we would also include table information, but that information
-    // is not stored in each Operation instance.
-    map.put("families", families);
-    for (Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) {
-      families.add(Bytes.toStringBinary(entry.getKey()));
-    }
-    return map;
-  }
-
-  /**
-   * Compile the details beyond the scope of getFingerprint (row, columns,
-   * timestamps, etc.) into a Map along with the fingerprinted information.
-   * Useful for debugging, logging, and administration tools.
-   * @param maxCols a limit on the number of columns output prior to truncation
-   * @return Map
-   */
-  @Override
-  public Map<String, Object> toMap(int maxCols) {
-    // we start with the fingerprint map and build on top of it.
-    Map<String, Object> map = getFingerprint();
-    // replace the fingerprint's simple list of families with a
-    // map from column families to lists of qualifiers and kv details
-    Map<String, List<Map<String, Object>>> columns =
-      new HashMap<String, List<Map<String, Object>>>();
-    map.put("families", columns);
-    map.put("row", Bytes.toStringBinary(this.row));
-    int colCount = 0;
-    // iterate through all column families affected by this Put
-    for (Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) {
-      // map from this family to details for each kv affected within the family
-      List<Map<String, Object>> qualifierDetails =
-        new ArrayList<Map<String, Object>>();
-      columns.put(Bytes.toStringBinary(entry.getKey()), qualifierDetails);
-      colCount += entry.getValue().size();
-      if (maxCols <= 0) {
-        continue;
-      }
-      // add details for each kv
-      for (KeyValue kv : entry.getValue()) {
-        if (--maxCols <= 0 ) {
-          continue;
-        }
-        Map<String, Object> kvMap = kv.toStringMap();
-        // row and family information are already available in the bigger map
-        kvMap.remove("row");
-        kvMap.remove("family");
-        qualifierDetails.add(kvMap);
-      }
-    }
-    map.put("totalColumns", colCount);
-    return map;
-  }
-
-  public int compareTo(Row p) {
-    return Bytes.compareTo(this.getRow(), p.getRow());
-  }
-
   //HeapSize
   public long heapSize() {
     long heapsize = OVERHEAD;
@@ -502,6 +346,8 @@ public class Put extends Operation
         heapsize += kv.heapSize();
       }
     }
+    heapsize += getAttributeSize();
+
     return ClassSize.align((int)heapsize);
   }
 
@@ -513,7 +359,7 @@ public class Put extends Operation
       throw new IOException("version not supported");
     }
     this.row = Bytes.readByteArray(in);
-    this.timestamp = in.readLong();
+    this.ts = in.readLong();
     this.lockId = in.readLong();
     this.writeToWAL = in.readBoolean();
     int numFamilies = in.readInt();
@@ -533,13 +379,16 @@ public class Put extends Operation
       }
       this.familyMap.put(family, keys);
     }
+    if (version > 1) {
+      readAttributes(in);
+    }
   }
 
   public void write(final DataOutput out)
   throws IOException {
     out.writeByte(PUT_VERSION);
     Bytes.writeByteArray(out, this.row);
-    out.writeLong(this.timestamp);
+    out.writeLong(this.ts);
     out.writeLong(this.lockId);
     out.writeBoolean(this.writeToWAL);
     out.writeInt(familyMap.size());
@@ -557,10 +406,24 @@ public class Put extends Operation
         out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
       }
     }
+    writeAttributes(out);
   }
 
   /**
-   * Add the specified column and value, with the specified timestamp as
+   * Compile the column family (i.e. schema) information
+   * into a Map. Useful for parsing and aggregation by debugging,
+   * logging, and administration tools.
+   * @return Map
+   */
+  @Override
+  public Map<String, Object> getFingerprint() {
+    Map<String, Object> map = super.getFingerprint();
+    map.put("operation", "Put");
+    return map;
+  }
+
+  /**
+    * Add the specified column and value, with the specified timestamp as
    * its version to this Put operation.
    * @param column Old style column name with family and qualifier put together
    * with a colon.
@@ -572,5 +435,6 @@ public class Put extends Operation
   public Put add(byte [] column, long ts, byte [] value) {
     byte [][] parts = KeyValue.parseColumn(column);
     return add(parts[0], parts[1], ts, value);
+
   }
 }

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java?rev=1333196&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java Wed May  2 20:52:03 2012
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Performs multiple mutations atomically on a single row.
+ * Currently {@link Put} and {@link Delete} are supported.
+ *
+ * The mutations are performed in the order in which they
+ * were added.
+ */
+public class RowMutations extends Operation implements Row {
+  private List<Mutation> mutations = new ArrayList<Mutation>();
+  protected byte [] row;
+  private static final byte VERSION = (byte)1;
+
+  /** Constructor for Writable. DO NOT USE */
+  public RowMutations() {}
+
+  /**
+   * Create an atomic mutation for the specified row.
+   * @param row row key
+   */
+  public RowMutations(byte [] row) {
+    if(row == null || row.length > HConstants.MAX_ROW_LENGTH) {
+      throw new IllegalArgumentException("Row key is invalid");
+    }
+    this.row = Arrays.copyOf(row, row.length);
+  }
+
+  /**
+   * Add a {@link Put} operation to the list of mutations
+   * @param p The {@link Put} to add
+   * @throws IOException
+   */
+  public void add(Put p) throws IOException {
+    internalAdd(p);
+  }
+
+  /**
+   * Add a {@link Delete} operation to the list of mutations
+   * @param d The {@link Delete} to add
+   * @throws IOException
+   */
+  public void add(Delete d) throws IOException {
+    internalAdd(d);
+  }
+
+  private void internalAdd(Mutation m) throws IOException {
+    int res = Bytes.compareTo(this.row, m.getRow());
+    if(res != 0) {
+      throw new IOException("The row in the recently added Put/Delete " +
+          Bytes.toStringBinary(m.getRow()) + " doesn't match the original one " +
+          Bytes.toStringBinary(this.row));
+    }
+    mutations.add(m);
+  }
+
+  @Override
+  public void readFields(final DataInput in) throws IOException {
+    int version = in.readByte();
+    if (version > VERSION) {
+      throw new IOException("version not supported");
+    }
+    this.row = Bytes.readByteArray(in);
+    int numMutations = in.readInt();
+    mutations.clear();
+    for(int i=0; i<numMutations; i++) {
+      mutations.add((Mutation) HbaseObjectWritable.readObject(in, null));
+    }
+  }
+
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    out.writeByte(VERSION);
+    Bytes.writeByteArray(out, this.row);
+    out.writeInt(mutations.size());
+    for (Mutation m : mutations) {
+      HbaseObjectWritable.writeObject(out, m, m.getClass(), null);
+    }
+  }
+
+  @Override
+  public int compareTo(Row i) {
+    return Bytes.compareTo(this.getRow(), i.getRow());
+  }
+
+  @Override
+  public byte[] getRow() {
+    return row;
+  }
+
+  /**
+   * @return An unmodifiable list of the current mutations.
+   */
+  public List<Mutation> getMutations() {
+    return Collections.unmodifiableList(mutations);
+  }
+  
+  /**
+   * Compile a summary of this operation (the row key and the number
+   * of puts and deletes it contains) into a Map. Useful for parsing and
+   * aggregation by debugging, logging, and administration tools.
+   * 
+   * e.g: {num-delete=1, num-put=1, row=testRow}
+   * @return Map
+   */
+  @Override
+  public Map<String, Object> getFingerprint() {
+    Map<String, Object> map = new HashMap<String, Object>();
+    List<String> mutationsList = new ArrayList<String>();
+    // ideally, we would also include table information, but that information
+    // is not stored in each Operation instance.
+    map.put("row", Bytes.toStringBinary(this.row));
+    int deleteCnt = 0, putCnt = 0;
+    for (Mutation mod: this.mutations) {
+    	if (mod instanceof Put) {
+    		putCnt++;
+    	}
+    	else {
+    		deleteCnt++;
+    	}
+    }
+    map.put("num-put", putCnt);
+    map.put("num-delete", deleteCnt);
+    return map;
+  }
+
+  /**
+   * Compile the details beyond the scope of getFingerprint (row, columns,
+   * timestamps, etc.) into a Map along with the fingerprinted information.
+   * Useful for debugging, logging, and administration tools.
+   * @param maxCols a limit on the number of columns output prior to truncation
+   * e.g:
+   *       {mutations=
+   *             [Delete:{"operation":"Delete","totalColumns":1,"families":{...},"row":"testRow"},
+   *              Put:{"operation":"Put","totalColumns":1,"families":{...},"row":"testRow"}],
+   *        "num-delete"=1, 
+   *        "num-put"=1,
+   *        "row"="testRow"}
+   * @return Map
+   */
+  @Override
+  public Map<String, Object> toMap(int maxCols) {
+    // we start with the fingerprint map and build on top of it.
+    Map<String, Object> map = getFingerprint();
+    // add a "mutations" entry holding the detailed map of each
+    // individual mutation, as produced by that mutation's own toMap
+    List<Map<String, Object>> mutationDetails =
+      new ArrayList<Map<String, Object>>();
+    map.put("row", Bytes.toStringBinary(this.row));
+    map.put("mutations", mutationDetails);
+    // add each mutation's map, stopping once more than maxCols have been added
+    int count = 0;
+    for (Mutation mod: this.mutations) {
+     mutationDetails.add(mod.toMap(maxCols));
+     if (++count > maxCols)
+       break;
+    }
+    return map;
+  }
+
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Wed May  2 20:52:03 2012
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HServerAd
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.MultiPut;
@@ -160,6 +161,8 @@ public class HbaseObjectWritable impleme
     addToMap(Result[].class, code++);
     addToMap(Scan.class, code++);
 
+    addToMap(RowMutations.class, code++);
+
     addToMap(WhileMatchFilter.class, code++);
     addToMap(PrefixFilter.class, code++);
     addToMap(PageFilter.class, code++);

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Wed May  2 20:52:03 2012
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.Restartable;
+import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.MultiPut;
@@ -264,6 +265,12 @@ public interface HRegionInterface extend
   public long openScanner(final byte [] regionName, final Scan scan)
   throws IOException;
 
+  public void mutateRow(byte[] regionName, RowMutations arm)
+      throws IOException;
+
+  public void mutateRow(byte[] regionName, List<RowMutations> armList)
+      throws IOException;
+
   /**
    * Get the next set of values
    * @param scannerId clientId passed to openScanner

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed May  2 20:52:03 2012
@@ -72,11 +72,15 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
@@ -1712,6 +1716,60 @@ public class HRegion implements HeapSize
     }
   }
 
+  /**
+   * Setup a Delete object with correct timestamps.
+   * Caller should hold the row and region locks.
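+   * @param familyMap map of family to the KeyValues of the delete for that family
+   * @param byteNow timestamp bytes written over LATEST_TIMESTAMP in the KeyValues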
+   * @throws IOException
+   */
+  private void prepareDeleteTimestamps(Map<byte[], List<KeyValue>> familyMap, byte[] byteNow)
+      throws IOException {
+    for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
+
+      byte[] family = e.getKey();
+      List<KeyValue> kvs = e.getValue();
+      Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+
+      for (KeyValue kv: kvs) {
+        //  Check if time is LATEST, change to time of most recent addition if so
+        //  This is expensive.
+        if (kv.isLatestTimestamp() && kv.isDeleteType()) {
+          byte[] qual = kv.getQualifier();
+          if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
+
+          Integer count = kvCount.get(qual);
+          if (count == null) {
+            kvCount.put(qual, 1);
+          } else {
+            kvCount.put(qual, count + 1);
+          }
+          count = kvCount.get(qual);
+
+          Get get = new Get(kv.getRow());
+          get.setMaxVersions(count);
+          get.addColumn(family, qual);
+
+          List<KeyValue> result = get(get);
+
+          if (result.size() < count) {
+            // Nothing to delete
+            kv.updateLatestStamp(byteNow);
+            continue;
+          }
+          if (result.size() > count) {
+            throw new RuntimeException("Unexpected size: " + result.size());
+          }
+          KeyValue getkv = result.get(count - 1);
+          Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
+              getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
+        } else {
+          kv.updateLatestStamp(byteNow);
+        }
+      }
+    }
+  }
+
 
   /**
    * @param familyMap map of family to edits for the given family.
@@ -1729,50 +1787,7 @@ public class HRegion implements HeapSize
     updatesLock.readLock().lock();
 
     try {
-
-      for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
-
-        byte[] family = e.getKey();
-        List<KeyValue> kvs = e.getValue();
-        Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-
-        for (KeyValue kv: kvs) {
-          //  Check if time is LATEST, change to time of most recent addition if so
-          //  This is expensive.
-          if (kv.isLatestTimestamp() && kv.isDeleteType()) {
-            byte[] qual = kv.getQualifier();
-            if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
-
-            Integer count = kvCount.get(qual);
-            if (count == null) {
-              kvCount.put(qual, 1);
-            } else {
-              kvCount.put(qual, count + 1);
-            }
-            count = kvCount.get(qual);
-
-            Get get = new Get(kv.getRow());
-            get.setMaxVersions(count);
-            get.addColumn(family, qual);
-
-            List<KeyValue> result = get(get);
-
-            if (result.size() < count) {
-              // Nothing to delete
-              kv.updateLatestStamp(byteNow);
-              continue;
-            }
-            if (result.size() > count) {
-              throw new RuntimeException("Unexpected size: " + result.size());
-            }
-            KeyValue getkv = result.get(count - 1);
-            Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
-                getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
-          } else {
-            kv.updateLatestStamp(byteNow);
-          }
-        }
-      }
+      prepareDeleteTimestamps(familyMap, byteNow);
 
       if (writeToWAL) {
         // write/sync to WAL should happen before we touch memstore.
@@ -2311,11 +2326,16 @@ public class HRegion implements HeapSize
    * new entries.
    */
   private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap) {
+	  return applyFamilyMapToMemstore(familyMap, null);
+  }
+  
+  private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap,
+		  MultiVersionConsistencyControl.WriteEntry writeEntryToUse) {
     long start = EnvironmentEdgeManager.currentTimeMillis();
     MultiVersionConsistencyControl.WriteEntry w = null;
     long size = 0;
     try {
-      w = mvcc.beginMemstoreInsert();
+	  w = (writeEntryToUse == null)? mvcc.beginMemstoreInsert(): writeEntryToUse;
 
       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
         byte[] family = e.getKey();
@@ -2328,12 +2348,15 @@ public class HRegion implements HeapSize
         }
       }
     } finally {
-      long now = EnvironmentEdgeManager.currentTimeMillis();
-      HRegion.memstoreInsertTime.addAndGet(now - start);
-      start = now;
-      mvcc.completeMemstoreInsert(w);
-      now = EnvironmentEdgeManager.currentTimeMillis();
-      HRegion.mvccWaitTime.addAndGet(now - start);
+      if (writeEntryToUse == null) {
+	      long now = EnvironmentEdgeManager.currentTimeMillis();
+	      HRegion.memstoreInsertTime.addAndGet(now - start);
+	      start = now;
+	      mvcc.completeMemstoreInsert(w);
+	      now = EnvironmentEdgeManager.currentTimeMillis();
+	      HRegion.mvccWaitTime.addAndGet(now - start);
+      }
+      // else the calling function will take care of the mvcc completion and metrics.
     }
 
     return size;
@@ -3655,6 +3678,92 @@ public class HRegion implements HeapSize
     return results;
   }
 
+
+  public void mutateRow(RowMutations rm) throws IOException {
+    boolean flush = false;
+
+    Integer lid = null;
+    
+    splitsAndClosesLock.readLock().lock();
+    try {
+      // 1. run all pre-hooks before the atomic operation
+      // NOT required for 0.89
+
+      // one WALEdit is used for all edits.
+      WALEdit walEdit = new WALEdit();
+
+      // 2. acquire the row lock
+      lid = getLock(null, rm.getRow(), true);
+
+      // 3. acquire the region lock
+      this.updatesLock.readLock().lock();
+
+      // 4. Get a mvcc write number
+      MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
+
+      long now = EnvironmentEdgeManager.currentTimeMillis();
+      byte[] byteNow = Bytes.toBytes(now);
+      try {
+        // 5. Check mutations and apply edits to a single WALEdit
+        for (Mutation m : rm.getMutations()) {
+          if (m instanceof Put) {
+            Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
+            checkFamilies(familyMap.keySet());
+            checkTimestamps(familyMap, now);
+            updateKVTimestamps(familyMap.values(), byteNow);
+          } else if (m instanceof Delete) {
+            Delete d = (Delete) m;
+            prepareDelete(d);
+            Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
+            prepareDeleteTimestamps(familyMap, byteNow);
+          } else {
+            throw new DoNotRetryIOException(
+                "Action must be Put or Delete. But was: "
+                    + m.getClass().getName());
+          }
+          if (m.getWriteToWAL()) {
+            addFamilyMapToWALEdit(m.getFamilyMap(), walEdit);
+          }
+        }
+
+        // 6. append/sync all edits at once
+        // TODO: Do batching as in doMiniBatchPut
+        this.log.append(regionInfo, this.getTableDesc().getName(), walEdit, now);
+
+        // 7. apply to memstore
+        long addedSize = 0;
+        for (Mutation m : rm.getMutations()) {
+          addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
+        }
+        flush = isFlushSize(this.incMemoryUsage(addedSize));
+      } finally {
+        // 8. roll mvcc forward
+	      long start = now;
+	      now = EnvironmentEdgeManager.currentTimeMillis();
+	      HRegion.memstoreInsertTime.addAndGet(now - start);
+	
+	      mvcc.completeMemstoreInsert(w);
+	      now = EnvironmentEdgeManager.currentTimeMillis();
+	      HRegion.mvccWaitTime.addAndGet(now - start);
+
+        // 9. release region lock
+        this.updatesLock.readLock().unlock();
+      }
+      // 10. run all coprocessor post hooks, after region lock is released
+      // NOT required in 0.89. coprocessors are not supported.
+      
+    } finally {
+      if (lid != null) {
+        // 11. release the row lock
+        releaseRowLock(lid);
+      }
+      if (flush) {
+        // 12. Flush cache if needed. Do it outside update lock.
+        requestFlush();
+      }
+      splitsAndClosesLock.readLock().unlock();
+    }
+  }
   /**
    *
    * @param row
@@ -3979,4 +4088,5 @@ public class HRegion implements HeapSize
        if (bc != null) bc.shutdown();
      }
   }
+
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed May  2 20:52:03 2012
@@ -88,6 +88,10 @@ import org.apache.hadoop.hbase.Stoppable
 import org.apache.hadoop.hbase.UnknownRowLockException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.HMsg.Type;
+import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.MultiPut;
@@ -2039,6 +2043,35 @@ public class HRegionServer implements HR
   }
 
   @Override
+  public void mutateRow(byte[] regionName, List<RowMutations> armList)
+      throws IOException {
+    checkOpen();
+    if (regionName == null) {
+      throw new IOException("Invalid arguments to atomicMutation " +
+      "regionName is null");
+    }
+    requestCount.incrementAndGet();
+    try {
+      HRegion region = getRegion(regionName);
+      if (!region.getRegionInfo().isMetaTable()) {
+        this.cacheFlusher.reclaimMemStoreMemory();
+      }
+      for (RowMutations arm: armList) {
+        this.requestCount.incrementAndGet();
+        region.mutateRow(arm);
+      }
+    } catch (Throwable t) {
+      throw convertThrowableToIOE(cleanup(t));
+    }
+  }
+
+  @Override
+  public void mutateRow(byte[] regionName, RowMutations arm)
+      throws IOException {
+	  mutateRow(regionName, Collections.singletonList(arm));
+  }
+
+  @Override
   public boolean exists(byte [] regionName, Get get) throws IOException {
     checkOpen();
     requestCount.incrementAndGet();

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java Wed May  2 20:52:03 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HBaseConf
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -71,7 +72,7 @@ public class RemoteHTable implements HTa
   final long sleepTime;
 
   @SuppressWarnings("unchecked")
-  protected String buildRowSpec(final byte[] row, final Map familyMap, 
+  protected String buildRowSpec(final byte[] row, final Map familyMap,
       final long startTime, final long endTime, final int maxVersions) {
     StringBuffer sb = new StringBuffer();
     sb.append('/');
@@ -144,7 +145,7 @@ public class RemoteHTable implements HTa
         byte[][] split = KeyValue.parseColumn(cell.getColumn());
         byte[] column = split[0];
         byte[] qualifier = split.length > 1 ? split[1] : null;
-        kvs.add(new KeyValue(row.getKey(), column, qualifier, 
+        kvs.add(new KeyValue(row.getKey(), column, qualifier,
           cell.getTimestamp(), cell.getValue()));
       }
       results.add(new Result(kvs));
@@ -238,7 +239,7 @@ public class RemoteHTable implements HTa
         TableSchemaModel schema = new TableSchemaModel();
         schema.getObjectFromMessage(response.getBody());
         return schema.getTableDescriptor();
-      case 509: 
+      case 509:
         try {
           Thread.sleep(sleepTime);
         } catch (InterruptedException e) { }
@@ -306,7 +307,7 @@ public class RemoteHTable implements HTa
     sb.append('/');
     if (accessToken != null) {
       sb.append(accessToken);
-      sb.append('/');      
+      sb.append('/');
     }
     sb.append(Bytes.toStringBinary(name));
     sb.append('/');
@@ -364,7 +365,7 @@ public class RemoteHTable implements HTa
     sb.append('/');
     if (accessToken != null) {
       sb.append(accessToken);
-      sb.append('/');      
+      sb.append('/');
     }
     sb.append(Bytes.toStringBinary(name));
     sb.append("/$multiput"); // can be any nonexistent row
@@ -495,7 +496,7 @@ public class RemoteHTable implements HTa
       }
       return results[0];
     }
-    
+
     class Iter implements Iterator<Result> {
 
       Result cache;
@@ -529,7 +530,7 @@ public class RemoteHTable implements HTa
       public void remove() {
         throw new RuntimeException("remove() not supported");
       }
-      
+
     }
 
     @Override
@@ -602,4 +603,15 @@ public class RemoteHTable implements HTa
     throw new IOException("incrementColumnValue not supported");
   }
 
+
+  @Override
+  public void mutateRow(RowMutations arm) throws IOException {
+    throw new IOException("atomicMutation not supported");
+  }
+
+  @Override
+  public void mutateRow(List<RowMutations> armList)
+		throws IOException {
+    throw new IOException("atomicMutation not supported");
+  }
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java Wed May  2 20:52:03 2012
@@ -29,11 +29,13 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Ignore;
@@ -77,6 +79,137 @@ public class TestAcidGuarantees {
     conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024));
     util = new HBaseTestingUtility(conf);
   }
+  /**
+   * Thread that repeatedly applies one RowMutations (puts and deletes) to a random target row.
+   */
+  public static class RowMutationsWriter extends RepeatingTestThread {
+    Random rand = new Random();
+    byte data[] = new byte[10]; // NOTE(review): not referenced in this class - confirm it is needed
+    byte targetRows[][];
+    byte targetFamilies[][];
+    HTable table;
+    AtomicLong numWritten = new AtomicLong();
+
+    public RowMutationsWriter(TestContext ctx, byte targetRows[][],
+                           byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.targetRows = targetRows;
+      this.targetFamilies = targetFamilies;
+      table = new HTable(ctx.getConf(), TABLE_NAME);
+    }
+    
+    public void doAnAction() throws Exception {
+      // Pick a random row to write into
+      byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
+      RowMutations mutation = new RowMutations(targetRow);
+
+      long writeNumber = numWritten.incrementAndGet();
+      /*
+       * For each family, put a random value into one column and (100 - value)
+       * into another, and delete every other column. The two written columns
+       * are chosen by (i + writeNumber) % NUM_COLS_TO_CHECK; writeNumber is also
+       * passed to each Put/Delete (presumably as the timestamp - confirm).
+       */
+      for (byte[] family : targetFamilies) {
+	    long value = rand.nextInt();
+        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+          byte qualifier[] = Bytes.toBytes("col" + i);
+          if ((i + writeNumber) % NUM_COLS_TO_CHECK == 0) {
+		      Put p = new Put(targetRow);
+	          p.add(family, qualifier, writeNumber, Bytes.toBytes(value));
+	          mutation.add(p);
+          }
+          else if ((i + writeNumber) % NUM_COLS_TO_CHECK == 1) {
+		      Put p = new Put(targetRow);
+	          p.add(family, qualifier, writeNumber, Bytes.toBytes(100 - value));
+	          mutation.add(p);
+          }
+          else {
+		      Delete d = new Delete(targetRow);
+	          d.deleteColumns(family, qualifier, writeNumber);
+	          mutation.add(d);
+          }
+        }
+      }
+      table.mutateRow(mutation);
+    }
+  }
+
+  /**
+   * Thread that repeatedly Gets one row and fails if any family does not hold
+   * exactly two values summing to 100 (i.e. a partially applied RowMutations).
+   */
+  public static class RowMutationsGetReader extends RepeatingTestThread {
+    byte targetRow[];
+    byte targetFamilies[][];
+    HTable table;
+    int numVerified = 0;
+    AtomicLong numRead = new AtomicLong();
+    // set to true once a Get has returned a non-empty row
+    boolean gotResults = false;
+
+    public RowMutationsGetReader(TestContext ctx, byte targetRow[],
+                           byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.targetRow = targetRow;
+      this.targetFamilies = targetFamilies;
+      table = new HTable(ctx.getConf(), TABLE_NAME);
+    }
+
+    public void doAnAction() throws Exception {
+      Get g = new Get(targetRow);
+      Result res = table.get(g);
+      if (res.getRow() == null) {
+        // Trying to verify but we didn't find the row - the writing
+        // thread probably just hasn't started writing yet, so we can
+        // ignore this action -- only if we have never received values
+        // before this.
+        // Once the row has been seen, an empty read is a failure.
+        if (gotResults) {
+          gotFailure(res);
+        }
+        return;
+      }
+
+      gotResults = true;
+      for (byte[] family : targetFamilies) {
+        long sum = 0;
+        int countNonNull = 0;
+        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+          byte qualifier[] = Bytes.toBytes("col" + i);
+          byte thisValue[] = res.getValue(family, qualifier);
+          if (thisValue != null) {
+            countNonNull++;
+            sum += Bytes.toLong(thisValue);
+          }
+          numVerified++;
+        }
+        if (sum != 100 || countNonNull != 2) {
+          gotFailure(res);
+        }
+      }
+      numRead.getAndIncrement();
+    }
+
+    private void gotFailure(Result res) {
+      StringBuilder msg = new StringBuilder("Failed after ").append(numVerified).append("!");
+      msg.append("Expecting 2 KV each for " + targetFamilies.length
+          + " families. Summing up to 100.");
+      msg.append("Got:\n");
+      if (res.list() == null) {
+        msg.append("None");
+      } else {
+        for (KeyValue kv : res.list()) {
+          msg.append(kv.toString());
+          msg.append(" val= (long)");
+          msg.append(Bytes.toLong(kv.getValue()));
+          msg.append("\n");
+        }
+      }
+      // Always throw: an empty result is a failure too, not only a bad sum/count.
+      throw new RuntimeException(msg.toString());
+    }
+  }
 
   /**
    * Thread that does random full-row writes into a table.
@@ -291,6 +424,63 @@ public class TestAcidGuarantees {
     }
   }
 
+  
+  @Test
+  public void testMutations() throws Exception {
+    util.startMiniCluster(1);
+    try {
+	  long millisToRun = 5000;
+      int numWriters = 5;
+      int numGetters = 5;
+      int numUniqueRows = 1;
+      // a single row, so every writer and getter below targets the same row
+      createTableIfMissing();
+      TestContext ctx = new TestContext(util.getConfiguration());
+
+      byte rows[][] = new byte[numUniqueRows][];
+      for (int i = 0; i < numUniqueRows; i++) {
+        rows[i] = Bytes.toBytes("test_row_" + i);
+      }
+
+      List<RowMutationsWriter> writers = Lists.newArrayList();
+      for (int i = 0; i < numWriters; i++) {
+        RowMutationsWriter writer = new RowMutationsWriter(
+            ctx, rows, FAMILIES);
+        writers.add(writer);
+        ctx.addThread(writer);
+      }
+    
+      // Add a thread that repeatedly calls util.flush() while writers and getters run
+      ctx.addThread(new RepeatingTestThread(ctx) {
+        public void doAnAction() throws Exception {
+          util.flush();
+        }
+      });
+
+      List<RowMutationsGetReader> getters = Lists.newArrayList();
+      for (int i = 0; i < numGetters; i++) {
+        RowMutationsGetReader getter = new RowMutationsGetReader(
+            ctx, rows[i % numUniqueRows], FAMILIES);
+        getters.add(getter);
+        ctx.addThread(getter);
+      }
+
+      ctx.startThreads();
+      ctx.waitFor(millisToRun);
+      ctx.stop();
+
+      LOG.info("Finished test. Writers:");
+      for (RowMutationsWriter writer : writers) {
+        LOG.info("  wrote " + writer.numWritten.get());
+      }
+      LOG.info("Readers:");
+      for (RowMutationsGetReader reader : getters) {
+        LOG.info("  read " + reader.numRead.get());
+      }
+    } finally {
+      util.shutdownMiniCluster();
+    }
+  }
   @Test
   public void testGetAtomicity() throws Exception {
     util.startMiniCluster(1);

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Wed May  2 20:52:03 2012
@@ -33,6 +33,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -4290,5 +4291,33 @@ public class TestFromClientSide {
     assertTrue(addrAfter.getPort() != addrCache.getPort());
     assertEquals(addrAfter.getPort(), addrNoCache.getPort());
   }
+
+  @Test
+  public void testAtomicRowMutation() throws Exception {
+    LOG.info("Starting testAtomicRowMutation");
+    final byte [] TABLENAME = Bytes.toBytes("testAtomicRowMutation");
+    HTable t = TEST_UTIL.createTable(TABLENAME, FAMILY);
+    byte [][] QUALIFIERS = new byte [][] {
+        Bytes.toBytes("a"), Bytes.toBytes("b")
+    };
+    RowMutations arm = new RowMutations(ROW);
+//    arm.add(new Delete(ROW));
+    Put p = new Put(ROW);
+    p.add(FAMILY, QUALIFIERS[0], VALUE);
+    arm.add(p);
+    t.mutateRow(arm);
+
+    Get g = new Get(ROW);
+    Result r = t.get(g);
+    // the leading Delete is commented out above, so only the Put ran; row should exist
+    assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
+
+    arm = new RowMutations(ROW);
+    arm.add(p);
+    arm.add(new Delete(ROW));
+    t.mutateRow(Arrays.asList((RowMutations)arm));
+    r = t.get(g);
+    assertTrue(r.isEmpty());
+  }
 }
 

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestOperation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestOperation.java?rev=1333196&r1=1333195&r2=1333196&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestOperation.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestOperation.java Wed May  2 20:52:03 2012
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.ArrayList;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -363,6 +364,25 @@ public class TestOperation {
     kvMap = (Map) familyInfo.get(0);
     assertEquals("Qualifier incorrect in Delete.toJSON()",
         Bytes.toStringBinary(QUALIFIER), kvMap.get("qualifier"));
+    
+    // produce a RowMutations operation
+    RowMutations rmut = new RowMutations(ROW);
+    rmut.add(delete);
+    rmut.add(put);
+    // get its JSON representation, and parse it
+    json = rmut.toJSON();
+    parsedJSON = mapper.readValue(json, HashMap.class);
+    // check for the row
+    assertEquals("row absent in RowMutations.toJSON()",
+        Bytes.toStringBinary(ROW), parsedJSON.get("row"));
+    // check the number of mutations and the per-type (put/delete) counts.
+    List<Map<String, Object>> mutationDetails = ((ArrayList)parsedJSON.get("mutations"));
+    assertEquals("Expecting to find one delete and one put",
+        2, mutationDetails.size());
+    assertEquals("Should find one Put",
+        1, parsedJSON.get("num-put"));
+    assertEquals("Should find one Delete",
+        1, parsedJSON.get("num-delete"));
   }
 
   /**