Posted to commits@hbase.apache.org by te...@apache.org on 2012/10/23 18:04:46 UTC

svn commit: r1401332 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/coprocessor/example/ test/java/org/apache/hadoop/hbase/coprocessor/example/

Author: tedyu
Date: Tue Oct 23 16:04:46 2012
New Revision: 1401332

URL: http://svn.apache.org/viewvc?rev=1401332&view=rev
Log:
HBASE-6942 Endpoint implementation for bulk delete rows (Anoop)


Added:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteProtocol.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteResponse.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java
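
For context: before the endpoint can be invoked it has to be loaded on the regions of the
target table. A minimal sketch of registering it cluster-wide through the configuration,
mirroring the test's setupBeforeClass() below (loading it per table via
HTableDescriptor.addCoprocessor(...) is an alternative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
    import org.apache.hadoop.hbase.coprocessor.example.BulkDeleteEndpoint;

    Configuration conf = HBaseConfiguration.create();
    // Register the endpoint for all user regions hosted with this configuration.
    conf.set(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
        BulkDeleteEndpoint.class.getName());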

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java?rev=1401332&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java Tue Oct 23 16:04:46 2012
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.OperationStatus;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+public class BulkDeleteEndpoint extends BaseEndpointCoprocessor implements BulkDeleteProtocol {
+  private static final String NO_OF_VERSIONS_TO_DELETE = "noOfVersionsToDelete";
+  private static final Log LOG = LogFactory.getLog(BulkDeleteEndpoint.class);
+  
+  @Override
+  public BulkDeleteResponse delete(Scan scan, byte deleteType, Long timestamp,
+      int rowBatchSize) {
+    long totalRowsDeleted = 0L;
+    long totalVersionsDeleted = 0L;
+    BulkDeleteResponse response = new BulkDeleteResponse();
+    HRegion region = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
+    boolean hasMore = true;
+    RegionScanner scanner = null;
+    if (scan.getFilter() == null && deleteType == DeleteType.ROW) {
+      // We only need the row keys, so the first KV from each row is enough. This filter can
+      // be applied only for row deletes; for the other delete types we rely on the scan
+      // itself to know which columns are to be deleted.
+      scan.setFilter(new FirstKeyOnlyFilter());
+    }
+    // When the delete is condition based, i.e. the scan carries filters, we assume that the
+    // scan selects exactly the column(s) which need to be deleted.
+    try {
+      scanner = region.getScanner(scan);
+      while (hasMore) {
+        List<List<KeyValue>> deleteRows = new ArrayList<List<KeyValue>>(rowBatchSize);
+        for (int i = 0; i < rowBatchSize; i++) {
+          List<KeyValue> results = new ArrayList<KeyValue>();
+          hasMore = scanner.next(results);
+          if (results.size() > 0) {
+            deleteRows.add(results);
+          }
+          if (!hasMore) {
+            // There are no more rows.
+            break;
+          }
+        }
+        if (deleteRows.size() > 0) {
+          Pair<Mutation, Integer>[] deleteWithLockArr = new Pair[deleteRows.size()];
+          int i = 0;
+          for (List<KeyValue> deleteRow : deleteRows) {
+            Delete delete = createDeleteMutation(deleteRow, deleteType, timestamp);
+            deleteWithLockArr[i++] = new Pair<Mutation, Integer>(delete, null);
+          }
+          OperationStatus[] opStatus = region.batchMutate(deleteWithLockArr);
+          for (i = 0; i < opStatus.length; i++) {
+            if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
+              break;
+            }
+            totalRowsDeleted++;
+            if (deleteType == DeleteType.VERSION) {
+              byte[] versionsDeleted = deleteWithLockArr[i].getFirst().getAttribute(
+                  NO_OF_VERSIONS_TO_DELETE);
+              if (versionsDeleted != null) {
+                totalVersionsDeleted += Bytes.toInt(versionsDeleted);
+              }
+            }
+          }
+        } 
+      }
+    } catch (IOException ioe) {
+      LOG.error(ioe);
+      response.setIoException(ioe);
+    } finally {
+      if (scanner != null) {
+        try {
+          scanner.close();
+        } catch (IOException ioe) {
+          LOG.error(ioe);
+        }
+      }
+    }
+    response.setRowsDeleted(totalRowsDeleted);
+    response.setVersionsDeleted(totalVersionsDeleted);
+    return response;
+  }
+
+  private Delete createDeleteMutation(List<KeyValue> deleteRow, byte deleteType, Long timestamp) {
+    long ts;
+    if (timestamp == null) {
+      ts = HConstants.LATEST_TIMESTAMP;
+    } else {
+      ts = timestamp;
+    }
+    // We only need the row key; take it from the first KV.
+    byte[] row = deleteRow.get(0).getRow();
+    Delete delete = new Delete(row, ts, null);
+    if (deleteType != DeleteType.ROW) {
+      switch (deleteType) {
+      case DeleteType.FAMILY:
+        Set<byte[]> families = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+        for (KeyValue kv : deleteRow) {
+          if (families.add(kv.getFamily())) {
+            delete.deleteFamily(kv.getFamily(), ts);
+          }
+        }
+        break;
+
+      case DeleteType.COLUMN:
+        Set<Column> columns = new HashSet<Column>();
+        for (KeyValue kv : deleteRow) {
+          Column column = new Column(kv.getFamily(), kv.getQualifier());
+          if (columns.add(column)) {
+            // Calling deleteColumns() more than once for the same cf:qualifier is not correct:
+            // every call adds a new KV to the familymap, and each of those KVs would finally
+            // get written to the memstore as part of delete().
+            delete.deleteColumns(column.family, column.qualifier, ts);
+          }
+        }
+        break;
+
+      case DeleteType.VERSION:
+        // When a timestamp is passed to the delete() call, only the one version of the column
+        // with that timestamp is deleted. When no timestamp is passed, the number of versions
+        // deleted depends on the Scan being passed: all the KVs that the scan fetched will
+        // get deleted.
+        int noOfVersionsToDelete = 0;
+        if (timestamp == null) {
+          for (KeyValue kv : deleteRow) {
+            delete.deleteColumn(kv.getFamily(), kv.getQualifier(), kv.getTimestamp());
+            noOfVersionsToDelete++;
+          }
+        } else {
+          columns = new HashSet<Column>();
+          for (KeyValue kv : deleteRow) {
+            Column column = new Column(kv.getFamily(), kv.getQualifier());
+            // Only one version of each particular column gets deleted.
+            if (columns.add(column)) {
+              delete.deleteColumn(column.family, column.qualifier, ts);
+              noOfVersionsToDelete++;
+            }
+          }
+        }
+        delete.setAttribute(NO_OF_VERSIONS_TO_DELETE, Bytes.toBytes(noOfVersionsToDelete));
+      }
+    }
+    return delete;
+  }
+  
+  private static class Column {
+    private byte[] family;
+    private byte[] qualifier;
+
+    public Column(byte[] family, byte[] qualifier) {
+      this.family = family;
+      this.qualifier = qualifier;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof Column)) {
+        return false;
+      }
+      Column column = (Column) other;
+      return Bytes.equals(this.family, column.family)
+          && Bytes.equals(this.qualifier, column.qualifier);
+    }
+
+    @Override
+    public int hashCode() {
+      int h = 31;
+      h = h + 13 * Bytes.hashCode(this.family);
+      h = h + 13 * Bytes.hashCode(this.qualifier);
+      return h;
+    }
+  }
+}
\ No newline at end of file
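
Besides the coprocessorExec() fan-out shown in the protocol's javadoc, a single region can be
targeted through a coprocessor proxy. A hedged sketch (the table name and probe row are
illustrative assumptions):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.coprocessor.example.BulkDeleteProtocol.DeleteType;
    import org.apache.hadoop.hbase.util.Bytes;

    Configuration conf = HBaseConfiguration.create();
    HTable ht = new HTable(conf, "myTable"); // hypothetical table
    // coprocessorProxy() binds the protocol to the single region hosting the given row.
    BulkDeleteProtocol proxy = ht.coprocessorProxy(BulkDeleteProtocol.class, Bytes.toBytes(42));
    // An empty scan with DeleteType.ROW removes every row of that one region.
    BulkDeleteResponse response = proxy.delete(new Scan(), DeleteType.ROW, null, 500);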

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteProtocol.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteProtocol.java?rev=1401332&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteProtocol.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteProtocol.java Tue Oct 23 16:04:46 2012
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+
+/**
+ * Defines a protocol to delete data in bulk based on a scan. The scan can be a range scan or
+ * carry filter conditions etc.
+ * <br/> Example: <code><pre>
+ * Scan scan = new Scan();
+ * // set scan properties (rowkey range, filters, timerange etc).
+ * HTable ht = ...;
+ * long noOfDeletedRows = 0L;
+ * Batch.Call&lt;BulkDeleteProtocol, BulkDeleteResponse&gt; callable =
+ *     new Batch.Call&lt;BulkDeleteProtocol, BulkDeleteResponse&gt;() {
+ *   public BulkDeleteResponse call(BulkDeleteProtocol instance) throws IOException {
+ *     return instance.delete(scan, DeleteType.ROW, timestamp, rowBatchSize);
+ *   }
+ * };
+ * Map&lt;byte[], BulkDeleteResponse&gt; result = ht.coprocessorExec(BulkDeleteProtocol.class,
+ *     scan.getStartRow(), scan.getStopRow(), callable);
+ * for (BulkDeleteResponse response : result.values()) {
+ *   noOfDeletedRows += response.getRowsDeleted();
+ * }
+ * </pre></code>
+ */
+public interface BulkDeleteProtocol extends CoprocessorProtocol {
+  
+  public interface DeleteType {
+    /** 
+     * Delete full row
+     */
+    byte ROW = 0;
+    /**
+     * Delete full column family(s).
+     * Which families get deleted is determined by the Scan, which needs to select all the
+     * families to be deleted.
+     */
+    byte FAMILY = 1;
+    /**
+     * Delete full column(s).
+     * Which columns get deleted is determined by the Scan, which needs to select all the
+     * qualifiers to be deleted.
+     */
+    byte COLUMN = 2;
+    /**
+     * Delete one or more version(s) of column(s).
+     * Which columns and versions get deleted is determined by the Scan, which needs to select
+     * all the qualifiers and versions to be deleted.
+     * When a timestamp is passed, only the one version at that timestamp will be deleted (even
+     * if the scan fetches many versions).
+     */
+    byte VERSION = 3;
+  }
+  
+  /**
+   * Deletes the data fetched by the given scan from the region hosting this endpoint.
+   * @param scan
+   *          the Scan specifying which data is to be deleted
+   * @param deleteType
+   *          one of the {@link DeleteType} values
+   * @param timestamp
+   *          the timestamp to be used in the Delete mutations; when null, the latest
+   *          timestamp is used
+   * @param rowBatchSize
+   *          The number of rows which need to be accumulated by scan and delete as one batch
+   * @return a BulkDeleteResponse carrying the number of rows (and, for VERSION deletes,
+   *         versions) deleted, or any IOException which occurred on the way
+   */
+  BulkDeleteResponse delete(Scan scan, byte deleteType, Long timestamp, int rowBatchSize);
+}
\ No newline at end of file
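
The VERSION delete type is the subtlest: with a null timestamp, the number of versions removed
is driven entirely by what the scan fetches. A sketch condensed from
testBulkDeleteWithNumberOfVersions below (the family/qualifier names and time range are the
test's):

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.coprocessor.Batch;
    import org.apache.hadoop.hbase.coprocessor.example.BulkDeleteProtocol.DeleteType;
    import org.apache.hadoop.hbase.util.Bytes;

    long deleteOldVersions(HTable ht) throws Throwable {
      // Select every version of cf1:c1 written in [1000, 2000); with a null timestamp the
      // endpoint turns each fetched KV into one version delete.
      final Scan scan = new Scan();
      scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("c1"));
      scan.setTimeRange(1000L, 2000L);
      scan.setMaxVersions();
      Batch.Call<BulkDeleteProtocol, BulkDeleteResponse> callable =
          new Batch.Call<BulkDeleteProtocol, BulkDeleteResponse>() {
        public BulkDeleteResponse call(BulkDeleteProtocol instance) throws IOException {
          return instance.delete(scan, DeleteType.VERSION, null, 500);
        }
      };
      Map<byte[], BulkDeleteResponse> results = ht.coprocessorExec(BulkDeleteProtocol.class,
          scan.getStartRow(), scan.getStopRow(), callable);
      long versionsDeleted = 0L;
      for (BulkDeleteResponse response : results.values()) {
        versionsDeleted += response.getVersionsDeleted();
      }
      return versionsDeleted;
    }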

Added: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteResponse.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteResponse.java?rev=1401332&view=auto
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteResponse.java (added)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteResponse.java Tue Oct 23 16:04:46 2012
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Wrapper class carrying the result of the bulk delete operation performed at the server for
+ * one region. This includes the total number of rows deleted and/or any {@link IOException}
+ * which occurred while doing the operation. It also carries the total number of versions
+ * deleted when the delete type is VERSION.
+ */
+public class BulkDeleteResponse implements Serializable {
+  private static final long serialVersionUID = -8192337710525997237L;
+  private long rowsDeleted;
+  private IOException ioException;
+  private long versionsDeleted;
+
+  public BulkDeleteResponse() {
+
+  }
+
+  public void setRowsDeleted(long rowsDeleted) {
+    this.rowsDeleted = rowsDeleted;
+  }
+
+  public long getRowsDeleted() {
+    return rowsDeleted;
+  }
+
+  public void setIoException(IOException ioException) {
+    this.ioException = ioException;
+  }
+
+  public IOException getIoException() {
+    return ioException;
+  }
+
+  public long getVersionsDeleted() {
+    return versionsDeleted;
+  }
+
+  public void setVersionsDeleted(long versionsDeleted) {
+    this.versionsDeleted = versionsDeleted;
+  }
+}
\ No newline at end of file
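
Note that the endpoint catches any IOException and ships it back inside the response instead of
throwing, so callers should inspect each region's response explicitly. A small sketch, assuming
a result map obtained from coprocessorExec():

    import java.io.IOException;
    import java.util.Map;

    long countDeletedRows(Map<byte[], BulkDeleteResponse> results) throws IOException {
      long rows = 0L;
      for (BulkDeleteResponse response : results.values()) {
        // A non-null IOException means the delete on that region stopped early; the
        // row/version counters still reflect what was deleted before the failure.
        if (response.getIoException() != null) {
          throw response.getIoException();
        }
        rows += response.getRowsDeleted();
      }
      return rows;
    }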

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java?rev=1401332&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java Tue Oct 23 16:04:46 2012
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.example.BulkDeleteProtocol.DeleteType;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestBulkDeleteProtocol {
+  private static final byte[] FAMILY1 = Bytes.toBytes("cf1");
+  private static final byte[] FAMILY2 = Bytes.toBytes("cf2");
+  private static final byte[] QUALIFIER1 = Bytes.toBytes("c1");
+  private static final byte[] QUALIFIER2 = Bytes.toBytes("c2");
+  private static final byte[] QUALIFIER3 = Bytes.toBytes("c3");
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().set(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+        BulkDeleteEndpoint.class.getName());
+    TEST_UTIL.startMiniCluster(2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testBulkDeleteEndpoint() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteEndpoint");
+    HTable ht = createTable(tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      byte[] rowkey = Bytes.toBytes(j);
+      puts.add(createPut(rowkey, "v1"));
+    }
+    ht.put(puts);
+    // Deleting all the rows.
+    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 500, DeleteType.ROW,
+        null);
+    assertEquals(100, noOfRowsDeleted);
+
+    int rows = 0;
+    for (Result result : ht.getScanner(new Scan())) {
+      rows++;
+    }
+    assertEquals(0, rows);
+  }
+
+  @Test
+  public void testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion()
+      throws Throwable {
+    byte[] tableName = Bytes
+        .toBytes("testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion");
+    HTable ht = createTable(tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      byte[] rowkey = Bytes.toBytes(j);
+      puts.add(createPut(rowkey, "v1"));
+    }
+    ht.put(puts);
+    // Deleting all the rows.
+    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 10, DeleteType.ROW, null);
+    assertEquals(100, noOfRowsDeleted);
+
+    int rows = 0;
+    for (Result result : ht.getScanner(new Scan())) {
+      rows++;
+    }
+    assertEquals(0, rows);
+  }
+  
+  private long invokeBulkDeleteProtocol(byte[] tableName, final Scan scan, final int rowBatchSize,
+      final byte deleteType, final Long timeStamp) throws Throwable {
+    HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    long noOfDeletedRows = 0L;
+    Batch.Call<BulkDeleteProtocol, BulkDeleteResponse> callable = 
+        new Batch.Call<BulkDeleteProtocol, BulkDeleteResponse>() {
+      public BulkDeleteResponse call(BulkDeleteProtocol instance) throws IOException {
+        return instance.delete(scan, deleteType, timeStamp, rowBatchSize);
+      }
+    };
+    Map<byte[], BulkDeleteResponse> result = ht.coprocessorExec(BulkDeleteProtocol.class,
+        scan.getStartRow(), scan.getStopRow(), callable);
+    for (BulkDeleteResponse response : result.values()) {
+      noOfDeletedRows += response.getRowsDeleted();
+    }
+    return noOfDeletedRows;
+  }
+
+  @Test
+  public void testBulkDeleteWithConditionBasedDelete() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteWithConditionBasedDelete");
+    HTable ht = createTable(tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      byte[] rowkey = Bytes.toBytes(j);
+      String value = (j % 10 == 0) ? "v1" : "v2";
+      puts.add(createPut(rowkey, value));
+    }
+    ht.put(puts);
+    Scan scan = new Scan();
+    FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
+    SingleColumnValueFilter scvf = new SingleColumnValueFilter(FAMILY1, QUALIFIER3,
+        CompareOp.EQUAL, Bytes.toBytes("v1"));
+    //fl.addFilter(new FirstKeyOnlyFilter());
+    fl.addFilter(scvf);
+    scan.setFilter(fl);
+    // Delete all the rows where cf1:c3=v1
+    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.ROW, null);
+    assertEquals(10, noOfRowsDeleted);
+
+    int rows = 0;
+    for (Result result : ht.getScanner(new Scan())) {
+      rows++;
+    }
+    assertEquals(90, rows);
+  }
+
+  @Test
+  public void testBulkDeleteColumn() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteColumn");
+    HTable ht = createTable(tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      byte[] rowkey = Bytes.toBytes(j);
+      String value = (j % 10 == 0) ? "v1" : "v2";
+      puts.add(createPut(rowkey, value));
+    }
+    ht.put(puts);
+    Scan scan = new Scan();
+    scan.addColumn(FAMILY1, QUALIFIER2);
+    // Delete the column cf1:c2
+    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.COLUMN, null);
+    assertEquals(100, noOfRowsDeleted);
+
+    int rows = 0;
+    for (Result result : ht.getScanner(new Scan())) {
+      assertEquals(2, result.getFamilyMap(FAMILY1).size());
+      assertTrue(result.getColumn(FAMILY1, QUALIFIER2).isEmpty());
+      assertEquals(1, result.getColumn(FAMILY1, QUALIFIER1).size());
+      assertEquals(1, result.getColumn(FAMILY1, QUALIFIER3).size());
+      rows++;
+    }
+    assertEquals(100, rows);
+  }
+  
+  @Test
+  public void testBulkDeleteFamily() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteFamily");
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(FAMILY1));
+    htd.addFamily(new HColumnDescriptor(FAMILY2));
+    TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
+    HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      Put put = new Put(Bytes.toBytes(j));
+      put.add(FAMILY1, QUALIFIER1, "v1".getBytes());
+      put.add(FAMILY2, QUALIFIER2, "v2".getBytes());
+      puts.add(put);
+    }
+    ht.put(puts);
+    Scan scan = new Scan();
+    scan.addFamily(FAMILY1);
+    // Delete the column family cf1
+    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.FAMILY, null);
+    assertEquals(100, noOfRowsDeleted);
+    int rows = 0;
+    for (Result result : ht.getScanner(new Scan())) {
+      assertTrue(result.getFamilyMap(FAMILY1).isEmpty());
+      assertEquals(1, result.getColumn(FAMILY2, QUALIFIER2).size());
+      rows++;
+    }
+    assertEquals(100, rows);
+  }
+  
+  @Test
+  public void testBulkDeleteColumnVersion() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteColumnVersion");
+    HTable ht = createTable(tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      Put put = new Put(Bytes.toBytes(j));
+      byte[] value = "v1".getBytes();
+      put.add(FAMILY1, QUALIFIER1, 1234L, value);
+      put.add(FAMILY1, QUALIFIER2, 1234L, value);
+      put.add(FAMILY1, QUALIFIER3, 1234L, value);
+      // Latest version values
+      value = "v2".getBytes();
+      put.add(FAMILY1, QUALIFIER1, value);
+      put.add(FAMILY1, QUALIFIER2, value);
+      put.add(FAMILY1, QUALIFIER3, value);
+      put.add(FAMILY1, null, value);
+      puts.add(put);
+    }
+    ht.put(puts);
+    Scan scan = new Scan();
+    scan.addFamily(FAMILY1);
+    // Delete the latest version values of all the columns in family cf1.
+    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION,
+        HConstants.LATEST_TIMESTAMP);
+    assertEquals(100, noOfRowsDeleted);
+    int rows = 0;
+    scan = new Scan();
+    scan.setMaxVersions();
+    for (Result result : ht.getScanner(scan)) {
+      assertEquals(3, result.getFamilyMap(FAMILY1).size());
+      List<KeyValue> column = result.getColumn(FAMILY1, QUALIFIER1);
+      assertEquals(1, column.size());
+      assertTrue(Bytes.equals("v1".getBytes(), column.get(0).getValue()));
+      
+      column = result.getColumn(FAMILY1, QUALIFIER2);
+      assertEquals(1, column.size());
+      assertTrue(Bytes.equals("v1".getBytes(), column.get(0).getValue()));
+      
+      column = result.getColumn(FAMILY1, QUALIFIER3);
+      assertEquals(1, column.size());
+      assertTrue(Bytes.equals("v1".getBytes(), column.get(0).getValue()));
+      rows++;
+    }
+    assertEquals(100, rows);
+  }
+  
+  @Test
+  public void testBulkDeleteColumnVersionBasedOnTS() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteColumnVersionBasedOnTS");
+    HTable ht = createTable(tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      Put put = new Put(Bytes.toBytes(j));
+      // TS = 1000L
+      byte[] value = "v1".getBytes();
+      put.add(FAMILY1, QUALIFIER1, 1000L, value);
+      put.add(FAMILY1, QUALIFIER2, 1000L, value);
+      put.add(FAMILY1, QUALIFIER3, 1000L, value);
+      // TS = 1234L
+      value = "v2".getBytes();
+      put.add(FAMILY1, QUALIFIER1, 1234L, value);
+      put.add(FAMILY1, QUALIFIER2, 1234L, value);
+      put.add(FAMILY1, QUALIFIER3, 1234L, value);
+      // Latest version values
+      value = "v3".getBytes();
+      put.add(FAMILY1, QUALIFIER1, value);
+      put.add(FAMILY1, QUALIFIER2, value);
+      put.add(FAMILY1, QUALIFIER3, value);
+      puts.add(put);
+    }
+    ht.put(puts);
+    Scan scan = new Scan();
+    scan.addColumn(FAMILY1, QUALIFIER3);
+    // Delete the single version of column cf1:c3 at TS=1234
+    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION, 1234L);
+    assertEquals(100, noOfRowsDeleted);
+    int rows = 0;
+    scan = new Scan();
+    scan.setMaxVersions();
+    for (Result result : ht.getScanner(scan)) {
+      assertEquals(3, result.getFamilyMap(FAMILY1).size());
+      assertEquals(3, result.getColumn(FAMILY1, QUALIFIER1).size());
+      assertEquals(3, result.getColumn(FAMILY1, QUALIFIER2).size());
+      List<KeyValue> column = result.getColumn(FAMILY1, QUALIFIER3);
+      assertEquals(2, column.size());
+      assertTrue(Bytes.equals("v3".getBytes(), column.get(0).getValue()));
+      assertTrue(Bytes.equals("v1".getBytes(), column.get(1).getValue()));
+      rows++;
+    }
+    assertEquals(100, rows);
+  }
+  
+  @Test
+  public void testBulkDeleteWithNumberOfVersions() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteWithNumberOfVersions");
+    HTable ht = createTable(tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      Put put = new Put(Bytes.toBytes(j));
+      // TS = 1000L
+      byte[] value = "v1".getBytes();
+      put.add(FAMILY1, QUALIFIER1, 1000L, value);
+      put.add(FAMILY1, QUALIFIER2, 1000L, value);
+      put.add(FAMILY1, QUALIFIER3, 1000L, value);
+      // TS = 1234L
+      value = "v2".getBytes();
+      put.add(FAMILY1, QUALIFIER1, 1234L, value);
+      put.add(FAMILY1, QUALIFIER2, 1234L, value);
+      put.add(FAMILY1, QUALIFIER3, 1234L, value);
+      // TS = 2000L
+      value = "v3".getBytes();
+      put.add(FAMILY1, QUALIFIER1, 2000L, value);
+      put.add(FAMILY1, QUALIFIER2, 2000L, value);
+      put.add(FAMILY1, QUALIFIER3, 2000L, value);
+      // Latest version values
+      value = "v4".getBytes();
+      put.add(FAMILY1, QUALIFIER1, value);
+      put.add(FAMILY1, QUALIFIER2, value);
+      put.add(FAMILY1, QUALIFIER3, value);
+      puts.add(put);
+    }
+    ht.put(puts);
+    
+    // Delete all the versions of columns cf1:c1 and cf1:c2 falling within the time range
+    // [1000,2000)
+    final Scan scan = new Scan();
+    scan.addColumn(FAMILY1, QUALIFIER1);
+    scan.addColumn(FAMILY1, QUALIFIER2);
+    scan.setTimeRange(1000L, 2000L);
+    scan.setMaxVersions();
+    
+    long noOfDeletedRows = 0L;
+    long noOfVersionsDeleted = 0L;
+    Batch.Call<BulkDeleteProtocol, BulkDeleteResponse> callable = 
+        new Batch.Call<BulkDeleteProtocol, BulkDeleteResponse>() {
+      public BulkDeleteResponse call(BulkDeleteProtocol instance) throws IOException {
+        return instance.delete(scan, DeleteType.VERSION, null, 500);
+      }
+    };
+    Map<byte[], BulkDeleteResponse> result = ht.coprocessorExec(BulkDeleteProtocol.class,
+        scan.getStartRow(), scan.getStopRow(), callable);
+    for (BulkDeleteResponse response : result.values()) {
+      noOfDeletedRows += response.getRowsDeleted();
+      noOfVersionsDeleted += response.getVersionsDeleted();
+    }
+    assertEquals(100, noOfDeletedRows);
+    assertEquals(400, noOfVersionsDeleted);
+    
+    int rows = 0;
+    Scan scan1 = new Scan();
+    scan1.setMaxVersions();
+    for (Result res : ht.getScanner(scan1)) {
+      assertEquals(3, res.getFamilyMap(FAMILY1).size());
+      List<KeyValue> column = res.getColumn(FAMILY1, QUALIFIER1);
+      assertEquals(2, column.size());
+      assertTrue(Bytes.equals("v4".getBytes(), column.get(0).getValue()));
+      assertTrue(Bytes.equals("v3".getBytes(), column.get(1).getValue()));
+      column = res.getColumn(FAMILY1, QUALIFIER2);
+      assertEquals(2, column.size());
+      assertTrue(Bytes.equals("v4".getBytes(), column.get(0).getValue()));
+      assertTrue(Bytes.equals("v3".getBytes(), column.get(1).getValue()));
+      assertEquals(4, res.getColumn(FAMILY1, QUALIFIER3).size());
+      rows++;
+    }
+    assertEquals(100, rows);
+  }
+  
+  private HTable createTable(byte[] tableName) throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY1);
+    hcd.setMaxVersions(10); // 10 versions are enough, as no test here uses more than 10.
+    htd.addFamily(hcd);
+    TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
+    HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    return ht;
+  }
+
+  private Put createPut(byte[] rowkey, String value) throws IOException {
+    Put put = new Put(rowkey);
+    put.add(FAMILY1, QUALIFIER1, value.getBytes());
+    put.add(FAMILY1, QUALIFIER2, value.getBytes());
+    put.add(FAMILY1, QUALIFIER3, value.getBytes());
+    return put;
+  }
+}
\ No newline at end of file