You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/10/23 18:04:46 UTC
svn commit: r1401332 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/coprocessor/example/
test/java/org/apache/hadoop/hbase/coprocessor/example/
Author: tedyu
Date: Tue Oct 23 16:04:46 2012
New Revision: 1401332
URL: http://svn.apache.org/viewvc?rev=1401332&view=rev
Log:
HBASE-6942 Endpoint implementation for bulk delete rows (Anoop)
Added:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteProtocol.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteResponse.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java
Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java?rev=1401332&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java Tue Oct 23 16:04:46 2012
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.OperationStatus;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+public class BulkDeleteEndpoint extends BaseEndpointCoprocessor implements BulkDeleteProtocol {
+ private static final String NO_OF_VERSIONS_TO_DELETE = "noOfVersionsToDelete";
+ private static final Log LOG = LogFactory.getLog(BulkDeleteEndpoint.class);
+
+ @Override
+ public BulkDeleteResponse delete(Scan scan, byte deleteType, Long timestamp,
+ int rowBatchSize) {
+ long totalRowsDeleted = 0L;
+ long totalVersionsDeleted = 0L;
+ BulkDeleteResponse response = new BulkDeleteResponse();
+ HRegion region = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
+ boolean hasMore = true;
+ RegionScanner scanner = null;
+ if (scan.getFilter() == null && deleteType == DeleteType.ROW) {
+ // What we need is just the rowkeys. So only 1st KV from any row is enough.
+ // Only when it is a row delete, we can apply this filter
+ // In other types we rely on the scan to know which all columns to be deleted.
+ scan.setFilter(new FirstKeyOnlyFilter());
+ }
+ // When the delete is based on some conditions so that Filters are available in the scan,
+ // we assume that the scan is perfect having necessary column(s) only.
+ try {
+ scanner = region.getScanner(scan);
+ while (hasMore) {
+ List<List<KeyValue>> deleteRows = new ArrayList<List<KeyValue>>(rowBatchSize);
+ for (int i = 0; i < rowBatchSize; i++) {
+ List<KeyValue> results = new ArrayList<KeyValue>();
+ hasMore = scanner.next(results);
+ if (results.size() > 0) {
+ deleteRows.add(results);
+ }
+ if (!hasMore) {
+ // There are no more rows.
+ break;
+ }
+ }
+ if (deleteRows.size() > 0) {
+ Pair<Mutation, Integer>[] deleteWithLockArr = new Pair[deleteRows.size()];
+ int i = 0;
+ for (List<KeyValue> deleteRow : deleteRows) {
+ Delete delete = createDeleteMutation(deleteRow, deleteType, timestamp);
+ deleteWithLockArr[i++] = new Pair<Mutation, Integer>(delete, null);
+ }
+ OperationStatus[] opStatus = region.batchMutate(deleteWithLockArr);
+ for (i = 0; i < opStatus.length; i++) {
+ if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
+ break;
+ }
+ totalRowsDeleted++;
+ if (deleteType == DeleteType.VERSION) {
+ byte[] versionsDeleted = deleteWithLockArr[i].getFirst().getAttribute(
+ NO_OF_VERSIONS_TO_DELETE);
+ if (versionsDeleted != null) {
+ totalVersionsDeleted += Bytes.toInt(versionsDeleted);
+ }
+ }
+ }
+ }
+ }
+ } catch (IOException ioe) {
+ LOG.error(ioe);
+ response.setIoException(ioe);
+ } finally {
+ if (scanner != null) {
+ try {
+ scanner.close();
+ } catch (IOException ioe) {
+ LOG.error(ioe);
+ }
+ }
+ }
+ response.setRowsDeleted(totalRowsDeleted);
+ response.setVersionsDeleted(totalVersionsDeleted);
+ return response;
+ }
+
+ private Delete createDeleteMutation(List<KeyValue> deleteRow, byte deleteType, Long timestamp) {
+ long ts;
+ if (timestamp == null) {
+ ts = HConstants.LATEST_TIMESTAMP;
+ } else {
+ ts = timestamp;
+ }
+ // We just need the rowkey. Get it from 1st KV.
+ byte[] row = deleteRow.get(0).getRow();
+ Delete delete = new Delete(row, ts, null);
+ if (deleteType != DeleteType.ROW) {
+ switch (deleteType) {
+ case DeleteType.FAMILY:
+ Set<byte[]> families = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ for (KeyValue kv : deleteRow) {
+ if (families.add(kv.getFamily())) {
+ delete.deleteFamily(kv.getFamily(), ts);
+ }
+ }
+ break;
+
+ case DeleteType.COLUMN:
+ Set<Column> columns = new HashSet<Column>();
+ for (KeyValue kv : deleteRow) {
+ Column column = new Column(kv.getFamily(), kv.getQualifier());
+ if (columns.add(column)) {
+ // Making deleteColumns() calls more than once for the same cf:qualifier is not correct
+ // Every call to deleteColumns() will add a new KV to the familymap which will finally
+ // get written to the memstore as part of delete().
+ delete.deleteColumns(column.family, column.qualifier, ts);
+ }
+ }
+ break;
+
+ case DeleteType.VERSION:
+ // When some timestamp was passed to the delete() call only one version of the column (with
+ // given timestamp) will be deleted. If no timestamp passed, it will delete N versions.
+ // How many versions will get deleted depends on the Scan being passed. All the KVs that
+ // the scan fetched will get deleted.
+ int noOfVersionsToDelete = 0;
+ if (timestamp == null) {
+ for (KeyValue kv : deleteRow) {
+ delete.deleteColumn(kv.getFamily(), kv.getQualifier(), kv.getTimestamp());
+ noOfVersionsToDelete++;
+ }
+ } else {
+ columns = new HashSet<Column>();
+ for (KeyValue kv : deleteRow) {
+ Column column = new Column(kv.getFamily(), kv.getQualifier());
+ // Only one version of particular column getting deleted.
+ if (columns.add(column)) {
+ delete.deleteColumn(column.family, column.qualifier, ts);
+ noOfVersionsToDelete++;
+ }
+ }
+ }
+ delete.setAttribute(NO_OF_VERSIONS_TO_DELETE, Bytes.toBytes(noOfVersionsToDelete));
+ }
+ }
+ return delete;
+ }
+
+ private static class Column {
+ private byte[] family;
+ private byte[] qualifier;
+
+ public Column(byte[] family, byte[] qualifier) {
+ this.family = family;
+ this.qualifier = qualifier;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof Column)) {
+ return false;
+ }
+ Column column = (Column) other;
+ return Bytes.equals(this.family, column.family)
+ && Bytes.equals(this.qualifier, column.qualifier);
+ }
+
+ @Override
+ public int hashCode() {
+ int h = 31;
+ h = h + 13 * Bytes.hashCode(this.family);
+ h = h + 13 * Bytes.hashCode(this.qualifier);
+ return h;
+ }
+ }
+}
\ No newline at end of file
Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteProtocol.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteProtocol.java?rev=1401332&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteProtocol.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteProtocol.java Tue Oct 23 16:04:46 2012
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+
/**
 * Defines a protocol to delete data in bulk based on a scan. The scan can be range scan or with
 * conditions(filters) etc.
 * <br/> Example: <code><pre>
 * Scan scan = new Scan();
 * // set scan properties(rowkey range, filters, timerange etc).
 * HTable ht = ...;
 * long noOfDeletedRows = 0L;
 * Batch.Call&lt;BulkDeleteProtocol, BulkDeleteResponse&gt; callable =
 *     new Batch.Call&lt;BulkDeleteProtocol, BulkDeleteResponse&gt;() {
 *   public BulkDeleteResponse call(BulkDeleteProtocol instance) throws IOException {
 *     return instance.delete(scan, BulkDeleteProtocol.DeleteType.ROW, timestamp, rowBatchSize);
 *   }
 * };
 * Map&lt;byte[], BulkDeleteResponse&gt; result = ht.coprocessorExec(BulkDeleteProtocol.class,
 *     scan.getStartRow(), scan.getStopRow(), callable);
 * for (BulkDeleteResponse response : result.values()) {
 *   noOfDeletedRows += response.getRowsDeleted();
 * }
 * </pre></code>
 */
