You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/03/09 23:13:44 UTC

svn commit: r751872 - in /hadoop/hbase/branches/0.19: ./ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/ipc/ src/java/org/apache/hadoop/hbase/regionserver/

Author: stack
Date: Mon Mar  9 22:13:44 2009
New Revision: 751872

URL: http://svn.apache.org/viewvc?rev=751872&view=rev
Log:
HBASE-803 Atomic increment operations

Modified:
    hadoop/hbase/branches/0.19/CHANGES.txt
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/HTable.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Modified: hadoop/hbase/branches/0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/CHANGES.txt?rev=751872&r1=751871&r2=751872&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.19/CHANGES.txt Mon Mar  9 22:13:44 2009
@@ -42,6 +42,7 @@
    HBASE-1240  Would be nice if RowResult could be comparable
                (Erik Holstad via Stack)
    HBASE-1229  Apply HADOOP-5369 to HBase MapFile (Ben Maurer via Stack)
+   HBASE-803   Atomic increment operations (Ryan Rawson and Jon Gray via Stack)
 
 Release 0.19.0
   INCOMPATIBLE CHANGES

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=751872&r1=751871&r2=751872&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/client/HTable.java Mon Mar  9 22:13:44 2009
@@ -1480,6 +1480,18 @@
   public ArrayList<BatchUpdate> getWriteBuffer() {
     return writeBuffer;
   }
+  
+  /**
+   * Increments a column value by issuing
+   * <code>HRegionInterface.incrementColumnValue</code> through
+   * <code>connection.getRegionServerWithRetries</code>, using a
+   * <code>ServerCallable</code> built for this table and <code>row</code>.
+   *
+   * @param row row holding the value to increment
+   * @param column column holding the value to increment
+   * @param amount amount to add; a <code>long</code> so that it matches the
+   * <code>amount</code> parameter of the region server call
+   * @return the value returned by the region server call
+   * @throws IOException if the call to the region server fails
+   */
+  public long incrementColumnValue(final byte [] row, final byte [] column,
+      final long amount) throws IOException {
+    return connection.getRegionServerWithRetries(
+        new ServerCallable<Long>(connection, tableName, row) {
+          public Long call() throws IOException {
+            // Forward to the region resolved in this callable's location.
+            return server.incrementColumnValue(
+                location.getRegionInfo().getRegionName(), row, column, amount);
+          }
+        }
+    );
+  }
 
   /**
    * Implements the scanner interface for the HBase client.

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java?rev=751872&r1=751871&r2=751872&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java Mon Mar  9 22:13:44 2009
@@ -65,7 +65,8 @@
    * <li>Version 13: HBASE-847</li>
    * <li>Version 14: HBASE-900</li>
    * <li>Version 15: HRegionInterface.exists</li>
+   * <li>Version 16: Added incrementColumnValue.</li>
    * </ul>
    */
-  public static final long versionID = 15L;
+  public static final long versionID = 16L;
 }

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=751872&r1=751871&r2=751872&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Mon Mar  9 22:13:44 2009
@@ -292,4 +292,18 @@
    */
   public void unlockRow(final byte [] regionName, final long lockId)
   throws IOException;
+  
+  /**
+   * Atomically increments a column value. If the stored column value cannot be
+   * read as a long, this may throw an exception.
+   *
+   * @param regionName name of the region the increment is sent to
+   * @param row row holding the value to increment
+   * @param column column holding the value to increment
+   * @param amount amount to add to the current column value
+   * @return new incremented column value
+   * @throws IOException if the increment cannot be performed
+   */
+  public long incrementColumnValue(byte [] regionName, byte [] row,
+      byte [] column, long amount) throws IOException;
 }

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=751872&r1=751871&r2=751872&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Mar  9 22:13:44 2009
@@ -44,6 +44,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ColumnNameParseException;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -2614,4 +2615,78 @@
       }
     }
   }
