Posted to commits@hbase.apache.org by mb...@apache.org on 2012/09/20 21:47:23 UTC

svn commit: r1388183 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase: client/Delete.java regionserver/HRegionThriftServer.java thrift/ThriftServerRunner.java

Author: mbautin
Date: Thu Sep 20 19:47:23 2012
New Revision: 1388183

URL: http://svn.apache.org/viewvc?rev=1388183&view=rev
Log:
[HBASE-6836] [89-fb] Parallel deletes in HBase Thrift server

Author: mbautin

Summary: We need to expose server-side parallel batch deletes through the Thrift server.

Test Plan:
Unit tests
C++ client test

Reviewers: aaiyer, nzhang, kranganathan

Reviewed By: aaiyer

Differential Revision: https://reviews.facebook.net/D5541
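
For context on what this buys the caller: once deletes arriving through the Thrift handler are collected into a single List<Delete>, the default processMultiDelete added below hands them to HTable.delete(List<Delete>) in one batch, and the region server override applies them directly. A minimal client-side sketch of that batched shape, assuming the HTable constructor taking a Configuration is available on this branch; the class name and parameters are illustrative, not from the patch:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.HTable;

    public class BatchDeleteSketch {
      /** Delete several whole rows from one table in a single batched call. */
      public static void deleteRows(Configuration conf, byte[] tableName, List<byte[]> rows)
          throws IOException {
        HTable table = new HTable(conf, tableName);
        List<Delete> deletes = new ArrayList<Delete>(rows.size());
        for (byte[] row : rows) {
          deletes.add(new Delete(row));
        }
        // One call for the whole batch; this is the same HTable.delete(List<Delete>)
        // that the new default processMultiDelete performs on the server side.
        table.delete(deletes);
      }
    }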

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java?rev=1388183&r1=1388182&r2=1388183&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java Thu Sep 20 19:47:23 2012
@@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Used to perform Delete operations on a single row.
  * <p>
@@ -121,8 +123,6 @@ public class Delete extends Mutation
     this.writeToWAL = d.writeToWAL;
   }
 
-
-
   /**
    * Delete all versions of all columns of the specified family.
    * <p>
@@ -330,4 +330,16 @@ public class Delete extends Mutation
     return map;
   }
 
+  /**
+   * Modify this delete object to delete from all column families in the row at the given
+   * timestamp or older. If column families have been added already, 
+   * an {@link IllegalArgumentException} is thrown.
+   * @param timestamp will delete data at this timestamp or older 
+   */
+  public void deleteRow(long timestamp) {
+    Preconditions.checkArgument(familyMap.isEmpty(), 
+        "Cannot delete entire row, column families already specified");
+    ts = timestamp;
+  }
+
 }
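
A quick usage sketch of the new Delete.deleteRow(long) contract (row keys and the timestamp are made up): it must be called before any family- or column-level delete is added, otherwise the Preconditions check above fires.

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DeleteRowSketch {
      public static void main(String[] args) {
        // Legal: mark the whole row for deletion at this timestamp or older,
        // while no column families have been added to the Delete yet.
        Delete whole = new Delete(Bytes.toBytes("row-1"));
        whole.deleteRow(1348174043000L);

        // Illegal: a family was added first, so deleteRow throws
        // IllegalArgumentException via the Preconditions check.
        Delete bad = new Delete(Bytes.toBytes("row-2"));
        bad.deleteFamily(Bytes.toBytes("cf"));
        bad.deleteRow(1348174043000L);
      }
    }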

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java?rev=1388183&r1=1388182&r2=1388183&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java Thu Sep 20 19:47:23 2012
@@ -218,6 +218,21 @@ public class HRegionThriftServer extends
       }
     }
 
+    /**
+     * Process a delete request. If the region name is set, use the short-circuit optimization.
+     */
+    @Override
+    protected void processMultiDelete(ByteBuffer tableName, ByteBuffer regionName,
+        List<Delete> deletes) throws IOException, IOError {
+      if (Bytes.isNonEmpty(regionName)) {
+        metrics.incDirectCalls();
+        rs.delete(Bytes.getBytes(regionName), deletes);
+      } else {
+        metrics.incIndirectCalls();
+        super.processMultiDelete(tableName, regionName, deletes);
+      }
+    }
+
     @Override
     public Map<ByteBuffer, Long> getLastFlushTimes() throws TException {
       Map<ByteBuffer, Long> regionToFlushTime = new HashMap<ByteBuffer, Long>();
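
The override above follows the embedded Thrift server's existing direct-versus-forwarded pattern: when the caller supplies a region name, the deletes are applied on the local region server via rs.delete instead of going back out through an HTable, and the direct/indirect metrics record which path was taken. Bytes.isNonEmpty is an 0.89-fb helper; a rough stand-in for what it checks, in case it is unfamiliar (the branch's actual implementation may differ):

    import java.nio.ByteBuffer;

    final class BytesHelperSketch {
      // Approximation of Bytes.isNonEmpty(ByteBuffer): true when the buffer
      // exists and still has bytes remaining.
      static boolean isNonEmpty(ByteBuffer b) {
        return b != null && b.remaining() > 0;
      }
    }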

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java?rev=1388183&r1=1388182&r2=1388183&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java Thu Sep 20 19:47:23 2012
@@ -398,7 +398,7 @@ public class ThriftServerRunner implemen
   /**
    * Retrieve timestamp from the given mutation Thrift object. If the mutation timestamp is not set
    * or is set to {@link HConstants#LATEST_TIMESTAMP}, the default timestamp is used.
-   * @param m mutation a mutation object optionally specifying a timestamp
+   * @param m a mutation object optionally specifying a timestamp
    * @param defaultTimestamp default timestamp to use if the mutation does not specify timestamp
    * @return the effective mutation timestamp
    */
@@ -834,14 +834,7 @@ public class ThriftServerRunner implemen
     public void deleteAllTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
         long timestamp, ByteBuffer regionName) throws IOError {
       try {
-        Delete delete  = new Delete(getBytes(row));
-        byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
-        if (famAndQf.length == 1) {
-          delete.deleteFamily(famAndQf[0], timestamp);
-        } else {
-          delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
-        }
-        processDelete(tableName, regionName, delete);
+        processDelete(tableName, regionName, createDelete(row, column, timestamp));
       } catch (IOException e) {
         throw convertIOException(e);
       }
@@ -853,6 +846,7 @@ public class ThriftServerRunner implemen
       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, regionName);
     }
 
+    @Override
     public void deleteAllRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp,
         ByteBuffer regionName)
         throws IOError {
@@ -909,56 +903,72 @@ public class ThriftServerRunner implemen
           regionName);
     }
 
-    @Override
-    public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
-        List<Mutation> mutations, long timestamp,
-        Map<ByteBuffer, ByteBuffer> attributes, ByteBuffer regionName)
-    throws IOError, IllegalArgument {
-      try {
-        byte[] rowBytes = getBytes(row);
-        Put put = null;
-        Delete delete = null;
-
-        boolean firstMutation = true;
-        boolean writeToWAL = false;
-        for (Mutation m : mutations) {
-          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
-          long effectiveTimestamp = getMutationTimestamp(m, timestamp);
-          if (m.isDelete) {
-            if (delete == null) {
-              delete = new Delete(rowBytes);
-            }
-            if (famAndQf.length == 1) {
-              delete.deleteFamily(famAndQf[0], effectiveTimestamp);
-            } else {
-              delete.deleteColumns(famAndQf[0], famAndQf[1], effectiveTimestamp);
-            }
-          } else {
-            if (put == null) {
-              put = new Put(rowBytes, timestamp, null);
-            }
-            put.add(famAndQf[0], getQualifier(famAndQf), effectiveTimestamp, getBytes(m.value));
+    private void mutateRowsHelper(ByteBuffer tableName, ByteBuffer row, List<Mutation> mutations,
+        long timestamp, ByteBuffer regionName, List<Put> puts, List<Delete> deletes)
+        throws IllegalArgument, IOError, IOException {
+      byte[] rowBytes = getBytes(row);
+      Put put = null;
+      Delete delete = null;
+
+      boolean firstMutation = true;
+      boolean writeToWAL = false;
+      for (Mutation m : mutations) {
+        byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
+        
+        // If this mutation has timestamp set, it takes precedence, otherwise we use the
+        // timestamp provided in the argument.
+        long effectiveTimestamp = getMutationTimestamp(m, timestamp);
+        
+        if (m.isDelete) {
+          if (delete == null) {
+            delete = new Delete(rowBytes);
+          }
+          updateDelete(delete, famAndQf, effectiveTimestamp);
+        } else {
+          if (put == null) {
+            put = new Put(rowBytes, timestamp, null);
           }
+          put.add(famAndQf[0], getQualifier(famAndQf), effectiveTimestamp, getBytes(m.value));
+        }
 
-          if (firstMutation) {
-            // Remember the first mutation's writeToWAL status.
-            firstMutation = false;
-            writeToWAL = m.writeToWAL;
-          } else {
-            // Make sure writeToWAL status is consistent in all mutations.
-            if (m.writeToWAL != writeToWAL) {
-              throw new IllegalArgument("Mutations with contradicting writeToWal settings");
-            }
+        if (firstMutation) {
+          // Remember the first mutation's writeToWAL status.
+          firstMutation = false;
+          writeToWAL = m.writeToWAL;
+        } else {
+          // Make sure writeToWAL status is consistent in all mutations.
+          if (m.writeToWAL != writeToWAL) {
+            throw new IllegalArgument("Mutations with contradicting writeToWal settings");
           }
         }
-        if (delete != null) {
-          delete.setWriteToWAL(writeToWAL);
+      }
+      
+      if (delete != null) {
+        delete.setWriteToWAL(writeToWAL);
+        if (deletes != null) {
+          deletes.add(delete);
+        } else {
           processDelete(tableName, regionName, delete);
         }
-        if (put != null) {
-          put.setWriteToWAL(writeToWAL);
+      }
+      
+      if (put != null) {
+        put.setWriteToWAL(writeToWAL);
+        if (puts != null) {
+          puts.add(put);
+        } else {
           processPut(tableName, regionName, put);
         }
+      }
+    }
+
+    @Override
+    public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
+        List<Mutation> mutations, long timestamp,
+        Map<ByteBuffer, ByteBuffer> attributes, ByteBuffer regionName)
+    throws IOError, IllegalArgument {
+      try {
+        mutateRowsHelper(tableName, row, mutations, timestamp, regionName, null, null);
       } catch (IOException e) {
         throw convertIOException(e);
       } catch (IllegalArgumentException e) {
@@ -979,80 +989,23 @@ public class ThriftServerRunner implemen
         ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
         Map<ByteBuffer, ByteBuffer> attributes, ByteBuffer regionName)
         throws IOError, IllegalArgument, TException {
-      List<Put> puts = null;
-      List<Delete> deletes = null;
+      List<Put> puts = new ArrayList<Put>();
+      List<Delete> deletes = new ArrayList<Delete>();
       if (metrics != null) {
         metrics.incNumBatchMutateRowKeys(rowBatches.size());
       }
 
-      for (BatchMutation batch : rowBatches) {
-        byte[] row = getBytes(batch.row);
-        List<Mutation> mutations = batch.mutations;
-        Delete delete = null;
-        Put put = null;
-        boolean firstMutation = true;
-        boolean writeToWAL = false;
-        for (Mutation m : mutations) {
-          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
-
-          // If this mutation has timestamp set, it takes precedence, otherwise we use the
-          // timestamp provided in the argument.
-          long effectiveTimestamp = getMutationTimestamp(m, timestamp);
-
-          if (m.isDelete) {
-            if (delete == null) {
-              delete = new Delete(row);
-            }
-            // no qualifier, family only.
-            if (famAndQf.length == 1) {
-              delete.deleteFamily(famAndQf[0], effectiveTimestamp);
-            } else {
-              delete.deleteColumns(famAndQf[0], famAndQf[1], effectiveTimestamp);
-            }
-          } else {
-            if (put == null) {
-              put = new Put(row, timestamp, null);
-            }
-            put.add(famAndQf[0], getQualifier(famAndQf), effectiveTimestamp,
-                getBytes(m.value));
-          }
-          if (firstMutation) {
-            // Remember the first mutation's writeToWAL status.
-            firstMutation = false;
-            writeToWAL = m.writeToWAL;
-          } else {
-            // Make sure writeToWAL status is consistent in all mutations in this batch.
-            if (m.writeToWAL != writeToWAL) {
-              throw new IllegalArgument("Mutations with contradicting writeToWal settings in " +
-                  "the same batch");
-            }
-          }
-        }
-        if (delete != null) {
-          delete.setWriteToWAL(writeToWAL);
-          if (deletes == null) {
-            deletes = new ArrayList<Delete>();
-          }
-          deletes.add(delete);
-        }
-        if (put != null) {
-          put.setWriteToWAL(writeToWAL);
-          if (puts == null) {
-            puts = new ArrayList<Put>();
-          }
-          puts.add(put);
+      try {
+        for (BatchMutation batch : rowBatches) {
+          mutateRowsHelper(tableName, batch.row, batch.mutations, timestamp, regionName,
+              puts, deletes);
         }
-      }
 
-      HTable table = null;
-      try {
         if (puts != null) {
           processMultiPut(tableName, regionName, puts);
         }
         if (deletes != null) {
-          for (Delete del : deletes) {
-            processDelete(tableName, regionName, del);
-          }
+          processMultiDelete(tableName, regionName, deletes);
         }
       } catch (IOException e) {
         throw convertIOException(e);
@@ -1099,11 +1052,7 @@ public class ThriftServerRunner implemen
 
           byte[][] famAndQf = KeyValue.parseColumn(Bytes.toBytesRemaining(m.column));
           if (m.isDelete) {
-            if (famAndQf.length == 1) {
-              delete.deleteFamily(famAndQf[0], effectiveTimestamp);
-            } else {
-              delete.deleteColumns(famAndQf[0], famAndQf[1], effectiveTimestamp);
-            }
+            updateDelete(delete, famAndQf, effectiveTimestamp);
           } else {
             byte[] valueBytes = getBytes(m.value);
             put.add(famAndQf[0], getQualifier(famAndQf), effectiveTimestamp, valueBytes);
@@ -1521,14 +1470,17 @@ public class ThriftServerRunner implemen
 
     protected Result[] processMultiGet(ByteBuffer tableName, ByteBuffer regionName, List<Get> gets)
         throws IOException, IOError {
-      HTable table = getTable(tableName);
-      return table.get(gets);
+      return getTable(tableName).get(gets);
     }
 
     protected void processMultiPut(ByteBuffer tableName, ByteBuffer regionName, List<Put> puts)
         throws IOException, IOError {
-      HTable table = getTable(tableName);
-      table.put(puts);
+      getTable(tableName).put(puts);
+    }
+
+    protected void processMultiDelete(ByteBuffer tableName, ByteBuffer regionName,
+        List<Delete> deletes) throws IOException, IOError {
+      getTable(tableName).delete(deletes);
     }
 
     @Override
@@ -1575,4 +1527,35 @@ public class ThriftServerRunner implemen
     }
     return HConstants.EMPTY_BYTE_ARRAY;
   }
+
+  /**
+   * Update the given delete object.
+   * 
+   * @param delete the delete object to update
+   * @param famAndQf family and qualifier. null or empty family means "delete from all CFs".
+   * @param timestamp Delete at this timestamp and older.
+   */
+  private static void updateDelete(Delete delete, byte[][] famAndQf, long timestamp) { 
+    if (famAndQf.length == 1) {
+      // Column qualifier not specified.
+      if (famAndQf[0].length == 0) {
+        // Delete from all column families in the row. 
+        delete.deleteRow(timestamp);
+      } else {
+        // Delete from all columns in the given column family
+        delete.deleteFamily(famAndQf[0], timestamp);
+      }
+    } else {
+      // Delete only from the specific column
+      delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
+    }
+  }
+
+  private static Delete createDelete(ByteBuffer row, ByteBuffer column, long timestamp) {
+    Delete delete  = new Delete(getBytes(row));
+    byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
+    updateDelete(delete, famAndQf, timestamp);
+    return delete;
+  }
+
 }
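
To tie the ThriftServerRunner pieces together: updateDelete maps the three shapes of a parsed Thrift column spec onto the Delete API, and createDelete is the single-column convenience wrapper around it. An illustrative mapping (not part of the patch; the class name, row keys, family, qualifier, and timestamp are placeholders):

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ColumnSpecSketch {
      public static void main(String[] args) {
        long ts = 1348174043000L;
        Delete d = new Delete(Bytes.toBytes("row-1"));

        // "cf:qual" parses to {family, qualifier}: delete all versions of that one column.
        d.deleteColumns(Bytes.toBytes("cf"), Bytes.toBytes("qual"), ts);

        // "cf" parses to {family} only: delete every column in that family.
        d.deleteFamily(Bytes.toBytes("cf"), ts);

        // An empty column spec parses to a zero-length family, which updateDelete
        // turns into deleteRow(ts) -- legal only on a Delete with no families added
        // yet (see the Delete.java sketch earlier), hence a fresh object here.
        Delete wholeRow = new Delete(Bytes.toBytes("row-2"));
        wholeRow.deleteRow(ts);
      }
    }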