public interface BulkDeleteProtocol extends CoprocessorProtocol {

  /**
   * Kind of data to remove for each row the scan returns. Passed as the {@code deleteType}
   * argument of {@link BulkDeleteProtocol#delete(Scan, byte, Long, int)}.
   */
  public interface DeleteType {
    /**
     * Delete full row
     */
    byte ROW = 0;
    /**
     * Delete full family(s).
     * Which family(s) to be deleted will be determined by the Scan.
     * Scan need to select all the families which need to be deleted.
     */
    byte FAMILY = 1;
    /**
     * Delete full column(s).
     * Which column(s) to be deleted will be determined by the Scan.
     * Scan need to select all the qualifiers which need to be deleted.
     */
    byte COLUMN = 2;
    /**
     * Delete one or more version(s) of column(s).
     * Which column(s) and version(s) to be deleted will be determined by the Scan.
     * Scan need to select all the qualifiers and its versions which need to be deleted.
     * When a timestamp is passed only one version at that timestamp will be deleted(even if scan
     * fetches many versions)
     */
    byte VERSION = 3;
  }

  /**
   * Deletes, region side, the data selected by the scan.
   *
   * @param scan selects the rows/columns/versions which need to be deleted
   * @param deleteType one of the {@link DeleteType} constants
   * @param timestamp timestamp for the Delete mutations; null means latest timestamp
   * @param rowBatchSize
   *          The number of rows which need to be accumulated by scan and delete as one batch
   * @return a response carrying the number of rows (and, for VERSION type, versions) deleted in
   *         this region, plus any IOException hit while deleting
   */
  BulkDeleteResponse delete(Scan scan, byte deleteType, Long timestamp, int rowBatchSize);
}
\ No newline at end of file
Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteResponse.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteResponse.java?rev=1401332&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteResponse.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteResponse.java Tue Oct 23 16:04:46 2012
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import java.io.IOException;
+import java.io.Serializable;
+
/**
 * Result of the bulk delete operation run at the server for a single region. Carries the total
 * number of rows deleted, the total number of versions deleted (populated only for VERSION type
 * deletes) and, if the operation failed part way, the {@link IOException} that occurred.
 * Serializable so the endpoint framework can ship it back to the client.
 */