+
+  /**
+   * Increments the value stored at <code>row</code>/<code>column</code> by
+   * <code>amount</code> while holding the row lock, and writes the result back
+   * with <code>batchUpdate</code>.
+   * <p>
+   * The previous value is looked up in the memcache first and, if none is found
+   * there, through <code>store.get</code>. If neither lookup yields a value the
+   * counter starts at <code>amount</code>.
+   *
+   * @param row row holding the value to increment
+   * @param column column holding the value to increment
+   * @param amount amount to add to the current value
+   * @return the new value written for the column
+   * @throws IOException if a lookup returns more than one value, if the stored
+   * value is longer than <code>Bytes.SIZEOF_LONG</code> bytes, or if one of the
+   * checks, reads or the write fails
+   */
+  public long incrementColumnValue(byte[] row, byte[] column, long amount) throws IOException {
+    checkRow(row);
+    checkColumn(column);
+
+    Integer lid = obtainRowLock(row);
+    splitsAndClosesLock.readLock().lock();
+    try {
+      HStoreKey hsk = new HStoreKey(row, column);
+      long ts = System.currentTimeMillis();
+      byte [] value = null;
+      long newval; // the new value.
+
+      HStore store = getStore(column);
+
+      List<Cell> c;
+      // Try the memcache first.
+      store.lock.readLock().lock();
+      try {
+        c = store.memcache.get(hsk, 1);
+      } finally {
+        store.lock.readLock().unlock();
+      }
+      if (c.size() == 1) {
+        // Use the memcache timestamp value, so the update below is written
+        // with the same timestamp as the existing memcache entry.
+        LOG.debug("Overwriting the memcache value for " + Bytes.toString(row) + "/" + Bytes.toString(column));
+        ts = c.get(0).getTimestamp();
+        value = c.get(0).getValue();
+      } else if (c.size() > 1) {
+        throw new DoNotRetryIOException("more than 1 value returned in incrementColumnValue from memcache");
+      }
+
+      if (value == null) {
+        // Check the store (including disk) for the previous value.
+        Cell[] cell = store.get(hsk, 1);
+        if (cell != null && cell.length == 1) {
+          LOG.debug("Using HFile previous value for " + Bytes.toString(row) + "/" + Bytes.toString(column));
+          value = cell[0].getValue();
+        } else if (cell != null && cell.length > 1) {
+          // Guard on the store result itself; the memcache list was already
+          // checked above.
+          throw new DoNotRetryIOException("more than 1 value returned in incrementColumnValue from Store");
+        }
+      }
+
+      if (value == null) {
+        // Doesn't exist
+        LOG.debug("Creating new counter value for " + Bytes.toString(row) + "/"+ Bytes.toString(column));
+        newval = amount;
+      } else {
+        newval = incrementBytes(value, amount);
+      }
+
+      // Write the new value under the row lock obtained above.
+      BatchUpdate b = new BatchUpdate(row, ts);
+      b.put(column, Bytes.toBytes(newval));
+      batchUpdate(b, lid, true);
+      return newval;
+    } finally {
+      splitsAndClosesLock.readLock().unlock();
+      releaseRowLock(lid);
+    }
+  }
+
+  private long incrementBytes(byte[] value, long amount) throws IOException {
+    // Hopefully this doesn't happen too often.
+    if (value.length < Bytes.SIZEOF_LONG) {
+      byte [] newvalue = new byte[Bytes.SIZEOF_LONG];
+      System.arraycopy(value, 0, newvalue, newvalue.length - value.length, value.length);
+      value = newvalue;
+    } else if (value.length > Bytes.SIZEOF_LONG) {
+      throw new DoNotRetryIOException("Increment Bytes - value too big: " + value.length);
+    }
+    long v = Bytes.toLong(value);
+    v += amount;
+    return v;
+  }
 }

Modified: hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=751872&r1=751871&r2=751872&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/branches/0.19/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Mar  9 22:13:44 2009
@@ -2274,4 +2274,34 @@
         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
     doMain(args, regionServerClass);
   }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * A null <code>regionName</code>, <code>row</code> or <code>column</code> is
+   * reported as an IOException whose cause is a NullPointerException naming
+   * the first null argument.
+   */
+  public long incrementColumnValue(byte[] regionName, byte[] row,
+      byte[] column, long amount) throws IOException {
+    checkOpen();
+
+    // Find the first null argument, checked in declaration order.
+    String nullArgument = null;
+    if (regionName == null) {
+      nullArgument = "regionName is null";
+    } else if (row == null) {
+      nullArgument = "row is null";
+    } else if (column == null) {
+      nullArgument = "column is null";
+    }
+    if (nullArgument != null) {
+      throw new IOException("Invalid arguments to incrementColumnValue",
+          new NullPointerException(nullArgument));
+    }
+
+    // Only requests with valid arguments are counted.
+    requestCount.incrementAndGet();
+    try {
+      return getRegion(regionName).incrementColumnValue(row, column, amount);
+    } catch (IOException e) {
+      // checkFileSystem() runs before the failure is rethrown to the caller.
+      checkFileSystem();
+      throw e;
+    }
+  }
 }