You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/05/30 00:31:55 UTC

svn commit: r1344034 [1/3] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/thrift/ main/java/org/apache/hadoop/hbase/thrift/generated/ main/resources/org/apache/hadoop/hbase/thrift/ test/java/org/apache/hadoop/hbase/thrift/

Author: stack
Date: Tue May 29 22:31:54 2012
New Revision: 1344034

URL: http://svn.apache.org/viewvc?rev=1344034&view=rev
Log:
HBASE-6043 Add Increment Coalescing in thrift.

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/generated/Hbase.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/generated/TRowResult.java
    hbase/trunk/hbase-server/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java?rev=1344034&r1=1344033&r2=1344034&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java Tue May 29 22:31:54 2012
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.client.De
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.OperationWithAttributes;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -76,6 +77,7 @@ import org.apache.hadoop.hbase.thrift.ge
 import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
 import org.apache.hadoop.hbase.thrift.generated.Mutation;
 import org.apache.hadoop.hbase.thrift.generated.TCell;
+import org.apache.hadoop.hbase.thrift.generated.TIncrement;
 import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
 import org.apache.hadoop.hbase.thrift.generated.TScan;
@@ -118,6 +120,7 @@ public class ThriftServerRunner implemen
   static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
   static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
   static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
+  static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
 
   private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
   public static final int DEFAULT_LISTEN_PORT = 9090;
@@ -411,6 +414,8 @@ public class ThriftServerRunner implemen
       }
     };
 
+    IncrementCoalescer coalescer = null;
+
     /**
      * Returns a list of all the column families for a given htable.
      *
@@ -437,7 +442,7 @@ public class ThriftServerRunner implemen
      * @throws IOException
      * @throws IOError
      */
-    protected HTable getTable(final byte[] tableName) throws
+    public HTable getTable(final byte[] tableName) throws
         IOException {
       String table = new String(tableName);
       Map<String, HTable> tables = threadLocalTables.get();
@@ -447,7 +452,7 @@ public class ThriftServerRunner implemen
       return tables.get(table);
     }
 
-    protected HTable getTable(final ByteBuffer tableName) throws IOException {
+    public HTable getTable(final ByteBuffer tableName) throws IOException {
       return getTable(getBytes(tableName));
     }
 
@@ -497,6 +502,7 @@ public class ThriftServerRunner implemen
     protected HBaseHandler(final Configuration c) throws IOException {
       this.conf = c;
       scannerMap = new HashMap<Integer, ResultScanner>();
+      this.coalescer = new IncrementCoalescer(this);
     }
 
     /**
@@ -1399,7 +1405,43 @@ public class ThriftServerRunner implemen
     private void initMetrics(ThriftMetrics metrics) {
       this.metrics = metrics;
     }
+
+    /**
+     * Queues the increment when {@code COALESCE_INC_KEY} is on, else applies it directly
+     * (an IOException becomes an IOError); a null or empty row/table raises TException.
+     */
+    @Override
+    public void increment(TIncrement tincrement) throws IOError, TException {
+      if (tincrement.getRow() == null || tincrement.getRow().length == 0
+          || tincrement.getTable() == null || tincrement.getTable().length == 0) {
+        throw new TException("Must supply a table and a row key; can't increment");
+      }
+      if (conf.getBoolean(COALESCE_INC_KEY, false)) {
+        this.coalescer.queueIncrement(tincrement);
+        return;
+      }
+      try {
+        getTable(tincrement.getTable()).increment(ThriftUtilities.incrementFromThrift(tincrement));
+      } catch (IOException e) {
+        LOG.warn(e.getMessage(), e);
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    /** Queues the batch if {@code COALESCE_INC_KEY} is true, else increments each in turn. */
+    @Override
+    public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
+      final boolean coalesce = conf.getBoolean(COALESCE_INC_KEY, false);
+      if (!coalesce) {
+        for (TIncrement single : tincrements) increment(single);
+        return;
+      }
+      this.coalescer.queueIncrements(tincrements);
+    }
   }
+
+
+
   /**
    * Adds all the attributes into the Operation object
    */

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java?rev=1344034&r1=1344033&r2=1344034&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java Tue May 29 22:31:54 2012
@@ -26,6 +26,7 @@ import java.util.TreeMap;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
 import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
 import org.apache.hadoop.hbase.thrift.generated.TCell;
+import org.apache.hadoop.hbase.thrift.generated.TIncrement;
 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -156,4 +158,18 @@ public class ThriftUtilities {
     Result [] result = { in };
     return rowResultFromHBase(result);
   }
+
+  /**
+   * Builds the HBase {@link Increment} described by a Thrift {@link TIncrement}.
+   * @param tincrement Thrift increment; a column without a qualifier part gets an empty one
+   * @return the matching {@link Increment}, or null when the column parses to no parts
+   */
+  public static Increment incrementFromThrift(TIncrement tincrement) {
+    Increment result = new Increment(tincrement.getRow());
+    byte[][] parts = KeyValue.parseColumn(tincrement.getColumn());
+    if (parts.length == 0) return null;
+    byte[] qualifier = parts.length > 1 ? parts[1] : new byte[0];
+    result.addColumn(parts[0], qualifier, tincrement.getAmmount());
+    return result;
+  }
 }