public class BulkDeleteResponse implements Serializable {
  private static final long serialVersionUID = -8192337710525997237L;

  private long rowsDeleted;
  private IOException ioException;
  private long versionsDeleted;

  /** Creates an empty response; counts start at zero and no exception is set. */
  public BulkDeleteResponse() {
  }

  /** @return total number of rows deleted in this region */
  public long getRowsDeleted() {
    return this.rowsDeleted;
  }

  public void setRowsDeleted(long rowsDeleted) {
    this.rowsDeleted = rowsDeleted;
  }

  /** @return total number of versions deleted; meaningful only for VERSION type deletes */
  public long getVersionsDeleted() {
    return this.versionsDeleted;
  }

  public void setVersionsDeleted(long versionsDeleted) {
    this.versionsDeleted = versionsDeleted;
  }

  /** @return the exception hit while deleting, or null when the operation fully succeeded */
  public IOException getIoException() {
    return this.ioException;
  }

  public void setIoException(IOException ioException) {
    this.ioException = ioException;
  }
}
\ No newline at end of file
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java?rev=1401332&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java Tue Oct 23 16:04:46 2012
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.example.BulkDeleteProtocol.DeleteType;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
/**
 * Integration tests for the {@link BulkDeleteEndpoint} example coprocessor, covering the ROW,
 * FAMILY, COLUMN and VERSION delete types as well as batching and filter-driven deletes. Runs
 * against a 2 region-server mini cluster with the endpoint loaded on every user region.
 */
