You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jg...@apache.org on 2010/10/26 20:50:14 UTC
svn commit: r1027681 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/io/
src/main/java/org/apache/hadoop/hbase/ipc/
src/main/java/org/apache/hadoop/hbase/regionserver/
src/main/java/org/apache/h...
Author: jgray
Date: Tue Oct 26 18:50:13 2010
New Revision: 1027681
URL: http://svn.apache.org/viewvc?rev=1027681&view=rev
Log:
HBASE-2946 Increment multiple columns in a row at once
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Increment.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Oct 26 18:50:13 2010
@@ -1096,6 +1096,8 @@ Release 0.21.0 - Unreleased
HBASE-3053 Add ability to have multiple Masters LocalHBaseCluster for
test writing
HBASE-2201 JRuby shell for replication
+ HBASE-2946 Increment multiple columns in a row at once
+
OPTIMIZATIONS
HBASE-410 [testing] Speed up the test suite
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue Oct 26 18:50:13 2010
@@ -650,6 +650,22 @@ public class HTable implements HTableInt
}
@Override
+ public Result increment(final Increment increment) throws IOException {
+ if (!increment.hasFamilies()) {
+ throw new IOException(
+ "Invalid arguments to increment, no columns specified");
+ }
+ return connection.getRegionServerWithRetries(
+ new ServerCallable<Result>(connection, tableName, increment.getRow()) {
+ public Result call() throws IOException {
+ return server.increment(
+ location.getRegionInfo().getRegionName(), increment);
+ }
+ }
+ );
+ }
+
+ @Override
public long incrementColumnValue(final byte [] row, final byte [] family,
final byte [] qualifier, final long amount)
throws IOException {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Tue Oct 26 18:50:13 2010
@@ -19,13 +19,13 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.IOException;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import java.io.IOException;
-import java.util.List;
-
/**
* Used to communicate with a single HBase table.
*
@@ -231,7 +231,7 @@ public interface HTableInterface {
/**
* Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the delete. If the passed value is null, the
+ * value. If it does, it adds the delete. If the passed value is null, the
* check is for the lack of column (ie: non-existance)
*
* @param row to check
@@ -246,6 +246,21 @@ public interface HTableInterface {
byte[] value, Delete delete) throws IOException;
/**
+ * Increments one or more columns within a single row.
+ * <p>
+ * This operation does not appear atomic to readers. Increments are done
+ * under a single row lock, so write operations to a row are synchronized, but
+ * readers do not take row locks so get and scan operations can see this
+ * operation partially completed.
+ *
+ * @param increment object that specifies the columns and amounts to be used
+ * for the increment operations
+ * @throws IOException e
+ * @return values of columns after the increment
+ */
+ public Result increment(final Increment increment) throws IOException;
+
+ /**
* Atomically increments a column value.
* <p>
* Equivalent to {@code {@link #incrementColumnValue(byte[], byte[], byte[],
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Increment.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Increment.java?rev=1027681&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Increment.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Increment.java Tue Oct 26 18:50:13 2010
@@ -0,0 +1,294 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Used to perform Increment operations on a single row.
+ * <p>
+ * This operation does not appear atomic to readers. Increments are done
+ * under a single row lock, so write operations to a row are synchronized, but
+ * readers do not take row locks so get and scan operations can see this
+ * operation partially completed.
+ * <p>
+ * To increment columns of a row, instantiate an Increment object with the row
+ * to increment. At least one column to increment must be specified using the
+ * {@link #addColumn(byte[], byte[], long)} method.
+ */
+public class Increment implements Writable {
+ private static final byte INCREMENT_VERSION = (byte)1;
+
+ private byte [] row = null;
+ private long lockId = -1L;
+ private boolean writeToWAL = true;
+ private Map<byte [], NavigableMap<byte [], Long>> familyMap =
+ new TreeMap<byte [], NavigableMap<byte [], Long>>(Bytes.BYTES_COMPARATOR);
+
+ /** Constructor for Writable. DO NOT USE */
+ public Increment() {}
+
+ /**
+ * Create a Increment operation for the specified row.
+ * <p>
+ * At least one column must be incremented.
+ * @param row row key
+ */
+ public Increment(byte [] row) {
+ this(row, null);
+ }
+
+ /**
+ * Create a Increment operation for the specified row, using an existing row
+ * lock.
+ * <p>
+ * At least one column must be incremented.
+ * @param row row key
+ * @param rowLock previously acquired row lock, or null
+ */
+ public Increment(byte [] row, RowLock rowLock) {
+ this.row = row;
+ if(rowLock != null) {
+ this.lockId = rowLock.getLockId();
+ }
+ }
+
+ /**
+ * Increment the column from the specific family with the specified qualifier
+ * by the specified amount.
+ * <p>
+ * Overrides previous calls to addColumn for this family and qualifier.
+ * @param family family name
+ * @param qualifier column qualifier
+ * @param amount amount to increment by
+ * @return the Increment object
+ */
+ public Increment addColumn(byte [] family, byte [] qualifier, long amount) {
+ NavigableMap<byte [], Long> set = familyMap.get(family);
+ if(set == null) {
+ set = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+ }
+ set.put(qualifier, amount);
+ familyMap.put(family, set);
+ return this;
+ }
+
+ /* Accessors */
+
+ /**
+ * Method for retrieving the increment's row
+ * @return row
+ */
+ public byte [] getRow() {
+ return this.row;
+ }
+
+ /**
+ * Method for retrieving the increment's RowLock
+ * @return RowLock
+ */
+ public RowLock getRowLock() {
+ return new RowLock(this.row, this.lockId);
+ }
+
+ /**
+ * Method for retrieving the increment's lockId
+ * @return lockId
+ */
+ public long getLockId() {
+ return this.lockId;
+ }
+
+ /**
+ * Method for retrieving whether WAL will be written to or not
+ * @return true if WAL should be used, false if not
+ */
+ public boolean getWriteToWAL() {
+ return this.writeToWAL;
+ }
+
+ /**
+ * Sets whether this operation should write to the WAL or not.
+ * @param writeToWAL true if WAL should be used, false if not
+ * @return this increment operation
+ */
+ public Increment setWriteToWAL(boolean writeToWAL) {
+ this.writeToWAL = writeToWAL;
+ return this;
+ }
+
+ /**
+ * Method for retrieving the keys in the familyMap
+ * @return keys in the current familyMap
+ */
+ public Set<byte[]> familySet() {
+ return this.familyMap.keySet();
+ }
+
+ /**
+ * Method for retrieving the number of families to increment from
+ * @return number of families
+ */
+ public int numFamilies() {
+ return this.familyMap.size();
+ }
+
+ /**
+ * Method for retrieving the number of columns to increment
+ * @return number of columns across all families
+ */
+ public int numColumns() {
+ if (!hasFamilies()) return 0;
+ int num = 0;
+ for (NavigableMap<byte [], Long> family : familyMap.values()) {
+ num += family.size();
+ }
+ return num;
+ }
+
+ /**
+ * Method for checking if any families have been inserted into this Increment
+ * @return true if familyMap is non empty false otherwise
+ */
+ public boolean hasFamilies() {
+ return !this.familyMap.isEmpty();
+ }
+
+ /**
+ * Method for retrieving the increment's familyMap
+ * @return familyMap
+ */
+ public Map<byte[],NavigableMap<byte[], Long>> getFamilyMap() {
+ return this.familyMap;
+ }
+
+ /**
+ * @return String
+ */
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("row=");
+ sb.append(Bytes.toString(this.row));
+ if(this.familyMap.size() == 0) {
+ sb.append(", no columns set to be incremented");
+ return sb.toString();
+ }
+ sb.append(", families=");
+ boolean moreThanOne = false;
+ for(Map.Entry<byte [], NavigableMap<byte[], Long>> entry :
+ this.familyMap.entrySet()) {
+ if(moreThanOne) {
+ sb.append("), ");
+ } else {
+ moreThanOne = true;
+ sb.append("{");
+ }
+ sb.append("(family=");
+ sb.append(Bytes.toString(entry.getKey()));
+ sb.append(", columns=");
+ if(entry.getValue() == null) {
+ sb.append("NONE");
+ } else {
+ sb.append("{");
+ boolean moreThanOneB = false;
+ for(Map.Entry<byte [], Long> column : entry.getValue().entrySet()) {
+ if(moreThanOneB) {
+ sb.append(", ");
+ } else {
+ moreThanOneB = true;
+ }
+ sb.append(Bytes.toString(column.getKey()) + "+=" + column.getValue());
+ }
+ sb.append("}");
+ }
+ }
+ sb.append("}");
+ return sb.toString();
+ }
+
+ //Writable
+ public void readFields(final DataInput in)
+ throws IOException {
+ int version = in.readByte();
+ if (version > INCREMENT_VERSION) {
+ throw new IOException("unsupported version");
+ }
+ this.row = Bytes.readByteArray(in);
+ this.lockId = in.readLong();
+ int numFamilies = in.readInt();
+ if (numFamilies == 0) {
+ throw new IOException("At least one column required");
+ }
+ this.familyMap =
+ new TreeMap<byte [],NavigableMap<byte [], Long>>(Bytes.BYTES_COMPARATOR);
+ for(int i=0; i<numFamilies; i++) {
+ byte [] family = Bytes.readByteArray(in);
+ boolean hasColumns = in.readBoolean();
+ NavigableMap<byte [], Long> set = null;
+ if(hasColumns) {
+ int numColumns = in.readInt();
+ set = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+ for(int j=0; j<numColumns; j++) {
+ byte [] qualifier = Bytes.readByteArray(in);
+ set.put(qualifier, in.readLong());
+ }
+ } else {
+ throw new IOException("At least one column required per family");
+ }
+ this.familyMap.put(family, set);
+ }
+ }
+
+ public void write(final DataOutput out)
+ throws IOException {
+ out.writeByte(INCREMENT_VERSION);
+ Bytes.writeByteArray(out, this.row);
+ out.writeLong(this.lockId);
+ if (familyMap.size() == 0) {
+ throw new IOException("At least one column required");
+ }
+ out.writeInt(familyMap.size());
+ for(Map.Entry<byte [], NavigableMap<byte [], Long>> entry :
+ familyMap.entrySet()) {
+ Bytes.writeByteArray(out, entry.getKey());
+ NavigableMap<byte [], Long> columnSet = entry.getValue();
+ if(columnSet == null) {
+ throw new IOException("At least one column required per family");
+ } else {
+ out.writeBoolean(true);
+ out.writeInt(columnSet.size());
+ for(Map.Entry<byte [], Long> qualifier : columnSet.entrySet()) {
+ Bytes.writeByteArray(out, qualifier.getKey());
+ out.writeLong(qualifier.getValue());
+ }
+ }
+ }
+ }
+}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Tue Oct 26 18:50:13 2010
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.HTableDes
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.client.MultiPutResponse;
import org.apache.hadoop.hbase.client.MultiAction;
@@ -191,13 +192,15 @@ public class HbaseObjectWritable impleme
addToMap(NavigableSet.class, code++);
addToMap(ColumnPrefixFilter.class, code++);
-
+
// Multi
addToMap(Row.class, code++);
addToMap(Action.class, code++);
addToMap(MultiAction.class, code++);
addToMap(MultiResponse.class, code++);
-
+
+ addToMap(Increment.class, code++);
+
}
private Class<?> declaredClass;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Tue Oct 26 18:50:13 2010
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.NotServin
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.client.MultiPutResponse;
@@ -193,6 +194,18 @@ public interface HRegionInterface extend
byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
throws IOException;
+ /**
+ * Increments one or more columns values in a row. Returns the
+ * updated keys after the increment.
+ * <p>
+ * This operation does not appear atomic to readers. Increments are done
+ * under a row lock but readers do not take row locks.
+ * @param regionName region name
+ * @param increment increment operation
+ * @return incremented cells
+ */
+ public Result increment(byte[] regionName, Increment increment)
+ throws IOException;
//
// remote scanner interface
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 26 18:50:13 2010
@@ -32,6 +32,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
@@ -54,14 +55,15 @@ import org.apache.hadoop.hbase.DroppedSn
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowLock;
@@ -2959,6 +2961,102 @@ public class HRegion implements HeapSize
}
/**
+ * Perform one or more increment operations on a row.
+ * <p>
+ * Increments performed are done under row lock but reads do not take locks
+ * out so this can be seen partially complete by gets and scans.
+ * @param increment
+ * @param lockid
+ * @param writeToWAL
+ * @return new keyvalues after increment
+ * @throws IOException
+ */
+ public Result increment(Increment increment, Integer lockid,
+ boolean writeToWAL)
+ throws IOException {
+ // TODO: Use RWCC to make this set of increments atomic to reads
+ byte [] row = increment.getRow();
+ checkRow(row);
+ boolean flush = false;
+ WALEdit walEdits = null;
+ List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.numColumns());
+ List<KeyValue> kvs = new ArrayList<KeyValue>(increment.numColumns());
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ long size = 0;
+
+ // Lock row
+ startRegionOperation();
+ try {
+ Integer lid = getLock(lockid, row, true);
+ try {
+ // Process each family
+ for (Map.Entry<byte [], NavigableMap<byte [], Long>> family :
+ increment.getFamilyMap().entrySet()) {
+
+ Store store = stores.get(family.getKey());
+
+ // Get previous values for all columns in this family
+ Get get = new Get(row);
+ for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) {
+ get.addColumn(family.getKey(), column.getKey());
+ }
+ List<KeyValue> results = getLastIncrement(get);
+
+ // Iterate the input columns and update existing values if they were
+ // found, otherwise add new column initialized to the increment amount
+ int idx = 0;
+ for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) {
+ long amount = column.getValue();
+ if (idx < results.size() &&
+ results.get(idx).matchingQualifier(column.getKey())) {
+ amount += Bytes.toLong(results.get(idx).getValue());
+ idx++;
+ }
+
+ // Append new incremented KeyValue to list
+ KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(),
+ now, Bytes.toBytes(amount));
+ kvs.add(newKV);
+
+ // Append update to WAL
+ if (writeToWAL) {
+ if (walEdits == null) {
+ walEdits = new WALEdit();
+ }
+ walEdits.add(newKV);
+ }
+ }
+
+ // Write the KVs for this family into the store
+ size += store.upsert(kvs);
+ allKVs.addAll(kvs);
+ kvs.clear();
+ }
+
+ // Actually write to WAL now
+ if (writeToWAL) {
+ this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
+ walEdits, now);
+ }
+
+ size = this.memstoreSize.addAndGet(size);
+ flush = isFlushSize(size);
+ } finally {
+ releaseRowLock(lid);
+ }
+ } finally {
+ closeRegionOperation();
+ }
+
+ if (flush) {
+ // Request a cache flush. Do it outside update lock.
+ requestFlush();
+ }
+
+ return new Result(allKVs);
+ }
+
+ /**
*
* @param row
* @param family
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Oct 26 18:50:13 2010
@@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.client.De
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.client.MultiPutResponse;
@@ -2293,6 +2294,26 @@ public class HRegionServer implements HR
return this.serverInfo;
}
+
+ @Override
+ public Result increment(byte[] regionName, Increment increment)
+ throws IOException {
+ checkOpen();
+ if (regionName == null) {
+ throw new IOException("Invalid arguments to increment " +
+ "regionName is null");
+ }
+ requestCount.incrementAndGet();
+ try {
+ HRegion region = getRegion(regionName);
+ return region.increment(increment, getLockFromId(increment.getLockId()),
+ increment.getWriteToWAL());
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
/** {@inheritDoc} */
public long incrementColumnValue(byte[] regionName, byte[] row,
byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Tue Oct 26 18:50:13 2010
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.regionse
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.rmi.UnexpectedException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -370,8 +371,6 @@ public class MemStore implements HeapSiz
try {
KeyValue firstKv = KeyValue.createFirstOnRow(
row, family, qualifier);
- // create a new KeyValue with 'now' and a 0 memstoreTS == immediately visible
- KeyValue newKv;
// Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
if (!snSs.isEmpty()) {
@@ -410,46 +409,95 @@ public class MemStore implements HeapSiz
}
}
+ // create or update (upsert) a new KeyValue with
+ // 'now' and a 0 memstoreTS == immediately visible
+ return upsert(Arrays.asList(new KeyValue [] {
+ new KeyValue(row, family, qualifier, now,
+ Bytes.toBytes(newValue))
+ }));
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
- // add the new value now. this might have the same TS as an existing KV, thus confusing
- // readers slightly for a MOMENT until we erase the old one (and thus old value).
- newKv = new KeyValue(row, family, qualifier,
- now,
- Bytes.toBytes(newValue));
- long addedSize = add(newKv);
-
- // remove extra versions.
- ss = kvset.tailSet(firstKv);
- it = ss.iterator();
- while ( it.hasNext() ) {
- KeyValue kv = it.next();
+ /**
+ * Update or insert the specified KeyValues.
+ * <p>
+ * For each KeyValue, insert into MemStore. This will atomically upsert the
+ * value for that row/family/qualifier. If a KeyValue did already exist,
+ * it will then be removed.
+ * <p>
+ * Currently the memstoreTS is kept at 0 so as each insert happens, it will
+ * be immediately visible. May want to change this so it is atomic across
+ * all KeyValues.
+ * <p>
+ * This is called under row lock, so Get operations will still see updates
+ * atomically. Scans will only see each KeyValue update as atomic.
+ *
+ * @param kvs
+ * @return change in memstore size
+ */
+ public long upsert(List<KeyValue> kvs) {
+ this.lock.readLock().lock();
+ try {
+ long size = 0;
+ for (KeyValue kv : kvs) {
+ kv.setMemstoreTS(0);
+ size += upsert(kv);
+ }
+ return size;
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
- if (kv == newKv) {
- // ignore the one i just put in (heh)
- continue;
- }
+ /**
+ * Inserts the specified KeyValue into MemStore and deletes any existing
+ * versions of the same row/family/qualifier as the specified KeyValue.
+ * <p>
+ * First, the specified KeyValue is inserted into the Memstore.
+ * <p>
+ * If there are any existing KeyValues in this MemStore with the same row,
+ * family, and qualifier, they are removed.
+ * @param kv
+ * @return change in size of MemStore
+ */
+ private long upsert(KeyValue kv) {
+ // Add the KeyValue to the MemStore
+ long addedSize = add(kv);
+
+ // Iterate the KeyValues after the one just inserted, cleaning up any
+ // other KeyValues with the same row/family/qualifier
+ SortedSet<KeyValue> ss = kvset.tailSet(kv);
+ Iterator<KeyValue> it = ss.iterator();
+ while ( it.hasNext() ) {
+ KeyValue cur = it.next();
- // if this isnt the row we are interested in, then bail:
- if (!firstKv.matchingColumn(family,qualifier) || !firstKv.matchingRow(kv)) {
- break; // rows dont match, bail.
- }
+ if (kv == cur) {
+ // ignore the one just put in
+ continue;
+ }
+ // if this isn't the row we are interested in, then bail
+ if (!kv.matchingRow(cur)) {
+ break;
+ }
- // if the qualifier matches and it's a put, just RM it out of the kvset.
- if (firstKv.matchingQualifier(kv)) {
- // to be extra safe we only remove Puts that have a memstoreTS==0
- if (kv.getType() == KeyValue.Type.Put.getCode()) {
- // false means there was a change, so give us the size.
- addedSize -= heapSizeChange(kv, true);
+ // if the qualifier matches and it's a put, remove it
+ if (kv.matchingQualifier(cur)) {
- it.remove();
- }
+ // to be extra safe we only remove Puts that have a memstoreTS==0
+ if (kv.getType() == KeyValue.Type.Put.getCode() &&
+ kv.getMemstoreTS() == 0) {
+ // false means there was a change, so give us the size.
+ addedSize -= heapSizeChange(kv, true);
+ it.remove();
}
+ } else {
+ // past the column, done
+ break;
}
-
- return addedSize;
- } finally {
- this.lock.readLock().unlock();
}
+ return addedSize;
}
/*
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 26 18:50:13 2010
@@ -1386,6 +1386,30 @@ public class Store implements HeapSize {
}
}
+ /**
+ * Adds or replaces the specified KeyValues.
+ * <p>
+ * For each KeyValue specified, if a cell with the same row, family, and
+ * qualifier exists in MemStore, it will be replaced. Otherwise, it will just
+ * be inserted to MemStore.
+ * <p>
+ * This operation is atomic on each KeyValue (row/family/qualifier) but not
+ * necessarily atomic across all of them.
+ * @param kvs
+ * @return memstore size delta
+ * @throws IOException
+ */
+ public long upsert(List<KeyValue> kvs)
+ throws IOException {
+ this.lock.readLock().lock();
+ try {
+ // TODO: Make this operation atomic w/ RWCC
+ return this.memstore.upsert(kvs);
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
public StoreFlusher getStoreFlusher(long cacheFlushId) {
return new StoreFlusherImpl(cacheFlushId);
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java Tue Oct 26 18:50:13 2010
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Result;
@@ -588,6 +589,9 @@ public class RemoteHTable implements HTa
throw new IOException("checkAndDelete not supported");
}
+ public Result increment(Increment increment) throws IOException {
+ throw new IOException("Increment not supported");
+ }
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
long amount) throws IOException {
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1027681&r1=1027680&r2=1027681&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Tue Oct 26 18:50:13 2010
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.HTableDes
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
@@ -60,6 +59,7 @@ import org.apache.hadoop.hbase.filter.Re
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
@@ -261,7 +261,7 @@ public class TestFromClientSide {
* logs to ensure that we're not scanning more regions that we're supposed to.
* Related to the TestFilterAcrossRegions over in the o.a.h.h.filter package.
* @throws IOException
- * @throws InterruptedException
+ * @throws InterruptedException
*/
@Test
public void testFilterAcrossMultipleRegions()
@@ -1506,7 +1506,7 @@ public class TestFromClientSide {
get.addFamily(FAMILIES[0]);
get.setMaxVersions(Integer.MAX_VALUE);
result = ht.get(get);
- assertNResult(result, ROW, FAMILIES[0], QUALIFIER,
+ assertNResult(result, ROW, FAMILIES[0], QUALIFIER,
new long [] {ts[1], ts[2], ts[3]},
new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
0, 2);
@@ -2633,6 +2633,23 @@ public class TestFromClientSide {
equals(value, key.getValue()));
}
+ private void assertIncrementKey(KeyValue key, byte [] row, byte [] family,
+ byte [] qualifier, long value)
+ throws Exception {
+ assertTrue("Expected row [" + Bytes.toString(row) + "] " +
+ "Got row [" + Bytes.toString(key.getRow()) +"]",
+ equals(row, key.getRow()));
+ assertTrue("Expected family [" + Bytes.toString(family) + "] " +
+ "Got family [" + Bytes.toString(key.getFamily()) + "]",
+ equals(family, key.getFamily()));
+ assertTrue("Expected qualifier [" + Bytes.toString(qualifier) + "] " +
+ "Got qualifier [" + Bytes.toString(key.getQualifier()) + "]",
+ equals(qualifier, key.getQualifier()));
+ assertTrue("Expected value [" + value + "] " +
+ "Got value [" + Bytes.toLong(key.getValue()) + "]",
+ Bytes.toLong(key.getValue()) == value);
+ }
+
private void assertNumKeys(Result result, int n) throws Exception {
assertTrue("Expected " + n + " keys but got " + result.size(),
result.size() == n);
@@ -3800,7 +3817,7 @@ public class TestFromClientSide {
table.getConnection().clearRegionCache();
assertEquals("Clearing cache should have 0 cached ", 0,
HConnectionManager.getCachedRegionCount(conf, TABLENAME));
-
+
// A Get is suppose to do a region lookup request
Get g = new Get(Bytes.toBytes("aaa"));
table.get(g);
@@ -3848,5 +3865,76 @@ public class TestFromClientSide {
LOG.info("Finishing testRegionCachePreWarm");
}
+
+ @Test
+ public void testIncrement() throws Exception {
+ LOG.info("Starting testIncrement");
+ final byte [] TABLENAME = Bytes.toBytes("testIncrement");
+ HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
+
+ byte [][] ROWS = new byte [][] {
+ Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"),
+ Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
+ Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i")
+ };
+ byte [][] QUALIFIERS = new byte [][] {
+ Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"),
+ Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
+ Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i")
+ };
+
+ // Do some simple single-column increments
+
+ // First with old API
+ ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[0], 1);
+ ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[1], 2);
+ ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[2], 3);
+ ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[3], 4);
+
+ // Now increment things incremented with old and do some new
+ Increment inc = new Increment(ROW);
+ inc.addColumn(FAMILY, QUALIFIERS[1], 1);
+ inc.addColumn(FAMILY, QUALIFIERS[3], 1);
+ inc.addColumn(FAMILY, QUALIFIERS[4], 1);
+ ht.increment(inc);
+
+ // Verify expected results
+ Result r = ht.get(new Get(ROW));
+ KeyValue [] kvs = r.raw();
+ assertEquals(5, kvs.length);
+ assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1);
+ assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 3);
+ assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 3);
+ assertIncrementKey(kvs[3], ROW, FAMILY, QUALIFIERS[3], 5);
+ assertIncrementKey(kvs[4], ROW, FAMILY, QUALIFIERS[4], 1);
+
+ // Now try multiple columns by different amounts
+ inc = new Increment(ROWS[0]);
+ for (int i=0;i<QUALIFIERS.length;i++) {
+ inc.addColumn(FAMILY, QUALIFIERS[i], i+1);
+ }
+ ht.increment(inc);
+ // Verify
+ r = ht.get(new Get(ROWS[0]));
+ kvs = r.raw();
+ assertEquals(QUALIFIERS.length, kvs.length);
+ for (int i=0;i<QUALIFIERS.length;i++) {
+ assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], i+1);
+ }
+
+ // Re-increment them
+ inc = new Increment(ROWS[0]);
+ for (int i=0;i<QUALIFIERS.length;i++) {
+ inc.addColumn(FAMILY, QUALIFIERS[i], i+1);
+ }
+ ht.increment(inc);
+ // Verify
+ r = ht.get(new Get(ROWS[0]));
+ kvs = r.raw();
+ assertEquals(QUALIFIERS.length, kvs.length);
+ for (int i=0;i<QUALIFIERS.length;i++) {
+ assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2*(i+1));
+ }
+ }
}