You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jg...@apache.org on 2010/10/26 20:50:14 UTC

svn commit: r1027681 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/io/ src/main/java/org/apache/hadoop/hbase/ipc/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/h...

Author: jgray
Date: Tue Oct 26 18:50:13 2010
New Revision: 1027681

URL: http://svn.apache.org/viewvc?rev=1027681&view=rev
Log:
HBASE-2946  Increment multiple columns in a row at once

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Increment.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Oct 26 18:50:13 2010
@@ -1096,6 +1096,8 @@ Release 0.21.0 - Unreleased
    HBASE-3053  Add ability to have multiple Masters LocalHBaseCluster for
                test writing
    HBASE-2201  JRuby shell for replication
+   HBASE-2946  Increment multiple columns in a row at once
+
 
   OPTIMIZATIONS
    HBASE-410   [testing] Speed up the test suite

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue Oct 26 18:50:13 2010
@@ -650,6 +650,22 @@ public class HTable implements HTableInt
   }
 
   @Override
+  public Result increment(final Increment increment) throws IOException {
+    if (!increment.hasFamilies()) {
+      throw new IOException(
+          "Invalid arguments to increment, no columns specified");
+    }
+    return connection.getRegionServerWithRetries(
+        new ServerCallable<Result>(connection, tableName, increment.getRow()) {
+          public Result call() throws IOException {
+            return server.increment(
+                location.getRegionInfo().getRegionName(), increment);
+          }
+        }
+    );
+  }
+
+  @Override
   public long incrementColumnValue(final byte [] row, final byte [] family,
       final byte [] qualifier, final long amount)
   throws IOException {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Tue Oct 26 18:50:13 2010
@@ -19,13 +19,13 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 
-import java.io.IOException;
-import java.util.List;
-
 /**
  * Used to communicate with a single HBase table.
  *
@@ -231,7 +231,7 @@ public interface HTableInterface {
 
   /**
    * Atomically checks if a row/family/qualifier value matches the expected
-   * value. If it does, it adds the delete.  If the passed value is null, the 
+   * value. If it does, it adds the delete.  If the passed value is null, the
    * check is for the lack of column (ie: non-existance)
    *
    * @param row to check
@@ -246,6 +246,21 @@ public interface HTableInterface {
       byte[] value, Delete delete) throws IOException;
 
   /**
+   * Increments one or more columns within a single row.
+   * <p>
+   * This operation does not appear atomic to readers.  Increments are done
+   * under a single row lock, so write operations to a row are synchronized, but
+   * readers do not take row locks so get and scan operations can see this
+   * operation partially completed.
+   *
+   * @param increment object that specifies the columns and amounts to be used
+   *                  for the increment operations
+   * @throws IOException e
+   * @return values of columns after the increment
+   */
+  public Result increment(final Increment increment) throws IOException;
+
+  /**
    * Atomically increments a column value.
    * <p>
    * Equivalent to {@code {@link #incrementColumnValue(byte[], byte[], byte[],

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Increment.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Increment.java?rev=1027681&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Increment.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Increment.java Tue Oct 26 18:50:13 2010
@@ -0,0 +1,294 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Used to perform Increment operations on a single row.
+ * <p>
+ * This operation does not appear atomic to readers.  Increments are done
+ * under a single row lock, so write operations to a row are synchronized, but
+ * readers do not take row locks so get and scan operations can see this
+ * operation partially completed.
+ * <p>
+ * To increment columns of a row, instantiate an Increment object with the row
+ * to increment.  At least one column to increment must be specified using the
+ * {@link #addColumn(byte[], byte[], long)} method.
+ */
+public class Increment implements Writable {
+  private static final byte INCREMENT_VERSION = (byte)1;
+
+  private byte [] row = null;
+  private long lockId = -1L;
+  private boolean writeToWAL = true;
+  private Map<byte [], NavigableMap<byte [], Long>> familyMap =
+    new TreeMap<byte [], NavigableMap<byte [], Long>>(Bytes.BYTES_COMPARATOR);
+
+  /** Constructor for Writable.  DO NOT USE */
+  public Increment() {}
+
+  /**
+   * Create a Increment operation for the specified row.
+   * <p>
+   * At least one column must be incremented.
+   * @param row row key
+   */
+  public Increment(byte [] row) {
+    this(row, null);
+  }
+
+  /**
+   * Create a Increment operation for the specified row, using an existing row
+   * lock.
+   * <p>
+   * At least one column must be incremented.
+   * @param row row key
+   * @param rowLock previously acquired row lock, or null
+   */
+  public Increment(byte [] row, RowLock rowLock) {
+    this.row = row;
+    if(rowLock != null) {
+      this.lockId = rowLock.getLockId();
+    }
+  }
+
+  /**
+   * Increment the column from the specific family with the specified qualifier
+   * by the specified amount.
+   * <p>
+   * Overrides previous calls to addColumn for this family and qualifier.
+   * @param family family name
+   * @param qualifier column qualifier
+   * @param amount amount to increment by
+   * @return the Increment object
+   */
+  public Increment addColumn(byte [] family, byte [] qualifier, long amount) {
+    NavigableMap<byte [], Long> set = familyMap.get(family);
+    if(set == null) {
+      set = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+    }
+    set.put(qualifier, amount);
+    familyMap.put(family, set);
+    return this;
+  }
+
+  /* Accessors */
+
+  /**
+   * Method for retrieving the increment's row
+   * @return row
+   */
+  public byte [] getRow() {
+    return this.row;
+  }
+
+  /**
+   * Method for retrieving the increment's RowLock
+   * @return RowLock
+   */
+  public RowLock getRowLock() {
+    return new RowLock(this.row, this.lockId);
+  }
+
+  /**
+   * Method for retrieving the increment's lockId
+   * @return lockId
+   */
+  public long getLockId() {
+    return this.lockId;
+  }
+
+  /**
+   * Method for retrieving whether WAL will be written to or not
+   * @return true if WAL should be used, false if not
+   */
+  public boolean getWriteToWAL() {
+    return this.writeToWAL;
+  }
+
+  /**
+   * Sets whether this operation should write to the WAL or not.
+   * @param writeToWAL true if WAL should be used, false if not
+   * @return this increment operation
+   */
+  public Increment setWriteToWAL(boolean writeToWAL) {
+    this.writeToWAL = writeToWAL;
+    return this;
+  }
+
+  /**
+   * Method for retrieving the keys in the familyMap
+   * @return keys in the current familyMap
+   */
+  public Set<byte[]> familySet() {
+    return this.familyMap.keySet();
+  }
+
+  /**
+   * Method for retrieving the number of families to increment from
+   * @return number of families
+   */
+  public int numFamilies() {
+    return this.familyMap.size();
+  }
+
+  /**
+   * Method for retrieving the number of columns to increment
+   * @return number of columns across all families
+   */
+  public int numColumns() {
+    if (!hasFamilies()) return 0;
+    int num = 0;
+    for (NavigableMap<byte [], Long> family : familyMap.values()) {
+      num += family.size();
+    }
+    return num;
+  }
+
+  /**
+   * Method for checking if any families have been inserted into this Increment
+   * @return true if familyMap is non empty false otherwise
+   */
+  public boolean hasFamilies() {
+    return !this.familyMap.isEmpty();
+  }
+
+  /**
+   * Method for retrieving the increment's familyMap
+   * @return familyMap
+   */
+  public Map<byte[],NavigableMap<byte[], Long>> getFamilyMap() {
+    return this.familyMap;
+  }
+
+  /**
+   * @return String
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("row=");
+    sb.append(Bytes.toString(this.row));
+    if(this.familyMap.size() == 0) {
+      sb.append(", no columns set to be incremented");
+      return sb.toString();
+    }
+    sb.append(", families=");
+    boolean moreThanOne = false;
+    for(Map.Entry<byte [], NavigableMap<byte[], Long>> entry :
+      this.familyMap.entrySet()) {
+      if(moreThanOne) {
+        sb.append("), ");
+      } else {
+        moreThanOne = true;
+        sb.append("{");
+      }
+      sb.append("(family=");
+      sb.append(Bytes.toString(entry.getKey()));
+      sb.append(", columns=");
+      if(entry.getValue() == null) {
+        sb.append("NONE");
+      } else {
+        sb.append("{");
+        boolean moreThanOneB = false;
+        for(Map.Entry<byte [], Long> column : entry.getValue().entrySet()) {
+          if(moreThanOneB) {
+            sb.append(", ");
+          } else {
+            moreThanOneB = true;
+          }
+          sb.append(Bytes.toString(column.getKey()) + "+=" + column.getValue());
+        }
+        sb.append("}");
+      }
+    }
+    sb.append("}");
+    return sb.toString();
+  }
+
+  //Writable
+  public void readFields(final DataInput in)
+  throws IOException {
+    int version = in.readByte();
+    if (version > INCREMENT_VERSION) {
+      throw new IOException("unsupported version");
+    }
+    this.row = Bytes.readByteArray(in);
+    this.lockId = in.readLong();
+    int numFamilies = in.readInt();
+    if (numFamilies == 0) {
+      throw new IOException("At least one column required");
+    }
+    this.familyMap =
+      new TreeMap<byte [],NavigableMap<byte [], Long>>(Bytes.BYTES_COMPARATOR);
+    for(int i=0; i<numFamilies; i++) {
+      byte [] family = Bytes.readByteArray(in);
+      boolean hasColumns = in.readBoolean();
+      NavigableMap<byte [], Long> set = null;
+      if(hasColumns) {
+        int numColumns = in.readInt();
+        set = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+        for(int j=0; j<numColumns; j++) {
+          byte [] qualifier = Bytes.readByteArray(in);
+          set.put(qualifier, in.readLong());
+        }
+      } else {
+        throw new IOException("At least one column required per family");
+      }
+      this.familyMap.put(family, set);
+    }
+  }
+
+  public void write(final DataOutput out)
+  throws IOException {
+    out.writeByte(INCREMENT_VERSION);
+    Bytes.writeByteArray(out, this.row);
+    out.writeLong(this.lockId);
+    if (familyMap.size() == 0) {
+      throw new IOException("At least one column required");
+    }
+    out.writeInt(familyMap.size());
+    for(Map.Entry<byte [], NavigableMap<byte [], Long>> entry :
+      familyMap.entrySet()) {
+      Bytes.writeByteArray(out, entry.getKey());
+      NavigableMap<byte [], Long> columnSet = entry.getValue();
+      if(columnSet == null) {
+        throw new IOException("At least one column required per family");
+      } else {
+        out.writeBoolean(true);
+        out.writeInt(columnSet.size());
+        for(Map.Entry<byte [], Long> qualifier : columnSet.entrySet()) {
+          Bytes.writeByteArray(out, qualifier.getKey());
+          out.writeLong(qualifier.getValue());
+        }
+      }
+    }
+  }
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Tue Oct 26 18:50:13 2010
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.client.MultiPutResponse;
 import org.apache.hadoop.hbase.client.MultiAction;
@@ -191,13 +192,15 @@ public class HbaseObjectWritable impleme
 
     addToMap(NavigableSet.class, code++);
     addToMap(ColumnPrefixFilter.class, code++);
-    
+
     // Multi
     addToMap(Row.class, code++);
     addToMap(Action.class, code++);
     addToMap(MultiAction.class, code++);
     addToMap(MultiResponse.class, code++);
-    
+
+    addToMap(Increment.class, code++);
+
   }
 
   private Class<?> declaredClass;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Tue Oct 26 18:50:13 2010
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.NotServin
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.MultiAction;
 import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.client.MultiPutResponse;
@@ -193,6 +194,18 @@ public interface HRegionInterface extend
       byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
   throws IOException;
 
+  /**
+   * Increments one or more columns values in a row.  Returns the
+   * updated keys after the increment.
+   * <p>
+   * This operation does not appear atomic to readers.  Increments are done
+   * under a row lock but readers do not take row locks.
+   * @param regionName region name
+   * @param increment increment operation
+   * @return incremented cells
+   */
+  public Result increment(byte[] regionName, Increment increment)
+  throws IOException;
 
   //
   // remote scanner interface

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 26 18:50:13 2010
@@ -32,6 +32,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Random;
 import java.util.Set;
@@ -54,14 +55,15 @@ import org.apache.hadoop.hbase.DroppedSn
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowLock;
@@ -2959,6 +2961,102 @@ public class HRegion implements HeapSize
   }
 
   /**
+   * Perform one or more increment operations on a row.
+   * <p>
+   * Increments performed are done under row lock but reads do not take locks
+   * out so this can be seen partially complete by gets and scans.
+   * @param increment
+   * @param lockid
+   * @param writeToWAL
+   * @return new keyvalues after increment
+   * @throws IOException
+   */
+  public Result increment(Increment increment, Integer lockid,
+      boolean writeToWAL)
+  throws IOException {
+    // TODO: Use RWCC to make this set of increments atomic to reads
+    byte [] row = increment.getRow();
+    checkRow(row);
+    boolean flush = false;
+    WALEdit walEdits = null;
+    List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.numColumns());
+    List<KeyValue> kvs = new ArrayList<KeyValue>(increment.numColumns());
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    long size = 0;
+
+    // Lock row
+    startRegionOperation();
+    try {
+      Integer lid = getLock(lockid, row, true);
+      try {
+        // Process each family
+        for (Map.Entry<byte [], NavigableMap<byte [], Long>> family :
+          increment.getFamilyMap().entrySet()) {
+
+          Store store = stores.get(family.getKey());
+
+          // Get previous values for all columns in this family
+          Get get = new Get(row);
+          for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) {
+            get.addColumn(family.getKey(), column.getKey());
+          }
+          List<KeyValue> results = getLastIncrement(get);
+
+          // Iterate the input columns and update existing values if they were
+          // found, otherwise add new column initialized to the increment amount
+          int idx = 0;
+          for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) {
+            long amount = column.getValue();
+            if (idx < results.size() &&
+                results.get(idx).matchingQualifier(column.getKey())) {
+              amount += Bytes.toLong(results.get(idx).getValue());
+              idx++;
+            }
+
+            // Append new incremented KeyValue to list
+            KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(),
+                now, Bytes.toBytes(amount));
+            kvs.add(newKV);
+
+            // Append update to WAL
+            if (writeToWAL) {
+              if (walEdits == null) {
+                walEdits = new WALEdit();
+              }
+              walEdits.add(newKV);
+            }
+          }
+
+          // Write the KVs for this family into the store
+          size += store.upsert(kvs);
+          allKVs.addAll(kvs);
+          kvs.clear();
+        }
+
+        // Actually write to WAL now
+        if (writeToWAL) {
+          this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
+            walEdits, now);
+        }
+
+        size = this.memstoreSize.addAndGet(size);
+        flush = isFlushSize(size);
+      } finally {
+        releaseRowLock(lid);
+      }
+    } finally {
+      closeRegionOperation();
+    }
+
+    if (flush) {
+      // Request a cache flush.  Do it outside update lock.
+      requestFlush();
+    }
+
+    return new Result(allKVs);
+  }
+
+  /**
    *
    * @param row
    * @param family

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Oct 26 18:50:13 2010
@@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.client.De
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.MultiAction;
 import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.client.MultiPutResponse;
@@ -2293,6 +2294,26 @@ public class HRegionServer implements HR
     return this.serverInfo;
   }
 
+
+  @Override
+  public Result increment(byte[] regionName, Increment increment)
+  throws IOException {
+    checkOpen();
+    if (regionName == null) {
+      throw new IOException("Invalid arguments to increment " +
+      "regionName is null");
+    }
+    requestCount.incrementAndGet();
+    try {
+      HRegion region = getRegion(regionName);
+      return region.increment(increment, getLockFromId(increment.getLockId()),
+          increment.getWriteToWAL());
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
   /** {@inheritDoc} */
   public long incrementColumnValue(byte[] regionName, byte[] row,
       byte[] family, byte[] qualifier, long amount, boolean writeToWAL)

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Tue Oct 26 18:50:13 2010
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.regionse
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 import java.rmi.UnexpectedException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -370,8 +371,6 @@ public class MemStore implements HeapSiz
     try {
       KeyValue firstKv = KeyValue.createFirstOnRow(
           row, family, qualifier);
-      // create a new KeyValue with 'now' and a 0 memstoreTS == immediately visible
-      KeyValue newKv;
       // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
       SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
       if (!snSs.isEmpty()) {
@@ -410,46 +409,95 @@ public class MemStore implements HeapSiz
         }
       }
 