@Category(MediumTests.class)
public class TestBulkDeleteProtocol {
  // Families and qualifiers shared by all tests; tables are pre-split into 5 regions over keys
  // [0, 120) in createTable()/testBulkDeleteFamily().
  private static final byte[] FAMILY1 = Bytes.toBytes("cf1");
  private static final byte[] FAMILY2 = Bytes.toBytes("cf2");
  private static final byte[] QUALIFIER1 = Bytes.toBytes("c1");
  private static final byte[] QUALIFIER2 = Bytes.toBytes("c2");
  private static final byte[] QUALIFIER3 = Bytes.toBytes("c3");
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  /** Starts the mini cluster with BulkDeleteEndpoint configured as a user-region coprocessor. */
  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().set(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
        BulkDeleteEndpoint.class.getName());
    TEST_UTIL.startMiniCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /** ROW delete of every row, with a batch size larger than the row count. */
  @Test
  public void testBulkDeleteEndpoint() throws Throwable {
    byte[] tableName = Bytes.toBytes("testBulkDeleteEndpoint");
    HTable ht = createTable(tableName);
    List<Put> puts = new ArrayList<Put>(100);
    for (int j = 0; j < 100; j++) {
      byte[] rowkey = Bytes.toBytes(j);
      puts.add(createPut(rowkey, "v1"));
    }
    ht.put(puts);
    // Deleting all the rows.
    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 500, DeleteType.ROW,
        null);
    assertEquals(100, noOfRowsDeleted);

