Posted to commits@hbase.apache.org by la...@apache.org on 2011/10/12 23:42:21 UTC

svn commit: r1182588 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/coprocessor/ src/main/java/org/apache/hadoop/hbase/io/ src/main/java/org/apache/hadoop...

Author: larsh
Date: Wed Oct 12 21:42:20 2011
New Revision: 1182588

URL: http://svn.apache.org/viewvc?rev=1182588&view=rev
Log:
HBASE-4102 atomicAppend: A put that appends to the latest version of a cell

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Append.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
Removed:
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Put.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1182588&r1=1182587&r2=1182588&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Oct 12 21:42:20 2011
@@ -14,6 +14,7 @@ Release 0.93.0 - Unreleased
    HBASE-4465  Lazy-seek optimization for StoreFile scanners (mikhail/liyin)
    HBASE-4422  Move block cache parameters and references into single
                CacheConf class (jgray)
+   HBASE-4102  atomicAppend: A put that appends to the latest version of a cell (Lars H)
 
   BUG FIXES
    HBASE-4488  Store could miss rows during flush (Lars H via jgray)

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1182588&r1=1182587&r2=1182588&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java Wed Oct 12 21:42:20 2011
@@ -406,6 +406,82 @@ public class KeyValue implements Writabl
   }
 
   /**
+   * Constructs an empty KeyValue structure with the specified sizes.
+   * This can be used to partially fill up KeyValues.
+   * <p>
+   * Column is split into two fields, family and qualifier.
+   * @param rlength row length
+   * @param flength family length
+   * @param qlength qualifier length
+   * @param timestamp version timestamp
+   * @param type key type
+   * @param vlength value length
+   * @throws IllegalArgumentException if any length exceeds what the KeyValue format can encode
+   */
+  public KeyValue(final int rlength,
+      final int flength,
+      final int qlength,
+      final long timestamp, final Type type,
+      final int vlength) {
+    this.bytes = createEmptyByteArray(rlength,
+        flength, qlength,
+        timestamp, type, vlength);
+    this.length = bytes.length;
+    this.offset = 0;
+  }
+
+  /**
+   * Create an empty byte[] representing a KeyValue.
+   * All lengths are preset and can be filled in later.
+   * @param rlength row length
+   * @param flength family length
+   * @param qlength qualifier length
+   * @param timestamp version timestamp
+   * @param type key type
+   * @param vlength value length
+   * @return The newly created byte array.
+   */
+  static byte[] createEmptyByteArray(final int rlength, int flength,
+      int qlength, final long timestamp, final Type type, int vlength) {
+    if (rlength > Short.MAX_VALUE) {
+      throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
+    }
+    if (flength > Byte.MAX_VALUE) {
+      throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
+    }
+    // Qualifier length
+    if (qlength > Integer.MAX_VALUE - rlength - flength) {
+      throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
+    }
+    // Key length
+    long longkeylength = KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
+    if (longkeylength > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException("keylength " + longkeylength + " > " +
+        Integer.MAX_VALUE);
+    }
+    int keylength = (int)longkeylength;
+    // Value length
+    if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
+      throw new IllegalArgumentException("Valuer > " +
+          HConstants.MAXIMUM_VALUE_LENGTH);
+    }
+
+    // Allocate right-sized byte array.
+    byte [] bytes = new byte[KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength];
+    // Write the correct size markers
+    int pos = 0;
+    pos = Bytes.putInt(bytes, pos, keylength);
+    pos = Bytes.putInt(bytes, pos, vlength);
+    pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
+    pos += rlength;
+    pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
+    pos += flength + qlength;
+    pos = Bytes.putLong(bytes, pos, timestamp);
+    pos = Bytes.putByte(bytes, pos, type.getCode());
+    return bytes;
+  }
+
+  /**
    * Write KeyValue format into a byte array.
    *
    * @param row row key

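A minimal sketch of how the new preallocating constructor is intended to be used: the length markers, timestamp, and type byte are written up front, and the caller then copies each field into the backing array exactly once. The field values below are hypothetical; the offset accessors (getRowOffset() and friends) are the same ones HRegion.append uses further down in this commit.

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class PreallocatedKeyValueSketch {
  public static void main(String[] args) {
    byte[] row = Bytes.toBytes("row1");
    byte[] family = Bytes.toBytes("f");
    byte[] qualifier = Bytes.toBytes("q");
    byte[] value = Bytes.toBytes("hello");

    // Shell KeyValue: sizes, timestamp and type are set; field bytes are zeroed.
    KeyValue kv = new KeyValue(row.length, family.length, qualifier.length,
        System.currentTimeMillis(), KeyValue.Type.Put, value.length);

    // Fill each field directly into the backing array, one copy per byte.
    System.arraycopy(row, 0, kv.getBuffer(), kv.getRowOffset(), row.length);
    System.arraycopy(family, 0, kv.getBuffer(), kv.getFamilyOffset(), family.length);
    System.arraycopy(qualifier, 0, kv.getBuffer(), kv.getQualifierOffset(),
        qualifier.length);
    System.arraycopy(value, 0, kv.getBuffer(), kv.getValueOffset(), value.length);
  }
}
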
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Append.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Append.java?rev=1182588&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Append.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Append.java Wed Oct 12 21:42:20 2011
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Performs Append operations on a single row.
+ * <p>
+ * Note that this operation does not appear atomic to readers. Appends are done
+ * under a single row lock, so write operations to a row are synchronized, but
+ * readers do not take row locks so get and scan operations can see this
+ * operation partially completed.
+ * <p>
+ * To append to a set of columns of a row, instantiate an Append object with the
+ * row to append to. At least one column to append must be specified using the
+ * {@link #add(byte[], byte[], long)} method.
+ */
+public class Append extends Mutation implements Writable {
+  // TODO: refactor to derive from Put?
+  private static final String RETURN_RESULTS = "_rr_";
+  private static final byte APPEND_VERSION = (byte)1;
+
+  /**
+   * @param returnResults
+   *          True (default) if the append operation should return the results.
+   *          A client that is not interested in the result can save network
+   *          bandwidth by setting this to false.
+   */
+  public void setReturnResults(boolean returnResults) {
+    setAttribute(RETURN_RESULTS, Bytes.toBytes(returnResults));
+  }
+
+  /**
+   * @return current setting for returnResults
+   */
+  public boolean isReturnResults() {
+    byte[] v = getAttribute(RETURN_RESULTS);
+    return v == null ? true : Bytes.toBoolean(v);
+  }
+
+  /** Constructor for Writable.  DO NOT USE */
+  public Append() {}
+
+  /**
+   * Create an Append operation for the specified row.
+   * <p>
+   * At least one column must be appended to.
+   * @param row row key
+   */
+  public Append(byte[] row) {
+    this.row = Arrays.copyOf(row, row.length);
+  }
+
+  /**
+   * Add the specified column and value to this Append operation.
+   * @param family family name
+   * @param qualifier column qualifier
+   * @param value value to append to specified column
+   * @return this
+   */
+  public Append add(byte [] family, byte [] qualifier, byte [] value) {
+    List<KeyValue> list = familyMap.get(family);
+    if(list == null) {
+      list = new ArrayList<KeyValue>();
+    }
+    list.add(new KeyValue(
+        this.row, family, qualifier, this.ts, KeyValue.Type.Put, value));
+    familyMap.put(family, list);
+    return this;
+  }
+
+  @Override
+  public void readFields(final DataInput in)
+  throws IOException {
+    int version = in.readByte();
+    if (version > APPEND_VERSION) {
+      throw new IOException("version not supported: "+version);
+    }
+    this.row = Bytes.readByteArray(in);
+    this.ts = in.readLong();
+    this.lockId = in.readLong();
+    this.writeToWAL = in.readBoolean();
+    int numFamilies = in.readInt();
+    if (!this.familyMap.isEmpty()) this.familyMap.clear();
+    for(int i=0;i<numFamilies;i++) {
+      byte [] family = Bytes.readByteArray(in);
+      int numKeys = in.readInt();
+      List<KeyValue> keys = new ArrayList<KeyValue>(numKeys);
+      int totalLen = in.readInt();
+      byte [] buf = new byte[totalLen];
+      int offset = 0;
+      for (int j = 0; j < numKeys; j++) {
+        int keyLength = in.readInt();
+        in.readFully(buf, offset, keyLength);
+        keys.add(new KeyValue(buf, offset, keyLength));
+        offset += keyLength;
+      }
+      this.familyMap.put(family, keys);
+    }
+    readAttributes(in);
+  }
+
+  @Override
+  public void write(final DataOutput out)
+  throws IOException {
+    out.writeByte(APPEND_VERSION);
+    Bytes.writeByteArray(out, this.row);
+    out.writeLong(this.ts);
+    out.writeLong(this.lockId);
+    out.writeBoolean(this.writeToWAL);
+    out.writeInt(familyMap.size());
+    for (Map.Entry<byte [], List<KeyValue>> entry : familyMap.entrySet()) {
+      Bytes.writeByteArray(out, entry.getKey());
+      List<KeyValue> keys = entry.getValue();
+      out.writeInt(keys.size());
+      int totalLen = 0;
+      for(KeyValue kv : keys) {
+        totalLen += kv.getLength();
+      }
+      out.writeInt(totalLen);
+      for(KeyValue kv : keys) {
+        out.writeInt(kv.getLength());
+        out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
+      }
+    }
+    writeAttributes(out);
+  }
+}

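For reference, a minimal client-side sketch of the new operation (the table name, family, and qualifier are hypothetical, and a reachable cluster is assumed):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class AppendClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "t");              // hypothetical table
    byte[] fam = Bytes.toBytes("f");                   // hypothetical family
    byte[] qual = Bytes.toBytes("q");

    Append append = new Append(Bytes.toBytes("row1"));
    append.add(fam, qual, Bytes.toBytes("world"));
    // By default the server ships back the post-append cell values.
    Result r = table.append(append);
    System.out.println(Bytes.toString(r.getValue(fam, qual)));

    // A client that does not need the result can skip the response payload.
    Append quiet = new Append(Bytes.toBytes("row1"));
    quiet.add(fam, qual, Bytes.toBytes("!"));
    quiet.setReturnResults(false);
    Result none = table.append(quiet);                 // null when returnResults=false
    table.close();
  }
}

Like Increment, appends to a row are synchronized with other writers by the row lock, but a concurrent Get or Scan may observe the operation partially applied.
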
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1182588&r1=1182587&r2=1182588&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Oct 12 21:42:20 2011
@@ -740,6 +740,25 @@ public class HTable implements HTableInt
    * {@inheritDoc}
    */
   @Override
+  public Result append(final Append append) throws IOException {
+    if (append.numFamilies() == 0) {
+      throw new IOException(
+          "Invalid arguments to append, no columns specified");
+    }
+    return connection.getRegionServerWithRetries(
+        new ServerCallable<Result>(connection, tableName, append.getRow(), operationTimeout) {
+          public Result call() throws IOException {
+            return server.append(
+                location.getRegionInfo().getRegionName(), append);
+          }
+        }
+    );    
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
   public Result increment(final Increment increment) throws IOException {
     if (!increment.hasFamilies()) {
       throw new IOException(

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1182588&r1=1182587&r2=1182588&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Wed Oct 12 21:42:20 2011
@@ -262,6 +262,21 @@ public interface HTableInterface {
       byte[] value, Delete delete) throws IOException;
 
   /**
+   * Appends values to one or more columns within a single row.
+   * <p>
+   * This operation does not appear atomic to readers.  Appends are done
+   * under a single row lock, so write operations to a row are synchronized, but
+   * readers do not take row locks so get and scan operations can see this
+   * operation partially completed.
+   *
+   * @param append object that specifies the columns and values to be
+   *                  appended to the row
+   * @throws IOException e
+   * @return values of columns after the append operation (may be null)
+   */
+  public Result append(final Append append) throws IOException;
+
+  /**
    * Increments one or more columns within a single row.
    * <p>
    * This operation does not appear atomic to readers.  Increments are done

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Mutation.java?rev=1182588&r1=1182587&r2=1182588&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Mutation.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Mutation.java Wed Oct 12 21:42:20 2011
@@ -203,4 +203,22 @@ public abstract class Mutation extends O
     }
     return new UUID(Bytes.toLong(attr,0), Bytes.toLong(attr, Bytes.SIZEOF_LONG));
   }
+
+  /**
+   * @return the total number of KeyValues
+   */
+  public int size() {
+    int size = 0;
+    for(List<KeyValue> kvList : this.familyMap.values()) {
+      size += kvList.size();
+    }
+    return size;
+  }
+
+  /**
+   * @return the number of different families
+   */
+  public int numFamilies() {
+    return familyMap.size();
+  }
 }

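These helpers move up from Put so that any Mutation, including the new Append, can report its cell counts; HTable.append's argument check above uses numFamilies(). A trivial sketch:

import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.util.Bytes;

public class MutationCountsSketch {
  public static void main(String[] args) {
    Append a = new Append(Bytes.toBytes("row1"));
    a.add(Bytes.toBytes("f"), Bytes.toBytes("q1"), Bytes.toBytes("x"));
    a.add(Bytes.toBytes("f"), Bytes.toBytes("q2"), Bytes.toBytes("y"));
    System.out.println(a.numFamilies()); // 1: one distinct family
    System.out.println(a.size());        // 2: two KeyValues in total
  }
}
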
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Put.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Put.java?rev=1182588&r1=1182587&r2=1182588&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Put.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Put.java Wed Oct 12 21:42:20 2011
@@ -318,24 +318,6 @@ public class Put extends Mutation
     return list;
   }
 
-  /**
-   * @return the number of different families included in this put
-   */
-  public int numFamilies() {
-    return familyMap.size();
-  }
-
-  /**
-   * @return the total number of KeyValues that will be added with this put
-   */
-  public int size() {
-    int size = 0;
-    for(List<KeyValue> kvList : this.familyMap.values()) {
-      size += kvList.size();
-    }
-    return size;
-  }
-
   //HeapSize
   public long heapSize() {
     long heapsize = OVERHEAD;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1182588&r1=1182587&r2=1182588&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java Wed Oct 12 21:42:20 2011
@@ -23,6 +23,7 @@ import com.google.common.collect.Immutab
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -185,6 +186,16 @@ public abstract class BaseRegionObserver
   }
 
   @Override
+  public void preAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
+      final Append append, final Result result) throws IOException {
+  }
+
+  @Override
+  public void postAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
+      final Append append, final Result result) throws IOException {
+  }
+
+  @Override
   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> e,
       final byte [] row, final byte [] family, final byte [] qualifier,
       final long amount, final boolean writeToWAL) throws IOException {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1182588&r1=1182587&r2=1182588&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Wed Oct 12 21:42:20 2011
@@ -394,6 +394,11 @@ public abstract class CoprocessorHost<E 
       }
 
       @Override
+      public Result append(Append append) throws IOException {
+        return table.append(append);
+      }
+
+      @Override
       public Result increment(Increment increment) throws IOException {
         return table.increment(increment);
       }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1182588&r1=1182587&r2=1182588&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Wed Oct 12 21:42:20 2011
@@ -23,6 +23,7 @@ import com.google.common.collect.Immutab
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -458,6 +459,38 @@ public interface RegionObserver extends 
     throws IOException;
 
   /**
+   * Called before Append
+   * <p>
+   * Call CoprocessorEnvironment#bypass to skip default actions
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
+   * @param c the environment provided by the region server
+   * @param append Append object
+   * @param result The result to return to the client if default processing
+   * is bypassed. Can be modified. Will not be used if default processing
+   * is not bypassed.
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  void preAppend(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Append append, final Result result)
+    throws IOException;
+
+  /**
+   * Called after Append
+   * <p>
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
+   * @param c the environment provided by the region server
+   * @param append Append object
+   * @param result the result returned by append, can be modified
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  void postAppend(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Append append, final Result result)
+    throws IOException;
+
+  /**
    * Called before Increment
    * <p>
    * Call CoprocessorEnvironment#bypass to skip default actions

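A coprocessor can hook the new operation by overriding these methods on BaseRegionObserver. Below is a minimal sketch of an observer that refuses appends to a reserved family by bypassing default processing (the family name is hypothetical); when bypassed, the (possibly modified) result argument is what the client receives:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;

public class ReservedFamilyObserver extends BaseRegionObserver {
  private static final byte[] RESERVED = Bytes.toBytes("sys"); // hypothetical

  @Override
  public void preAppend(ObserverContext<RegionCoprocessorEnvironment> e,
      Append append, Result result) throws IOException {
    if (append.getFamilyMap().containsKey(RESERVED)) {
      // Skip default processing; the (here empty) result is returned
      // to the client instead of performing the append.
      e.bypass();
    }
  }

  @Override
  public void postAppend(ObserverContext<RegionCoprocessorEnvironment> e,
      Append append, Result result) throws IOException {
    // Inspect or rewrite the cells the append produced before they are
    // returned to the client.
  }
}
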
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1182588&r1=1182587&r2=1182588&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Wed Oct 12 21:42:20 2011
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.HServerIn
 import org.apache.hadoop.hbase.HServerLoad;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -237,7 +238,9 @@ public class HbaseObjectWritable impleme
     addToMap(HServerLoad.class, code++);
     
     addToMap(RegionOpeningState.class, code++);
-    
+
+    addToMap(Append.class, code++);
+
   }
 
   private Class<?> declaredClass;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1182588&r1=1182587&r2=1182588&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Wed Oct 12 21:42:20 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -211,6 +212,19 @@ public interface HRegionInterface extend
   throws IOException;
 
   /**
+   * Appends values to one or more columns in a row. Optionally
+   * returns the updated cells after the append.
+   * <p>
+   * This operation does not appear atomic to readers. Appends are done
+   * under a row lock but readers do not take row locks.
+   * @param regionName region name
+   * @param append Append operation
+   * @return changed cells (may be null)
+   */
+  public Result append(byte[] regionName, Append append)
+  throws IOException;
+
+  /**
    * Increments one or more columns values in a row.  Returns the
    * updated keys after the increment.
    * <p>

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1182588&r1=1182587&r2=1182588&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Oct 12 21:42:20 2011
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -3503,6 +3504,149 @@ public class HRegion implements HeapSize
     return results;
   }
 
+  // TODO: There's a lot of boilerplate code identical
+  // to increment... See how to better unify that.
+  /**
+   * Perform one or more append operations on a row.
+   * <p>
+   * Appends are done under a row lock, but reads do not take locks,
+   * so this can be seen partially complete by gets and scans.
+   *
+   * @param append the Append operation
+   * @param lockid row lock id, or null if a new lock should be acquired
+   * @param writeToWAL whether the edit should be written to the WAL
+   * @return new keyvalues after the append (null if returnResults is false)
+   * @throws IOException
+   */
+  public Result append(Append append, Integer lockid, boolean writeToWAL)
+      throws IOException {
+    // TODO: Use RWCC to make this set of appends atomic to reads
+    byte[] row = append.getRow();
+    checkRow(row, "append");
+    boolean flush = false;
+    WALEdit walEdits = null;
+    List<KeyValue> allKVs = new ArrayList<KeyValue>(append.size());
+    List<KeyValue> kvs = new ArrayList<KeyValue>(append.size());
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    long size = 0;
+    long txid = 0;
+
+    // Lock row
+    startRegionOperation();
+    this.writeRequestsCount.increment();
+    try {
+      Integer lid = getLock(lockid, row, true);
+      this.updatesLock.readLock().lock();
+      try {
+        // Process each family
+        for (Map.Entry<byte[], List<KeyValue>> family : append.getFamilyMap()
+            .entrySet()) {
+
+          Store store = stores.get(family.getKey());
+
+          // Get previous values for all columns in this family
+          Get get = new Get(row);
+          for (KeyValue kv : family.getValue()) {
+            get.addColumn(family.getKey(), kv.getQualifier());
+          }
+          List<KeyValue> results = get(get, false);
+
+          // Iterate the input columns and update existing values if they were
+          // found, otherwise add new column initialized to the append value
+
+          // Avoid as much copying as possible. Every byte is copied at most
+          // once.
+          // Would be nice if KeyValue had scatter/gather logic
+          int idx = 0;
+          for (KeyValue kv : family.getValue()) {
+            KeyValue newKV;
+            if (idx < results.size()
+                && results.get(idx).matchingQualifier(kv.getBuffer(),
+                    kv.getQualifierOffset(), kv.getQualifierLength())) {
+              KeyValue oldKv = results.get(idx);
+              // allocate an empty kv once
+              newKV = new KeyValue(row.length, kv.getFamilyLength(),
+                  kv.getQualifierLength(), now, KeyValue.Type.Put,
+                  oldKv.getValueLength() + kv.getValueLength());
+              // copy in the value
+              System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
+                  newKV.getBuffer(), newKV.getValueOffset(),
+                  oldKv.getValueLength());
+              System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
+                  newKV.getBuffer(),
+                  newKV.getValueOffset() + oldKv.getValueLength(),
+                  kv.getValueLength());
+              idx++;
+            } else {
+              // allocate an empty kv once
+              newKV = new KeyValue(row.length, kv.getFamilyLength(),
+                  kv.getQualifierLength(), now, KeyValue.Type.Put,
+                  kv.getValueLength());
+              // copy in the value
+              System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
+                  newKV.getBuffer(), newKV.getValueOffset(),
+                  kv.getValueLength());
+            }
+            // copy in row, family, and qualifier
+            System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
+                newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
+            System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
+                newKV.getBuffer(), newKV.getFamilyOffset(),
+                kv.getFamilyLength());
+            System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
+                newKV.getBuffer(), newKV.getQualifierOffset(),
+                kv.getQualifierLength());
+
+            kvs.add(newKV);
+
+            // Append update to WAL
+            if (writeToWAL) {
+              if (walEdits == null) {
+                walEdits = new WALEdit();
+              }
+              walEdits.add(newKV);
+            }
+          }
+
+          // Write the KVs for this family into the store
+          size += store.upsert(kvs);
+          allKVs.addAll(kvs);
+          kvs.clear();
+        }
+
+        // Actually write to WAL now
+        if (writeToWAL) {
+          // Using default cluster id, as this can only happen in the originating
+          // cluster. A slave cluster receives the final value (not the delta)
+          // as a Put.
+          txid = this.log.appendNoSync(regionInfo,
+              this.htableDescriptor.getName(), walEdits,
+              HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
+        }
+
+        size = this.addAndGetGlobalMemstoreSize(size);
+        flush = isFlushSize(size);
+      } finally {
+        this.updatesLock.readLock().unlock();
+        releaseRowLock(lid);
+      }
+      if (writeToWAL) {
+        this.log.sync(txid); // sync the transaction log outside the rowlock
+      }
+    } finally {
+      closeRegionOperation();
+    }
+
+    if (flush) {
+      // Request a cache flush. Do it outside update lock.
+      requestFlush();
+    }
+
+    return append.isReturnResults() ? new Result(allKVs) : null;
+  }
+
   /**
    *
    * Perform one or more increment operations on a row.

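The net effect of the logic above, in miniature: the appended bytes are concatenated onto the current cell value (or become the value outright if the cell is absent), the result is written at a fresh timestamp, and Store.upsert() replaces the latest memstore version rather than stacking a new one. A trivial sketch of the value arithmetic only:

import org.apache.hadoop.hbase.util.Bytes;

public class AppendValueSketch {
  public static void main(String[] args) {
    byte[] oldValue = Bytes.toBytes("abc");   // current cell value
    byte[] delta = Bytes.toBytes("def");      // value carried by the Append
    // HRegion.append copies both into one preallocated KeyValue; the
    // concatenation below is the equivalent result.
    byte[] newValue = Bytes.add(oldValue, delta);
    System.out.println(Bytes.toString(newValue)); // prints "abcdef"
  }
}
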
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1182588&r1=1182587&r2=1182588&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Oct 12 21:42:20 2011
@@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.catalog.C
 import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.catalog.RootLocationEditor;
 import org.apache.hadoop.hbase.client.Action;
+import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -124,7 +125,6 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CompressionTest;
@@ -2833,6 +2833,37 @@ public class HRegionServer implements HR
   }
 
   @Override
+  public Result append(byte[] regionName, Append append)
+  throws IOException {
+    checkOpen();
+    if (regionName == null) {
+      throw new IOException("Invalid arguments to increment " +
+      "regionName is null");
+    }
+    requestCount.incrementAndGet();
+    try {
+      HRegion region = getRegion(regionName);
+      Integer lock = getLockFromId(append.getLockId());
+      Append appVal = append;
+      Result resVal;
+      if (region.getCoprocessorHost() != null) {
+        resVal = region.getCoprocessorHost().preAppend(appVal);
+        if (resVal != null) {
+          return resVal;
+        }
+      }
+      resVal = region.append(appVal, lock, append.getWriteToWAL());
+      if (region.getCoprocessorHost() != null) {
+        region.getCoprocessorHost().postAppend(appVal, resVal);
+      }
+      return resVal;
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  @Override
   public Result increment(byte[] regionName, Increment increment)
   throws IOException {
     checkOpen();

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1182588&r1=1182587&r2=1182588&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Wed Oct 12 21:42:20 2011
@@ -947,6 +947,34 @@ public class RegionCoprocessorHost
   }
 
   /**
+   * @param append append object
+   * @return result to return to client if default operation should be
+   * bypassed, null otherwise
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  public Result preAppend(Append append)
+      throws IOException {
+    boolean bypass = false;
+    Result result = new Result();
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env: coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        try {
+          ((RegionObserver)env.getInstance()).preAppend(ctx, append, result);
+        } catch (Throwable e) {
+          handleCoprocessorThrowable(env, e);
+        }
+        bypass |= ctx.shouldBypass();
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+    return bypass ? result : null;
+  }
+
+  /**
    * @param increment increment object
    * @return result to return to client if default operation should be
    * bypassed, null otherwise
@@ -975,6 +1003,29 @@ public class RegionCoprocessorHost
   }
 
   /**
+   * @param append Append object
+   * @param result the result returned by the append operation, can be modified
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  public void postAppend(final Append append, Result result)
+      throws IOException {
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env: coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        try {
+          ((RegionObserver)env.getInstance()).postAppend(ctx, append, result);
+        } catch (Throwable e) {
+          handleCoprocessorThrowable(env, e);
+        }
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+  }
+
+  /**
    * @param increment increment object
    * @param result the result returned by postIncrement
    * @throws IOException if an error occurred on the coprocessor

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java?rev=1182588&r1=1182587&r2=1182588&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java Wed Oct 12 21:42:20 2011
@@ -596,6 +596,10 @@ public class RemoteHTable implements HTa
     throw new IOException("Increment not supported");
   }
 
+  public Result append(Append append) throws IOException {
+    throw new IOException("Append not supported");
+  }
+
   public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
       long amount) throws IOException {
     throw new IOException("incrementColumnValue not supported");

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1182588&r1=1182587&r2=1182588&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Wed Oct 12 21:42:20 2011
@@ -3913,6 +3913,29 @@ public class TestFromClientSide {
     assertTrue(scan.getFamilyMap().containsKey(FAMILY));
   }
 
+  @Test
+  public void testAppend() throws Exception {
+    LOG.info("Starting testAppend");
+    final byte [] TABLENAME = Bytes.toBytes("testAppend");
+    HTable t = TEST_UTIL.createTable(TABLENAME, FAMILY);
+    byte[] v1 = Bytes.toBytes("42");
+    byte[] v2 = Bytes.toBytes("23");
+    byte [][] QUALIFIERS = new byte [][] {
+        Bytes.toBytes("a"), Bytes.toBytes("b")
+    };
+    Append a = new Append(ROW);
+    a.add(FAMILY, QUALIFIERS[0], v1);
+    a.add(FAMILY, QUALIFIERS[1], v2);
+    a.setReturnResults(false);
+    assertNullResult(t.append(a));
+
+    a = new Append(ROW);
+    a.add(FAMILY, QUALIFIERS[0], v2);
+    a.add(FAMILY, QUALIFIERS[1], v1);
+    Result r = t.append(a);
+    assertEquals(0, Bytes.compareTo(Bytes.add(v1,v2), r.getValue(FAMILY, QUALIFIERS[0])));
+    assertEquals(0, Bytes.compareTo(Bytes.add(v2,v1), r.getValue(FAMILY, QUALIFIERS[1])));
+  }
  
   @Test
   public void testIncrement() throws Exception {

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java?rev=1182588&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java Wed Oct 12 21:42:20 2011
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+
+
+/**
+ * Testing of HRegion.incrementColumnValue, HRegion.increment,
+ * and HRegion.append
+ */
+public class TestAtomicOperation extends HBaseTestCase {
+  static final Log LOG = LogFactory.getLog(TestAtomicOperation.class);
+
+  HRegion region = null;
+  private final String DIR = HBaseTestingUtility.getTestDir() +
+    "/TestIncrement/";
+
+  private final int MAX_VERSIONS = 2;
+
+  // Test names
+  static final byte[] tableName = Bytes.toBytes("testtable");
+  static final byte[] qual1 = Bytes.toBytes("qual1");
+  static final byte[] qual2 = Bytes.toBytes("qual2");
+  static final byte[] qual3 = Bytes.toBytes("qual3");
+  static final byte[] value1 = Bytes.toBytes("value1");
+  static final byte[] value2 = Bytes.toBytes("value2");
+  static final byte [] row = Bytes.toBytes("rowA");
+  static final byte [] row2 = Bytes.toBytes("rowB");
+
+  /**
+   * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
+   */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    EnvironmentEdgeManagerTestHelper.reset();
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // New tests that don't spin up a mini cluster but rather just test the
+  // individual code pieces in the HRegion.
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Test basic append operation.
+   * More tests in
+   * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()
+   */
+  public void testAppend() throws IOException {
+    initHRegion(tableName, getName(), fam1);
+    String v1 = "Ultimate Answer to the Ultimate Question of Life,"+
+    " The Universe, and Everything";
+    String v2 = " is... 42.";
+    Append a = new Append(row);
+    a.setReturnResults(false);
+    a.add(fam1, qual1, Bytes.toBytes(v1));
+    a.add(fam1, qual2, Bytes.toBytes(v2));
+    assertNull(region.append(a, null, true));
+    a = new Append(row);
+    a.add(fam1, qual1, Bytes.toBytes(v2));
+    a.add(fam1, qual2, Bytes.toBytes(v1));
+    Result result = region.append(a, null, true);
+    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1+v2), result.getValue(fam1, qual1)));
+    assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2+v1), result.getValue(fam1, qual2)));
+  }
+
+  /**
+   * Test one increment command.
+   */
+  public void testIncrementColumnValue() throws IOException {
+    LOG.info("Starting test testIncrementColumnValue");
+    initHRegion(tableName, getName(), fam1);
+
+    long value = 1L;
+    long amount = 3L;
+
+    Put put = new Put(row);
+    put.add(fam1, qual1, Bytes.toBytes(value));
+    region.put(put);
+
+    long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
+
+    assertEquals(value+amount, result);
+
+    Store store = region.getStore(fam1);
+    // ICV removes any extra values floating around in there.
+    assertEquals(1, store.memstore.kvset.size());
+    assertTrue(store.memstore.snapshot.isEmpty());
+
+    assertICV(row, fam1, qual1, value+amount);
+  }
+
+  /**
+   * Test multi-threaded increments.
+   */
+  public void testIncrementMultiThreads() throws IOException {
+
+    LOG.info("Starting test testIncrementMultiThreads");
+    initHRegion(tableName, getName(), fam1);
+
+    // create 100 threads, each will increment by its own quantity
+    int numThreads = 100;
+    int incrementsPerThread = 1000;
+    Incrementer[] all = new Incrementer[numThreads];
+    int expectedTotal = 0;
+
+    // create all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i] = new Incrementer(region, i, i, incrementsPerThread);
+      expectedTotal += (i * incrementsPerThread);
+    }
+
+    // run all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i].start();
+    }
+
+    // wait for all threads to finish
+    for (int i = 0; i < numThreads; i++) {
+      try {
+        all[i].join();
+      } catch (InterruptedException e) {
+      }
+    }
+    assertICV(row, fam1, qual1, expectedTotal);
+    LOG.info("testIncrementMultiThreads successfully verified that total is " +
+             expectedTotal);
+  }
+
+
+  private void assertICV(byte [] row,
+                         byte [] familiy,
+                         byte[] qualifier,
+                         long amount) throws IOException {
+    // run a get and see?
+    Get get = new Get(row);
+    get.addColumn(familiy, qualifier);
+    Result result = region.get(get, null);
+    assertEquals(1, result.size());
+
+    KeyValue kv = result.raw()[0];
+    long r = Bytes.toLong(kv.getValue());
+    assertEquals(amount, r);
+  }
+
+  private void initHRegion (byte [] tableName, String callingMethod,
+    byte[] ... families)
+  throws IOException {
+    initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families);
+  }
+
+  private void initHRegion (byte [] tableName, String callingMethod,
+    Configuration conf, byte [] ... families)
+  throws IOException{
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for(byte [] family : families) {
+      htd.addFamily(new HColumnDescriptor(family));
+    }
+    HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
+    Path path = new Path(DIR + callingMethod);
+    if (fs.exists(path)) {
+      if (!fs.delete(path, true)) {
+        throw new IOException("Failed delete of " + path);
+      }
+    }
+    region = HRegion.createHRegion(info, path, conf, htd);
+  }
+
+  /**
+   * A thread that makes a few increment calls
+   */
+  public static class Incrementer extends Thread {
+
+    private final HRegion region;
+    private final int threadNumber;
+    private final int numIncrements;
+    private final int amount;
+
+    private int count;
+
+    public Incrementer(HRegion region, 
+        int threadNumber, int amount, int numIncrements) {
+      this.region = region;
+      this.threadNumber = threadNumber;
+      this.numIncrements = numIncrements;
+      this.count = 0;
+      this.amount = amount;
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      for (int i=0; i<numIncrements; i++) {
+        try {
+          long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
+          // LOG.info("thread:" + threadNumber + " iter:" + i);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+        count++;
+      }
+    }
+  }
+
+
+}