You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/05/30 00:31:55 UTC
svn commit: r1344034 [1/3] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/thrift/
main/java/org/apache/hadoop/hbase/thrift/generated/
main/resources/org/apache/hadoop/hbase/thrift/
test/java/org/apache/hadoop/hbase/thrift/
Author: stack
Date: Tue May 29 22:31:54 2012
New Revision: 1344034
URL: http://svn.apache.org/viewvc?rev=1344034&view=rev
Log:
HBASE-6043 Add Increment Coalescing in thrift.
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/generated/Hbase.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/generated/TRowResult.java
hbase/trunk/hbase-server/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java?rev=1344034&r1=1344033&r2=1344034&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java Tue May 29 22:31:54 2012
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.client.De
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -76,6 +77,7 @@ import org.apache.hadoop.hbase.thrift.ge
import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
import org.apache.hadoop.hbase.thrift.generated.Mutation;
import org.apache.hadoop.hbase.thrift.generated.TCell;
+import org.apache.hadoop.hbase.thrift.generated.TIncrement;
import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import org.apache.hadoop.hbase.thrift.generated.TScan;
@@ -118,6 +120,7 @@ public class ThriftServerRunner implemen
static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
+ static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
public static final int DEFAULT_LISTEN_PORT = 9090;
@@ -411,6 +414,8 @@ public class ThriftServerRunner implemen
}
};
+ IncrementCoalescer coalescer = null;
+
/**
* Returns a list of all the column families for a given htable.
*
@@ -437,7 +442,7 @@ public class ThriftServerRunner implemen
* @throws IOException
* @throws IOError
*/
- protected HTable getTable(final byte[] tableName) throws
+ public HTable getTable(final byte[] tableName) throws
IOException {
String table = new String(tableName);
Map<String, HTable> tables = threadLocalTables.get();
@@ -447,7 +452,7 @@ public class ThriftServerRunner implemen
return tables.get(table);
}
- protected HTable getTable(final ByteBuffer tableName) throws IOException {
+ public HTable getTable(final ByteBuffer tableName) throws IOException {
return getTable(getBytes(tableName));
}
@@ -497,6 +502,7 @@ public class ThriftServerRunner implemen
protected HBaseHandler(final Configuration c) throws IOException {
this.conf = c;
scannerMap = new HashMap<Integer, ResultScanner>();
+ this.coalescer = new IncrementCoalescer(this);
}
/**
@@ -1399,7 +1405,43 @@ public class ThriftServerRunner implemen
private void initMetrics(ThriftMetrics metrics) {
this.metrics = metrics;
}
+
+ @Override
+ public void increment(TIncrement tincrement) throws IOError, TException {
+
+ if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
+ throw new TException("Must supply a table and a row key; can't increment");
+ }
+
+ if (conf.getBoolean(COALESCE_INC_KEY, false)) {
+ this.coalescer.queueIncrement(tincrement);
+ return;
+ }
+
+ try {
+ HTable table = getTable(tincrement.getTable());
+ Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
+ table.increment(inc);
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw new IOError(e.getMessage());
+ }
+ }
+
+ @Override
+ public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
+ if (conf.getBoolean(COALESCE_INC_KEY, false)) {
+ this.coalescer.queueIncrements(tincrements);
+ return;
+ }
+ for (TIncrement tinc : tincrements) {
+ increment(tinc);
+ }
+ }
}
+
+
+
/**
* Adds all the attributes into the Operation object
*/
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java?rev=1344034&r1=1344033&r2=1344034&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java Tue May 29 22:31:54 2012
@@ -26,6 +26,7 @@ import java.util.TreeMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.regionserver.StoreFile;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
import org.apache.hadoop.hbase.thrift.generated.TCell;
+import org.apache.hadoop.hbase.thrift.generated.TIncrement;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import org.apache.hadoop.hbase.util.Bytes;
@@ -156,4 +158,18 @@ public class ThriftUtilities {
Result [] result = { in };
return rowResultFromHBase(result);
}
+
+ /**
+ * From a {@link TIncrement} create an {@link Increment}.
+ * @param tincrement the Thrift version of an increment
+ * @return an increment that the {@link TIncrement} represented.
+ */
+ public static Increment incrementFromThrift(TIncrement tincrement) {
+ Increment inc = new Increment(tincrement.getRow());
+ byte[][] famAndQf = KeyValue.parseColumn(tincrement.getColumn());
+ if (famAndQf.length <1 ) return null;
+ byte[] qual = famAndQf.length == 1 ? new byte[0]: famAndQf[1];
+ inc.addColumn(famAndQf[0], qual, tincrement.getAmmount());
+ return inc;
+ }
}