    int rows = 0;
    for (Result result : ht.getScanner(new Scan())) {
      rows++;
    }
    assertEquals(0, rows);
  }

  /** ROW delete where each region needs several scan/batchMutate round trips (batch size 10). */
  @Test
  public void testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion()
      throws Throwable {
    byte[] tableName = Bytes
        .toBytes("testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion");
    HTable ht = createTable(tableName);
    List<Put> puts = new ArrayList<Put>(100);
    for (int j = 0; j < 100; j++) {
      byte[] rowkey = Bytes.toBytes(j);
      puts.add(createPut(rowkey, "v1"));
    }
    ht.put(puts);
    // Deleting all the rows.
    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 10, DeleteType.ROW, null);
    assertEquals(100, noOfRowsDeleted);

    int rows = 0;
    for (Result result : ht.getScanner(new Scan())) {
      rows++;
    }
    assertEquals(0, rows);
  }

  /**
   * Invokes the BulkDeleteProtocol endpoint on every region covered by the scan's row range and
   * sums the per-region deleted-row counts.
   * NOTE(review): the HTable opened here is never closed; acceptable in a test but it does leak
   * a connection per invocation.
   */
  private long invokeBulkDeleteProtocol(byte[] tableName, final Scan scan, final int rowBatchSize,
      final byte deleteType, final Long timeStamp) throws Throwable {
    HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
    long noOfDeletedRows = 0L;
    Batch.Call<BulkDeleteProtocol, BulkDeleteResponse> callable =
        new Batch.Call<BulkDeleteProtocol, BulkDeleteResponse>() {
      public BulkDeleteResponse call(BulkDeleteProtocol instance) throws IOException {
        return instance.delete(scan, deleteType, timeStamp, rowBatchSize);
      }
    };
    Map<byte[], BulkDeleteResponse> result = ht.coprocessorExec(BulkDeleteProtocol.class,
        scan.getStartRow(), scan.getStopRow(), callable);
    for (BulkDeleteResponse response : result.values()) {
      noOfDeletedRows += response.getRowsDeleted();
    }
    return noOfDeletedRows;
  }

  /** ROW delete restricted by a SingleColumnValueFilter: only 10 of 100 rows match. */
  @Test
  public void testBulkDeleteWithConditionBasedDelete() throws Throwable {
    byte[] tableName = Bytes.toBytes("testBulkDeleteWithConditionBasedDelete");
    HTable ht = createTable(tableName);
    List<Put> puts = new ArrayList<Put>(100);
    for (int j = 0; j < 100; j++) {
      byte[] rowkey = Bytes.toBytes(j);
      // Every 10th row gets value "v1"; the rest get "v2".
      String value = (j % 10 == 0) ? "v1" : "v2";
      puts.add(createPut(rowkey, value));
    }
    ht.put(puts);
    Scan scan = new Scan();
    FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
    SingleColumnValueFilter scvf = new SingleColumnValueFilter(FAMILY1, QUALIFIER3,
        CompareOp.EQUAL, Bytes.toBytes("v1"));
    fl.addFilter(scvf);
    scan.setFilter(fl);
    // Deleting all the rows where cf1:c3=v1
    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.ROW, null);
    assertEquals(10, noOfRowsDeleted);

    int rows = 0;
    for (Result result : ht.getScanner(new Scan())) {
      rows++;
    }
    assertEquals(90, rows);
  }

  /** COLUMN delete of cf1:c2 only; c1 and c3 must survive in every row. */
  @Test
  public void testBulkDeleteColumn() throws Throwable {
    byte[] tableName = Bytes.toBytes("testBulkDeleteColumn");
    HTable ht = createTable(tableName);
    List<Put> puts = new ArrayList<Put>(100);
    for (int j = 0; j < 100; j++) {
      byte[] rowkey = Bytes.toBytes(j);
      String value = (j % 10 == 0) ? "v1" : "v2";
      puts.add(createPut(rowkey, value));
    }
    ht.put(puts);
    Scan scan = new Scan();
    scan.addColumn(FAMILY1, QUALIFIER2);
    // Delete the column cf1:col2
    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.COLUMN, null);
    assertEquals(100, noOfRowsDeleted);

    int rows = 0;
    for (Result result : ht.getScanner(new Scan())) {
      assertEquals(2, result.getFamilyMap(FAMILY1).size());
      assertTrue(result.getColumn(FAMILY1, QUALIFIER2).isEmpty());
      assertEquals(1, result.getColumn(FAMILY1, QUALIFIER1).size());
      assertEquals(1, result.getColumn(FAMILY1, QUALIFIER3).size());
      rows++;
    }
    assertEquals(100, rows);
  }

  /** FAMILY delete of cf1 on a two-family table; cf2 must remain untouched. */
  @Test
  public void testBulkDeleteFamily() throws Throwable {
    byte[] tableName = Bytes.toBytes("testBulkDeleteFamily");
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor(FAMILY1));
    htd.addFamily(new HColumnDescriptor(FAMILY2));
    TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
    HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
    List<Put> puts = new ArrayList<Put>(100);
    for (int j = 0; j < 100; j++) {
      Put put = new Put(Bytes.toBytes(j));
      put.add(FAMILY1, QUALIFIER1, "v1".getBytes());
      put.add(FAMILY2, QUALIFIER2, "v2".getBytes());
      puts.add(put);
    }
    ht.put(puts);
    Scan scan = new Scan();
    scan.addFamily(FAMILY1);
    // Delete the column family cf1
    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.FAMILY, null);
    assertEquals(100, noOfRowsDeleted);
    int rows = 0;
    for (Result result : ht.getScanner(new Scan())) {
      assertTrue(result.getFamilyMap(FAMILY1).isEmpty());
      assertEquals(1, result.getColumn(FAMILY2, QUALIFIER2).size());
      rows++;
    }
    assertEquals(100, rows);
  }

  /** VERSION delete at LATEST_TIMESTAMP: removes only the newest version of each column. */
  @Test
  public void testBulkDeleteColumnVersion() throws Throwable {
    byte[] tableName = Bytes.toBytes("testBulkDeleteColumnVersion");
    HTable ht = createTable(tableName);
    List<Put> puts = new ArrayList<Put>(100);
    for (int j = 0; j < 100; j++) {
      Put put = new Put(Bytes.toBytes(j));
      byte[] value = "v1".getBytes();
      put.add(FAMILY1, QUALIFIER1, 1234L, value);
      put.add(FAMILY1, QUALIFIER2, 1234L, value);
      put.add(FAMILY1, QUALIFIER3, 1234L, value);
      // Latest version values
      value = "v2".getBytes();
      put.add(FAMILY1, QUALIFIER1, value);
      put.add(FAMILY1, QUALIFIER2, value);
      put.add(FAMILY1, QUALIFIER3, value);
      put.add(FAMILY1, null, value);
      puts.add(put);
    }
    ht.put(puts);
    Scan scan = new Scan();
    scan.addFamily(FAMILY1);
    // Delete the latest version values of all the columns in family cf1.
    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION,
        HConstants.LATEST_TIMESTAMP);
    assertEquals(100, noOfRowsDeleted);
    int rows = 0;
    scan = new Scan();
    scan.setMaxVersions();
    for (Result result : ht.getScanner(scan)) {
      // Only the old "v1" versions at ts=1234 should remain.
      assertEquals(3, result.getFamilyMap(FAMILY1).size());
      List<KeyValue> column = result.getColumn(FAMILY1, QUALIFIER1);
      assertEquals(1, column.size());
      assertTrue(Bytes.equals("v1".getBytes(), column.get(0).getValue()));

      column = result.getColumn(FAMILY1, QUALIFIER2);
      assertEquals(1, column.size());
      assertTrue(Bytes.equals("v1".getBytes(), column.get(0).getValue()));

      column = result.getColumn(FAMILY1, QUALIFIER3);
      assertEquals(1, column.size());
      assertTrue(Bytes.equals("v1".getBytes(), column.get(0).getValue()));
      rows++;
    }
    assertEquals(100, rows);
  }

  /** VERSION delete with an explicit timestamp: exactly the ts=1234 version of cf1:c3 goes. */
  @Test
  public void testBulkDeleteColumnVersionBasedOnTS() throws Throwable {
    byte[] tableName = Bytes.toBytes("testBulkDeleteColumnVersionBasedOnTS");
    HTable ht = createTable(tableName);
    List<Put> puts = new ArrayList<Put>(100);
    for (int j = 0; j < 100; j++) {
      Put put = new Put(Bytes.toBytes(j));
      // TS = 1000L
      byte[] value = "v1".getBytes();
      put.add(FAMILY1, QUALIFIER1, 1000L, value);
      put.add(FAMILY1, QUALIFIER2, 1000L, value);
      put.add(FAMILY1, QUALIFIER3, 1000L, value);
      // TS = 1234L
      value = "v2".getBytes();
      put.add(FAMILY1, QUALIFIER1, 1234L, value);
      put.add(FAMILY1, QUALIFIER2, 1234L, value);
      put.add(FAMILY1, QUALIFIER3, 1234L, value);
      // Latest version values
      value = "v3".getBytes();
      put.add(FAMILY1, QUALIFIER1, value);
      put.add(FAMILY1, QUALIFIER2, value);
      put.add(FAMILY1, QUALIFIER3, value);
      puts.add(put);
    }
    ht.put(puts);
    Scan scan = new Scan();
    scan.addColumn(FAMILY1, QUALIFIER3);
    // Delete the column cf1:c3's one version at TS=1234
    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION, 1234L);
    assertEquals(100, noOfRowsDeleted);
    int rows = 0;
    scan = new Scan();
    scan.setMaxVersions();
    for (Result result : ht.getScanner(scan)) {
      assertEquals(3, result.getFamilyMap(FAMILY1).size());
      assertEquals(3, result.getColumn(FAMILY1, QUALIFIER1).size());
      assertEquals(3, result.getColumn(FAMILY1, QUALIFIER2).size());
      // cf1:c3 keeps only the v3 (latest) and v1 (ts=1000) versions.
      List<KeyValue> column = result.getColumn(FAMILY1, QUALIFIER3);
      assertEquals(2, column.size());
      assertTrue(Bytes.equals("v3".getBytes(), column.get(0).getValue()));
      assertTrue(Bytes.equals("v1".getBytes(), column.get(1).getValue()));
      rows++;
    }
    assertEquals(100, rows);
  }

  /**
   * VERSION delete with null timestamp and a time-ranged scan: every fetched version is deleted,
   * and the endpoint reports the total version count via getVersionsDeleted().
   */
  @Test
  public void testBulkDeleteWithNumberOfVersions() throws Throwable {
    byte[] tableName = Bytes.toBytes("testBulkDeleteWithNumberOfVersions");
    HTable ht = createTable(tableName);
    List<Put> puts = new ArrayList<Put>(100);
    for (int j = 0; j < 100; j++) {
      Put put = new Put(Bytes.toBytes(j));
      // TS = 1000L
      byte[] value = "v1".getBytes();
      put.add(FAMILY1, QUALIFIER1, 1000L, value);
      put.add(FAMILY1, QUALIFIER2, 1000L, value);
      put.add(FAMILY1, QUALIFIER3, 1000L, value);
      // TS = 1234L
      value = "v2".getBytes();
      put.add(FAMILY1, QUALIFIER1, 1234L, value);
      put.add(FAMILY1, QUALIFIER2, 1234L, value);
      put.add(FAMILY1, QUALIFIER3, 1234L, value);
      // TS = 2000L
      value = "v3".getBytes();
      put.add(FAMILY1, QUALIFIER1, 2000L, value);
      put.add(FAMILY1, QUALIFIER2, 2000L, value);
      put.add(FAMILY1, QUALIFIER3, 2000L, value);
      // Latest version values
      value = "v4".getBytes();
      put.add(FAMILY1, QUALIFIER1, value);
      put.add(FAMILY1, QUALIFIER2, value);
      put.add(FAMILY1, QUALIFIER3, value);
      puts.add(put);
    }
    ht.put(puts);

    // Delete all the versions of columns cf1:c1 and cf1:c2 falling with the time range
    // [1000,2000)
    final Scan scan = new Scan();
    scan.addColumn(FAMILY1, QUALIFIER1);
    scan.addColumn(FAMILY1, QUALIFIER2);
    scan.setTimeRange(1000L, 2000L);
    scan.setMaxVersions();

    long noOfDeletedRows = 0L;
    long noOfVersionsDeleted = 0L;
    // Invoked directly (not via invokeBulkDeleteProtocol) because this test also needs the
    // per-region versions-deleted counts from the responses.
    Batch.Call<BulkDeleteProtocol, BulkDeleteResponse> callable =
        new Batch.Call<BulkDeleteProtocol, BulkDeleteResponse>() {
      public BulkDeleteResponse call(BulkDeleteProtocol instance) throws IOException {
        return instance.delete(scan, DeleteType.VERSION, null, 500);
      }
    };
    Map<byte[], BulkDeleteResponse> result = ht.coprocessorExec(BulkDeleteProtocol.class,
        scan.getStartRow(), scan.getStopRow(), callable);
    for (BulkDeleteResponse response : result.values()) {
      noOfDeletedRows += response.getRowsDeleted();
      noOfVersionsDeleted += response.getVersionsDeleted();
    }
    assertEquals(100, noOfDeletedRows);
    // 100 rows * 2 columns * 2 versions in [1000, 2000).
    assertEquals(400, noOfVersionsDeleted);

    int rows = 0;
    Scan scan1 = new Scan();
    scan1.setMaxVersions();
    for (Result res : ht.getScanner(scan1)) {
      assertEquals(3, res.getFamilyMap(FAMILY1).size());
      List<KeyValue> column = res.getColumn(FAMILY1, QUALIFIER1);
      assertEquals(2, column.size());
      assertTrue(Bytes.equals("v4".getBytes(), column.get(0).getValue()));
      assertTrue(Bytes.equals("v3".getBytes(), column.get(1).getValue()));
      column = res.getColumn(FAMILY1, QUALIFIER2);
      assertEquals(2, column.size());
      assertTrue(Bytes.equals("v4".getBytes(), column.get(0).getValue()));
      assertTrue(Bytes.equals("v3".getBytes(), column.get(1).getValue()));
      // cf1:c3 was outside the scan, so all 4 of its versions survive.
      assertEquals(4, res.getColumn(FAMILY1, QUALIFIER3).size());
      rows++;
    }
    assertEquals(100, rows);
  }

  /** Creates a single-family (cf1) table pre-split into 5 regions over rowkeys [0, 120). */
  private HTable createTable(byte[] tableName) throws IOException {
    HTableDescriptor htd = new HTableDescriptor(tableName);
    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY1);
    hcd.setMaxVersions(10);// Just setting 10 as I am not testing with more than 10 versions here
    htd.addFamily(hcd);
    TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
    HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
    return ht;
  }

  /**
   * Builds a Put that writes {@code value} into cf1:c1, cf1:c2 and cf1:c3 of {@code rowkey}.
   * NOTE(review): String.getBytes() uses the platform charset; the ASCII values here make that
   * harmless, but Bytes.toBytes(value) would be the charset-safe form.
   */
  private Put createPut(byte[] rowkey, String value) throws IOException {
    Put put = new Put(rowkey);
    put.add(FAMILY1, QUALIFIER1, value.getBytes());
    put.add(FAMILY1, QUALIFIER2, value.getBytes());
    put.add(FAMILY1, QUALIFIER3, value.getBytes());
    return put;
  }
}
\ No newline at end of file