+      // create or update (upsert) a new KeyValue with
+      // 'now' and a 0 memstoreTS == immediately visible
+      return upsert(Arrays.asList(new KeyValue [] {
+          new KeyValue(row, family, qualifier, now,
+              Bytes.toBytes(newValue))
+      }));
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
 
-      // add the new value now. this might have the same TS as an existing KV, thus confusing
-      // readers slightly for a MOMENT until we erase the old one (and thus old value).
-      newKv = new KeyValue(row, family, qualifier,
-          now,
-          Bytes.toBytes(newValue));
-      long addedSize = add(newKv);
-
-      // remove extra versions.
-      ss = kvset.tailSet(firstKv);
-      it = ss.iterator();
-      while ( it.hasNext() ) {
-        KeyValue kv = it.next();
+  /**
+   * Update or insert the specified KeyValues.
+   * <p>
+   * For each KeyValue, insert into MemStore.  This will atomically upsert the
+   * value for that row/family/qualifier.  If a KeyValue did already exist,
+   * it will then be removed.
+   * <p>
+   * Currently the memstoreTS is kept at 0 so as each insert happens, it will
+   * be immediately visible.  May want to change this so it is atomic across
+   * all KeyValues.
+   * <p>
+   * This is called under row lock, so Get operations will still see updates
+   * atomically.  Scans will only see each KeyValue update as atomic.
+   *
+   * @param kvs
+   * @return change in memstore size
+   */
+  public long upsert(List<KeyValue> kvs) {
+   this.lock.readLock().lock();
+    try {
+      long size = 0;
+      for (KeyValue kv : kvs) {
+        kv.setMemstoreTS(0);
+        size += upsert(kv);
+      }
+      return size;
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
 
-        if (kv == newKv) {
-          // ignore the one i just put in (heh)
-          continue;
-        }
+  /**
+   * Inserts the specified KeyValue into MemStore and deletes any existing
+   * versions of the same row/family/qualifier as the specified KeyValue.
+   * <p>
+   * First, the specified KeyValue is inserted into the Memstore.
+   * <p>
+   * If there are any existing KeyValues in this MemStore with the same row,
+   * family, and qualifier, they are removed.
+   * @param kv
+   * @return change in size of MemStore
+   */
+  private long upsert(KeyValue kv) {
+    // Add the KeyValue to the MemStore
+    long addedSize = add(kv);
+
+    // Iterate the KeyValues after the one just inserted, cleaning up any
+    // other KeyValues with the same row/family/qualifier
+    SortedSet<KeyValue> ss = kvset.tailSet(kv);
+    Iterator<KeyValue> it = ss.iterator();
+    while ( it.hasNext() ) {
+      KeyValue cur = it.next();
 
-        // if this isnt the row we are interested in, then bail:
-        if (!firstKv.matchingColumn(family,qualifier) || !firstKv.matchingRow(kv)) {
-          break; // rows dont match, bail.
-        }
+      if (kv == cur) {
+        // ignore the one just put in
+        continue;
+      }
+      // if this isn't the row we are interested in, then bail
+      if (!kv.matchingRow(cur)) {
+        break;
+      }
 
-        // if the qualifier matches and it's a put, just RM it out of the kvset.
-        if (firstKv.matchingQualifier(kv)) {
-          // to be extra safe we only remove Puts that have a memstoreTS==0
-          if (kv.getType() == KeyValue.Type.Put.getCode()) {
-            // false means there was a change, so give us the size.
-            addedSize -= heapSizeChange(kv, true);
+      // if the qualifier matches and it's a put, remove it
+      if (kv.matchingQualifier(cur)) {
 
-            it.remove();
-          }
+        // to be extra safe we only remove Puts that have a memstoreTS==0
+        if (kv.getType() == KeyValue.Type.Put.getCode() &&
+            kv.getMemstoreTS() == 0) {
+          // false means there was a change, so give us the size.
+          addedSize -= heapSizeChange(kv, true);
+          it.remove();
         }
+      } else {
+        // past the column, done
+        break;
       }
-
-      return addedSize;
-    } finally {
-      this.lock.readLock().unlock();
     }
+    return addedSize;
   }
 
   /*

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 26 18:50:13 2010
@@ -1386,6 +1386,30 @@ public class Store implements HeapSize {
     }
   }
 
+  /**
+   * Adds or replaces the specified KeyValues.
+   * <p>
+   * For each KeyValue specified, if a cell with the same row, family, and
+   * qualifier exists in MemStore, it will be replaced.  Otherwise, it will just
+   * be inserted to MemStore.
+   * <p>
+   * This operation is atomic on each KeyValue (row/family/qualifier) but not
+   * necessarily atomic across all of them.
+   * @param kvs
+   * @return memstore size delta
+   * @throws IOException
+   */
+  public long upsert(List<KeyValue> kvs)
+      throws IOException {
+    this.lock.readLock().lock();
+    try {
+      // TODO: Make this operation atomic w/ RWCC
+      return this.memstore.upsert(kvs);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
   public StoreFlusher getStoreFlusher(long cacheFlushId) {
     return new StoreFlusherImpl(cacheFlushId);
   }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java Tue Oct 26 18:50:13 2010
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Result;
@@ -588,6 +589,9 @@ public class RemoteHTable implements HTa
     throw new IOException("checkAndDelete not supported");
   }
 
+  public Result increment(Increment increment) throws IOException {
+    throw new IOException("Increment not supported");
+  }
 
   public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
       long amount) throws IOException {

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Tue Oct 26 18:50:13 2010
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
@@ -60,6 +59,7 @@ import org.apache.hadoop.hbase.filter.Re
 import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -261,7 +261,7 @@ public class TestFromClientSide {
    * logs to ensure that we're not scanning more regions that we're supposed to.
    * Related to the TestFilterAcrossRegions over in the o.a.h.h.filter package.
    * @throws IOException
-   * @throws InterruptedException 
+   * @throws InterruptedException
    */
   @Test
   public void testFilterAcrossMultipleRegions()
@@ -1506,7 +1506,7 @@ public class TestFromClientSide {
     get.addFamily(FAMILIES[0]);
     get.setMaxVersions(Integer.MAX_VALUE);
     result = ht.get(get);
-    assertNResult(result, ROW, FAMILIES[0], QUALIFIER, 
+    assertNResult(result, ROW, FAMILIES[0], QUALIFIER,
         new long [] {ts[1], ts[2], ts[3]},
         new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
         0, 2);
@@ -2633,6 +2633,23 @@ public class TestFromClientSide {
         equals(value, key.getValue()));
   }
 
+  private void assertIncrementKey(KeyValue key, byte [] row, byte [] family,
+      byte [] qualifier, long value)
+  throws Exception {
+    assertTrue("Expected row [" + Bytes.toString(row) + "] " +
+        "Got row [" + Bytes.toString(key.getRow()) +"]",
+        equals(row, key.getRow()));
+    assertTrue("Expected family [" + Bytes.toString(family) + "] " +
+        "Got family [" + Bytes.toString(key.getFamily()) + "]",
+        equals(family, key.getFamily()));
+    assertTrue("Expected qualifier [" + Bytes.toString(qualifier) + "] " +
+        "Got qualifier [" + Bytes.toString(key.getQualifier()) + "]",
+        equals(qualifier, key.getQualifier()));
+    assertTrue("Expected value [" + value + "] " +
+        "Got value [" + Bytes.toLong(key.getValue()) + "]",
+        Bytes.toLong(key.getValue()) == value);
+  }
+
   private void assertNumKeys(Result result, int n) throws Exception {
     assertTrue("Expected " + n + " keys but got " + result.size(),
         result.size() == n);
@@ -3800,7 +3817,7 @@ public class TestFromClientSide {
     table.getConnection().clearRegionCache();
     assertEquals("Clearing cache should have 0 cached ", 0,
         HConnectionManager.getCachedRegionCount(conf, TABLENAME));
-    
+
     // A Get is suppose to do a region lookup request
     Get g = new Get(Bytes.toBytes("aaa"));
     table.get(g);
@@ -3848,5 +3865,76 @@ public class TestFromClientSide {
 
     LOG.info("Finishing testRegionCachePreWarm");
   }
+
+  @Test
+  public void testIncrement() throws Exception {
+    LOG.info("Starting testIncrement");
+    final byte [] TABLENAME = Bytes.toBytes("testIncrement");
+    HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
+
+    byte [][] ROWS = new byte [][] {
+        Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"),
+        Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
+        Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i")
+    };
+    byte [][] QUALIFIERS = new byte [][] {
+        Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"),
+        Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
+        Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i")
+    };
+
+    // Do some simple single-column increments
+
+    // First with old API
+    ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[0], 1);
+    ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[1], 2);
+    ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[2], 3);
+    ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[3], 4);
+
+    // Now increment things incremented with old and do some new
+    Increment inc = new Increment(ROW);
+    inc.addColumn(FAMILY, QUALIFIERS[1], 1);
+    inc.addColumn(FAMILY, QUALIFIERS[3], 1);
+    inc.addColumn(FAMILY, QUALIFIERS[4], 1);
+    ht.increment(inc);
+
+    // Verify expected results
+    Result r = ht.get(new Get(ROW));
+    KeyValue [] kvs = r.raw();
+    assertEquals(5, kvs.length);
+    assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1);
+    assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 3);
+    assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 3);
+    assertIncrementKey(kvs[3], ROW, FAMILY, QUALIFIERS[3], 5);
+    assertIncrementKey(kvs[4], ROW, FAMILY, QUALIFIERS[4], 1);
+
+    // Now try multiple columns by different amounts
+    inc = new Increment(ROWS[0]);
+    for (int i=0;i<QUALIFIERS.length;i++) {
+      inc.addColumn(FAMILY, QUALIFIERS[i], i+1);
+    }
+    ht.increment(inc);
+    // Verify
+    r = ht.get(new Get(ROWS[0]));
+    kvs = r.raw();
+    assertEquals(QUALIFIERS.length, kvs.length);
+    for (int i=0;i<QUALIFIERS.length;i++) {
+      assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], i+1);
+    }
+
+    // Re-increment them
+    inc = new Increment(ROWS[0]);
+    for (int i=0;i<QUALIFIERS.length;i++) {
+      inc.addColumn(FAMILY, QUALIFIERS[i], i+1);
+    }
+    ht.increment(inc);
+    // Verify
+    r = ht.get(new Get(ROWS[0]));
+    kvs = r.raw();
+    assertEquals(QUALIFIERS.length, kvs.length);
+    for (int i=0;i<QUALIFIERS.length;i++) {
+      assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2*(i+1));
+    }
+  }
 }