You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/09/20 21:47:23 UTC
svn commit: r1388183 - in
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase:
client/Delete.java regionserver/HRegionThriftServer.java
thrift/ThriftServerRunner.java
Author: mbautin
Date: Thu Sep 20 19:47:23 2012
New Revision: 1388183
URL: http://svn.apache.org/viewvc?rev=1388183&view=rev
Log:
[HBASE-6836] [89-fb] Parallel deletes in HBase Thrift server
Author: mbautin
Summary: We need to expose server-side parallel batch deletes through the Thrift server.
Test Plan:
Unit tests
C++ client test
Reviewers: aaiyer, nzhang, kranganathan
Reviewed By: aaiyer
Differential Revision: https://reviews.facebook.net/D5541
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java?rev=1388183&r1=1388182&r2=1388183&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Delete.java Thu Sep 20 19:47:23 2012
@@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
+import com.google.common.base.Preconditions;
+
/**
* Used to perform Delete operations on a single row.
* <p>
@@ -121,8 +123,6 @@ public class Delete extends Mutation
this.writeToWAL = d.writeToWAL;
}
-
-
/**
* Delete all versions of all columns of the specified family.
* <p>
@@ -330,4 +330,16 @@ public class Delete extends Mutation
return map;
}
+ /**
+ * Modify this delete object to delete from all column families in the row at the given
+ * timestamp or older. If column families have been added already,
+ * an {@link IllegalArgumentException} is thrown.
+ * @param timestamp will delete data at this timestamp or older
+ */
+ public void deleteRow(long timestamp) {
+ Preconditions.checkArgument(familyMap.isEmpty(),
+ "Cannot delete entire row, column families already specified");
+ ts = timestamp;
+ }
+
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java?rev=1388183&r1=1388182&r2=1388183&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java Thu Sep 20 19:47:23 2012
@@ -218,6 +218,21 @@ public class HRegionThriftServer extends
}
}
+ /**
+ * Process a batch of deletes. If the region name is set, use the short-circuit optimization.
+ */
+ @Override
+ protected void processMultiDelete(ByteBuffer tableName, ByteBuffer regionName,
+ List<Delete> deletes) throws IOException, IOError {
+ if (Bytes.isNonEmpty(regionName)) {
+ metrics.incDirectCalls();
+ rs.delete(Bytes.getBytes(regionName), deletes);
+ } else {
+ metrics.incIndirectCalls();
+ super.processMultiDelete(tableName, regionName, deletes);
+ }
+ }
+
@Override
public Map<ByteBuffer, Long> getLastFlushTimes() throws TException {
Map<ByteBuffer, Long> regionToFlushTime = new HashMap<ByteBuffer, Long>();
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java?rev=1388183&r1=1388182&r2=1388183&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java Thu Sep 20 19:47:23 2012
@@ -398,7 +398,7 @@ public class ThriftServerRunner implemen
/**
* Retrieve timestamp from the given mutation Thrift object. If the mutation timestamp is not set
* or is set to {@link HConstants#LATEST_TIMESTAMP}, the default timestamp is used.
- * @param m mutation a mutation object optionally specifying a timestamp
+ * @param m a mutation object optionally specifying a timestamp
* @param defaultTimestamp default timestamp to use if the mutation does not specify timestamp
* @return the effective mutation timestamp
*/
@@ -834,14 +834,7 @@ public class ThriftServerRunner implemen
public void deleteAllTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
long timestamp, ByteBuffer regionName) throws IOError {
try {
- Delete delete = new Delete(getBytes(row));
- byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
- if (famAndQf.length == 1) {
- delete.deleteFamily(famAndQf[0], timestamp);
- } else {
- delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
- }
- processDelete(tableName, regionName, delete);
+ processDelete(tableName, regionName, createDelete(row, column, timestamp));
} catch (IOException e) {
throw convertIOException(e);
}
@@ -853,6 +846,7 @@ public class ThriftServerRunner implemen
deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, regionName);
}
+ @Override
public void deleteAllRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp,
ByteBuffer regionName)
throws IOError {
@@ -909,56 +903,72 @@ public class ThriftServerRunner implemen
regionName);
}
- @Override
- public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
- List<Mutation> mutations, long timestamp,
- Map<ByteBuffer, ByteBuffer> attributes, ByteBuffer regionName)
- throws IOError, IllegalArgument {
- try {
- byte[] rowBytes = getBytes(row);
- Put put = null;
- Delete delete = null;
-
- boolean firstMutation = true;
- boolean writeToWAL = false;
- for (Mutation m : mutations) {
- byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
- long effectiveTimestamp = getMutationTimestamp(m, timestamp);
- if (m.isDelete) {
- if (delete == null) {
- delete = new Delete(rowBytes);
- }
- if (famAndQf.length == 1) {
- delete.deleteFamily(famAndQf[0], effectiveTimestamp);
- } else {
- delete.deleteColumns(famAndQf[0], famAndQf[1], effectiveTimestamp);
- }
- } else {
- if (put == null) {
- put = new Put(rowBytes, timestamp, null);
- }
- put.add(famAndQf[0], getQualifier(famAndQf), effectiveTimestamp, getBytes(m.value));
+ private void mutateRowsHelper(ByteBuffer tableName, ByteBuffer row, List<Mutation> mutations,
+ long timestamp, ByteBuffer regionName, List<Put> puts, List<Delete> deletes)
+ throws IllegalArgument, IOError, IOException {
+ byte[] rowBytes = getBytes(row);
+ Put put = null;
+ Delete delete = null;
+
+ boolean firstMutation = true;
+ boolean writeToWAL = false;
+ for (Mutation m : mutations) {
+ byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
+
+ // If this mutation has timestamp set, it takes precedence, otherwise we use the
+ // timestamp provided in the argument.
+ long effectiveTimestamp = getMutationTimestamp(m, timestamp);
+
+ if (m.isDelete) {
+ if (delete == null) {
+ delete = new Delete(rowBytes);
+ }
+ updateDelete(delete, famAndQf, effectiveTimestamp);
+ } else {
+ if (put == null) {
+ put = new Put(rowBytes, timestamp, null);
}
+ put.add(famAndQf[0], getQualifier(famAndQf), effectiveTimestamp, getBytes(m.value));
+ }
- if (firstMutation) {
- // Remember the first mutation's writeToWAL status.
- firstMutation = false;
- writeToWAL = m.writeToWAL;
- } else {
- // Make sure writeToWAL status is consistent in all mutations.
- if (m.writeToWAL != writeToWAL) {
- throw new IllegalArgument("Mutations with contradicting writeToWal settings");
- }
+ if (firstMutation) {
+ // Remember the first mutation's writeToWAL status.
+ firstMutation = false;
+ writeToWAL = m.writeToWAL;
+ } else {
+ // Make sure writeToWAL status is consistent in all mutations.
+ if (m.writeToWAL != writeToWAL) {
+ throw new IllegalArgument("Mutations with contradicting writeToWal settings");
}
}
- if (delete != null) {
- delete.setWriteToWAL(writeToWAL);
+ }
+
+ if (delete != null) {
+ delete.setWriteToWAL(writeToWAL);
+ if (deletes != null) {
+ deletes.add(delete);
+ } else {
processDelete(tableName, regionName, delete);
}
- if (put != null) {
- put.setWriteToWAL(writeToWAL);
+ }
+
+ if (put != null) {
+ put.setWriteToWAL(writeToWAL);
+ if (puts != null) {
+ puts.add(put);
+ } else {
processPut(tableName, regionName, put);
}
+ }
+ }
+
+ @Override
+ public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
+ List<Mutation> mutations, long timestamp,
+ Map<ByteBuffer, ByteBuffer> attributes, ByteBuffer regionName)
+ throws IOError, IllegalArgument {
+ try {
+ mutateRowsHelper(tableName, row, mutations, timestamp, regionName, null, null);
} catch (IOException e) {
throw convertIOException(e);
} catch (IllegalArgumentException e) {
@@ -979,80 +989,23 @@ public class ThriftServerRunner implemen
ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes, ByteBuffer regionName)
throws IOError, IllegalArgument, TException {
- List<Put> puts = null;
- List<Delete> deletes = null;
+ List<Put> puts = new ArrayList<Put>();
+ List<Delete> deletes = new ArrayList<Delete>();
if (metrics != null) {
metrics.incNumBatchMutateRowKeys(rowBatches.size());
}
- for (BatchMutation batch : rowBatches) {
- byte[] row = getBytes(batch.row);
- List<Mutation> mutations = batch.mutations;
- Delete delete = null;
- Put put = null;
- boolean firstMutation = true;
- boolean writeToWAL = false;
- for (Mutation m : mutations) {
- byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
-
- // If this mutation has timestamp set, it takes precedence, otherwise we use the
- // timestamp provided in the argument.
- long effectiveTimestamp = getMutationTimestamp(m, timestamp);
-
- if (m.isDelete) {
- if (delete == null) {
- delete = new Delete(row);
- }
- // no qualifier, family only.
- if (famAndQf.length == 1) {
- delete.deleteFamily(famAndQf[0], effectiveTimestamp);
- } else {
- delete.deleteColumns(famAndQf[0], famAndQf[1], effectiveTimestamp);
- }
- } else {
- if (put == null) {
- put = new Put(row, timestamp, null);
- }
- put.add(famAndQf[0], getQualifier(famAndQf), effectiveTimestamp,
- getBytes(m.value));
- }
- if (firstMutation) {
- // Remember the first mutation's writeToWAL status.
- firstMutation = false;
- writeToWAL = m.writeToWAL;
- } else {
- // Make sure writeToWAL status is consistent in all mutations in this batch.
- if (m.writeToWAL != writeToWAL) {
- throw new IllegalArgument("Mutations with contradicting writeToWal settings in " +
- "the same batch");
- }
- }
- }
- if (delete != null) {
- delete.setWriteToWAL(writeToWAL);
- if (deletes == null) {
- deletes = new ArrayList<Delete>();
- }
- deletes.add(delete);
- }
- if (put != null) {
- put.setWriteToWAL(writeToWAL);
- if (puts == null) {
- puts = new ArrayList<Put>();
- }
- puts.add(put);
+ try {
+ for (BatchMutation batch : rowBatches) {
+ mutateRowsHelper(tableName, batch.row, batch.mutations, timestamp, regionName,
+ puts, deletes);
}
- }
- HTable table = null;
- try {
if (puts != null) {
processMultiPut(tableName, regionName, puts);
}
if (deletes != null) {
- for (Delete del : deletes) {
- processDelete(tableName, regionName, del);
- }
+ processMultiDelete(tableName, regionName, deletes);
}
} catch (IOException e) {
throw convertIOException(e);
@@ -1099,11 +1052,7 @@ public class ThriftServerRunner implemen
byte[][] famAndQf = KeyValue.parseColumn(Bytes.toBytesRemaining(m.column));
if (m.isDelete) {
- if (famAndQf.length == 1) {
- delete.deleteFamily(famAndQf[0], effectiveTimestamp);
- } else {
- delete.deleteColumns(famAndQf[0], famAndQf[1], effectiveTimestamp);
- }
+ updateDelete(delete, famAndQf, effectiveTimestamp);
} else {
byte[] valueBytes = getBytes(m.value);
put.add(famAndQf[0], getQualifier(famAndQf), effectiveTimestamp, valueBytes);
@@ -1521,14 +1470,17 @@ public class ThriftServerRunner implemen
protected Result[] processMultiGet(ByteBuffer tableName, ByteBuffer regionName, List<Get> gets)
throws IOException, IOError {
- HTable table = getTable(tableName);
- return table.get(gets);
+ return getTable(tableName).get(gets);
}
protected void processMultiPut(ByteBuffer tableName, ByteBuffer regionName, List<Put> puts)
throws IOException, IOError {
- HTable table = getTable(tableName);
- table.put(puts);
+ getTable(tableName).put(puts);
+ }
+
+ protected void processMultiDelete(ByteBuffer tableName, ByteBuffer regionName,
+ List<Delete> deletes) throws IOException, IOError {
+ getTable(tableName).delete(deletes);
}
@Override
@@ -1575,4 +1527,35 @@ public class ThriftServerRunner implemen
}
return HConstants.EMPTY_BYTE_ARRAY;
}
+
+ /**
+ * Add a whole-row, column-family, or single-column deletion to the given delete object.
+ *
+ * @param delete the delete object to update
+ * @param famAndQf family and optional qualifier. A lone empty family means "delete from all CFs".
+ * @param timestamp Delete at this timestamp and older.
+ */
+ private static void updateDelete(Delete delete, byte[][] famAndQf, long timestamp) {
+ if (famAndQf.length == 1) {
+ // Column qualifier not specified.
+ if (famAndQf[0].length == 0) {
+ // Delete from all column families in the row.
+ delete.deleteRow(timestamp);
+ } else {
+ // Delete from all columns in the given column family
+ delete.deleteFamily(famAndQf[0], timestamp);
+ }
+ } else {
+ // Delete only from the specific column
+ delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
+ }
+ }
+
+ private static Delete createDelete(ByteBuffer row, ByteBuffer column, long timestamp) {
+ Delete delete = new Delete(getBytes(row));
+ byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
+ updateDelete(delete, famAndQf, timestamp);
+ return delete;
+ }
+
}