You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/10/23 17:59:58 UTC

svn commit: r1401330 [2/2] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/coprocessor/example/ main/java/org/apache/hadoop/hbase/coprocessor/example/generated/ main/protobuf/ test/java/org/apache/hadoop/hbase/coprocessor/example/

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java?rev=1401330&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java Tue Oct 23 15:59:57 2012
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
+import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse;
+import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
+import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.Builder;
+import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestBulkDeleteProtocol {
+  // Column families used by the tables created in these tests.
+  private static final byte[] FAMILY1 = Bytes.toBytes("cf1");
+  private static final byte[] FAMILY2 = Bytes.toBytes("cf2");
+  // Column qualifiers written by the tests.
+  private static final byte[] QUALIFIER1 = Bytes.toBytes("c1");
+  private static final byte[] QUALIFIER2 = Bytes.toBytes("c2");
+  private static final byte[] QUALIFIER3 = Bytes.toBytes("c3");
+  // Shared testing utility; it provides the configuration, the admin handle and the mini
+  // cluster used by every test in this class.
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  /**
+   * Registers {@link BulkDeleteEndpoint} under the user region coprocessor configuration key
+   * and then starts the mini cluster through {@code startMiniCluster(2)}.
+   */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    final String endpointClassName = BulkDeleteEndpoint.class.getName();
+    TEST_UTIL.getConfiguration().set(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+        endpointClassName);
+    TEST_UTIL.startMiniCluster(2);
+  }
+
+  /** Shuts down the mini cluster once all the tests in this class have run. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Writes 100 rows, bulk deletes all of them with {@link DeleteType#ROW} and a row batch size
+   * of 5, and verifies that a full scan returns no rows afterwards.
+   */
+  @Test
+  public void testBulkDeleteEndpoint() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteEndpoint");
+    HTable ht = createTable(tableName);
+    try {
+      List<Put> puts = new ArrayList<Put>(100);
+      for (int j = 0; j < 100; j++) {
+        byte[] rowkey = Bytes.toBytes(j);
+        puts.add(createPut(rowkey, "v1"));
+      }
+      ht.put(puts);
+      // Deleting all the rows.
+      long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 5, DeleteType.ROW,
+          null);
+      assertEquals(100, noOfRowsDeleted);
+
+      int rows = 0;
+      for (Result result : ht.getScanner(new Scan())) {
+        rows++;
+      }
+      assertEquals(0, rows);
+    } finally {
+      // Release the table handle even when an assertion above fails.
+      ht.close();
+    }
+  }
+
+  /**
+   * Writes 100 rows, bulk deletes all of them with {@link DeleteType#ROW} and a row batch size
+   * of 10, and verifies that a full scan returns no rows afterwards.
+   */
+  @Test
+  public void testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion()
+      throws Throwable {
+    byte[] tableName = Bytes
+        .toBytes("testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion");
+    HTable ht = createTable(tableName);
+    try {
+      List<Put> puts = new ArrayList<Put>(100);
+      for (int j = 0; j < 100; j++) {
+        byte[] rowkey = Bytes.toBytes(j);
+        puts.add(createPut(rowkey, "v1"));
+      }
+      ht.put(puts);
+      // Deleting all the rows.
+      long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 10, DeleteType.ROW,
+          null);
+      assertEquals(100, noOfRowsDeleted);
+
+      int rows = 0;
+      for (Result result : ht.getScanner(new Scan())) {
+        rows++;
+      }
+      assertEquals(0, rows);
+    } finally {
+      // Release the table handle even when an assertion above fails.
+      ht.close();
+    }
+  }
+
+  /**
+   * Calls the bulk delete endpoint through {@code HTable#coprocessorService} over the row range
+   * given by the scan's start and stop rows, and adds up the results.
+   *
+   * @param tableName name of the table to delete from
+   * @param scan scan placed in the request; its start/stop rows also bound the service call
+   * @param rowBatchSize row batch size set on the request
+   * @param deleteType delete type set on the request
+   * @param timeStamp timestamp set on the request; left unset when {@code null}
+   * @return the sum of the rows-deleted counts of all the responses received
+   * @throws Throwable if the coprocessor call fails
+   */
+  private long invokeBulkDeleteProtocol(byte[] tableName, final Scan scan, final int rowBatchSize,
+      final DeleteType deleteType, final Long timeStamp) throws Throwable {
+    HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    try {
+      Batch.Call<BulkDeleteService, BulkDeleteResponse> callable =
+        new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
+        public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
+          // One callable instance serves every region involved in the call, so the controller
+          // and the callback are created per invocation: a shared callback would let one
+          // region's response replace another's before it is read.
+          ServerRpcController controller = new ServerRpcController();
+          BlockingRpcCallback<BulkDeleteResponse> rpcCallback =
+            new BlockingRpcCallback<BulkDeleteResponse>();
+          Builder builder = BulkDeleteRequest.newBuilder();
+          builder.setScan(ProtobufUtil.toScan(scan));
+          builder.setDeleteType(deleteType);
+          builder.setRowBatchSize(rowBatchSize);
+          if (timeStamp != null) {
+            builder.setTimestamp(timeStamp);
+          }
+          service.delete(controller, builder.build(), rpcCallback);
+          return rpcCallback.get();
+        }
+      };
+      Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class,
+          scan.getStartRow(), scan.getStopRow(), callable);
+      long noOfDeletedRows = 0L;
+      for (BulkDeleteResponse response : result.values()) {
+        noOfDeletedRows += response.getRowsDeleted();
+      }
+      return noOfDeletedRows;
+    } finally {
+      // This helper opens its own table handle on every call, so it must release it too.
+      ht.close();
+    }
+  }
+
+  /**
+   * Writes 100 rows where every tenth row holds "v1" and the others hold "v2", bulk deletes the
+   * rows selected by a value filter on cf1:c3, and verifies that 10 rows were deleted and 90
+   * rows remain.
+   */
+  @Test
+  public void testBulkDeleteWithConditionBasedDelete() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteWithConditionBasedDelete");
+    HTable ht = createTable(tableName);
+    try {
+      List<Put> puts = new ArrayList<Put>(100);
+      for (int j = 0; j < 100; j++) {
+        byte[] rowkey = Bytes.toBytes(j);
+        String value = (j % 10 == 0) ? "v1" : "v2";
+        puts.add(createPut(rowkey, value));
+      }
+      ht.put(puts);
+      Scan scan = new Scan();
+      FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
+      SingleColumnValueFilter scvf = new SingleColumnValueFilter(FAMILY1, QUALIFIER3,
+          CompareOp.EQUAL, Bytes.toBytes("v1"));
+      fl.addFilter(scvf);
+      scan.setFilter(fl);
+      // Deleting all the rows where cf1:c3=v1
+      long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.ROW, null);
+      assertEquals(10, noOfRowsDeleted);
+
+      int rows = 0;
+      for (Result result : ht.getScanner(new Scan())) {
+        rows++;
+      }
+      assertEquals(90, rows);
+    } finally {
+      // Release the table handle even when an assertion above fails.
+      ht.close();
+    }
+  }
+
+  /**
+   * Writes 100 rows with columns c1, c2 and c3 in cf1, bulk deletes column cf1:c2 with
+   * {@link DeleteType#COLUMN}, and verifies that every row keeps exactly c1 and c3.
+   */
+  @Test
+  public void testBulkDeleteColumn() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteColumn");
+    HTable ht = createTable(tableName);
+    try {
+      List<Put> puts = new ArrayList<Put>(100);
+      for (int j = 0; j < 100; j++) {
+        byte[] rowkey = Bytes.toBytes(j);
+        String value = (j % 10 == 0) ? "v1" : "v2";
+        puts.add(createPut(rowkey, value));
+      }
+      ht.put(puts);
+      Scan scan = new Scan();
+      scan.addColumn(FAMILY1, QUALIFIER2);
+      // Delete the column cf1:c2
+      long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.COLUMN,
+          null);
+      assertEquals(100, noOfRowsDeleted);
+
+      int rows = 0;
+      for (Result result : ht.getScanner(new Scan())) {
+        assertEquals(2, result.getFamilyMap(FAMILY1).size());
+        assertTrue(result.getColumn(FAMILY1, QUALIFIER2).isEmpty());
+        assertEquals(1, result.getColumn(FAMILY1, QUALIFIER1).size());
+        assertEquals(1, result.getColumn(FAMILY1, QUALIFIER3).size());
+        rows++;
+      }
+      assertEquals(100, rows);
+    } finally {
+      // Release the table handle even when an assertion above fails.
+      ht.close();
+    }
+  }
+
+  /**
+   * Creates a table with families cf1 and cf2, writes 100 rows with one column in each family,
+   * bulk deletes family cf1 with {@link DeleteType#FAMILY}, and verifies that every row keeps
+   * only its cf2:c2 cell.
+   */
+  @Test
+  public void testBulkDeleteFamily() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteFamily");
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(FAMILY1));
+    htd.addFamily(new HColumnDescriptor(FAMILY2));
+    TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
+    HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    try {
+      List<Put> puts = new ArrayList<Put>(100);
+      for (int j = 0; j < 100; j++) {
+        Put put = new Put(Bytes.toBytes(j));
+        // Bytes.toBytes encodes as UTF-8 regardless of the platform default charset.
+        put.add(FAMILY1, QUALIFIER1, Bytes.toBytes("v1"));
+        put.add(FAMILY2, QUALIFIER2, Bytes.toBytes("v2"));
+        puts.add(put);
+      }
+      ht.put(puts);
+      Scan scan = new Scan();
+      scan.addFamily(FAMILY1);
+      // Delete the column family cf1
+      long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.FAMILY,
+          null);
+      assertEquals(100, noOfRowsDeleted);
+      int rows = 0;
+      for (Result result : ht.getScanner(new Scan())) {
+        assertTrue(result.getFamilyMap(FAMILY1).isEmpty());
+        assertEquals(1, result.getColumn(FAMILY2, QUALIFIER2).size());
+        rows++;
+      }
+      assertEquals(100, rows);
+    } finally {
+      // Release the table handle even when an assertion above fails.
+      ht.close();
+    }
+  }
+
+  /**
+   * Writes 100 rows holding "v1" at timestamp 1234 in c1, c2 and c3, plus "v2" without an
+   * explicit timestamp in c1, c2, c3 and the null-qualifier column. Bulk deletes with
+   * {@link DeleteType#VERSION} and {@link HConstants#LATEST_TIMESTAMP}, then verifies that each
+   * row is left with only the "v1" version of c1, c2 and c3.
+   */
+  @Test
+  public void testBulkDeleteColumnVersion() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteColumnVersion");
+    HTable ht = createTable(tableName);
+    try {
+      // Bytes.toBytes encodes as UTF-8 regardless of the platform default charset.
+      byte[] v1 = Bytes.toBytes("v1");
+      List<Put> puts = new ArrayList<Put>(100);
+      for (int j = 0; j < 100; j++) {
+        Put put = new Put(Bytes.toBytes(j));
+        byte[] value = v1;
+        put.add(FAMILY1, QUALIFIER1, 1234L, value);
+        put.add(FAMILY1, QUALIFIER2, 1234L, value);
+        put.add(FAMILY1, QUALIFIER3, 1234L, value);
+        // Latest version values
+        value = Bytes.toBytes("v2");
+        put.add(FAMILY1, QUALIFIER1, value);
+        put.add(FAMILY1, QUALIFIER2, value);
+        put.add(FAMILY1, QUALIFIER3, value);
+        put.add(FAMILY1, null, value);
+        puts.add(put);
+      }
+      ht.put(puts);
+      Scan scan = new Scan();
+      scan.addFamily(FAMILY1);
+      // Delete the latest version values of all the columns in family cf1.
+      long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION,
+          HConstants.LATEST_TIMESTAMP);
+      assertEquals(100, noOfRowsDeleted);
+      int rows = 0;
+      scan = new Scan();
+      scan.setMaxVersions();
+      for (Result result : ht.getScanner(scan)) {
+        assertEquals(3, result.getFamilyMap(FAMILY1).size());
+        List<KeyValue> column = result.getColumn(FAMILY1, QUALIFIER1);
+        assertEquals(1, column.size());
+        assertTrue(Bytes.equals(v1, column.get(0).getValue()));
+
+        column = result.getColumn(FAMILY1, QUALIFIER2);
+        assertEquals(1, column.size());
+        assertTrue(Bytes.equals(v1, column.get(0).getValue()));
+
+        column = result.getColumn(FAMILY1, QUALIFIER3);
+        assertEquals(1, column.size());
+        assertTrue(Bytes.equals(v1, column.get(0).getValue()));
+        rows++;
+      }
+      assertEquals(100, rows);
+    } finally {
+      // Release the table handle even when an assertion above fails.
+      ht.close();
+    }
+  }
+
+  /**
+   * Writes 100 rows with three versions of c1, c2 and c3: "v1" at timestamp 1000, "v2" at
+   * timestamp 1234 and "v3" without an explicit timestamp. Bulk deletes the version of cf1:c3
+   * at timestamp 1234 with {@link DeleteType#VERSION}, then verifies that c3 keeps "v3" and
+   * "v1" while c1 and c2 still have all three versions.
+   */
+  @Test
+  public void testBulkDeleteColumnVersionBasedOnTS() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteColumnVersionBasedOnTS");
+    HTable ht = createTable(tableName);
+    try {
+      // Bytes.toBytes encodes as UTF-8 regardless of the platform default charset.
+      byte[] v1 = Bytes.toBytes("v1");
+      byte[] v3 = Bytes.toBytes("v3");
+      List<Put> puts = new ArrayList<Put>(100);
+      for (int j = 0; j < 100; j++) {
+        Put put = new Put(Bytes.toBytes(j));
+        // TS = 1000L
+        byte[] value = v1;
+        put.add(FAMILY1, QUALIFIER1, 1000L, value);
+        put.add(FAMILY1, QUALIFIER2, 1000L, value);
+        put.add(FAMILY1, QUALIFIER3, 1000L, value);
+        // TS = 1234L
+        value = Bytes.toBytes("v2");
+        put.add(FAMILY1, QUALIFIER1, 1234L, value);
+        put.add(FAMILY1, QUALIFIER2, 1234L, value);
+        put.add(FAMILY1, QUALIFIER3, 1234L, value);
+        // Latest version values
+        value = v3;
+        put.add(FAMILY1, QUALIFIER1, value);
+        put.add(FAMILY1, QUALIFIER2, value);
+        put.add(FAMILY1, QUALIFIER3, value);
+        puts.add(put);
+      }
+      ht.put(puts);
+      Scan scan = new Scan();
+      scan.addColumn(FAMILY1, QUALIFIER3);
+      // Delete the column cf1:c3's one version at TS=1234
+      long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION,
+          1234L);
+      assertEquals(100, noOfRowsDeleted);
+      int rows = 0;
+      scan = new Scan();
+      scan.setMaxVersions();
+      for (Result result : ht.getScanner(scan)) {
+        assertEquals(3, result.getFamilyMap(FAMILY1).size());
+        assertEquals(3, result.getColumn(FAMILY1, QUALIFIER1).size());
+        assertEquals(3, result.getColumn(FAMILY1, QUALIFIER2).size());
+        List<KeyValue> column = result.getColumn(FAMILY1, QUALIFIER3);
+        assertEquals(2, column.size());
+        assertTrue(Bytes.equals(v3, column.get(0).getValue()));
+        assertTrue(Bytes.equals(v1, column.get(1).getValue()));
+        rows++;
+      }
+      assertEquals(100, rows);
+    } finally {
+      // Release the table handle even when an assertion above fails.
+      ht.close();
+    }
+  }
+
+  @Test
+  public void testBulkDeleteWithNumberOfVersions() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteWithNumberOfVersions");
+    HTable ht = createTable(tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      Put put = new Put(Bytes.toBytes(j));
+      // TS = 1000L
+      byte[] value = "v1".getBytes();
+      put.add(FAMILY1, QUALIFIER1, 1000L, value);
+      put.add(FAMILY1, QUALIFIER2, 1000L, value);
+      put.add(FAMILY1, QUALIFIER3, 1000L, value);
+      // TS = 1234L
+      value = "v2".getBytes();
+      put.add(FAMILY1, QUALIFIER1, 1234L, value);
+      put.add(FAMILY1, QUALIFIER2, 1234L, value);
+      put.add(FAMILY1, QUALIFIER3, 1234L, value);
+      // TS = 2000L
+      value = "v3".getBytes();
+      put.add(FAMILY1, QUALIFIER1, 2000L, value);
+      put.add(FAMILY1, QUALIFIER2, 2000L, value);
+      put.add(FAMILY1, QUALIFIER3, 2000L, value);
+      // Latest version values
+      value = "v4".getBytes();
+      put.add(FAMILY1, QUALIFIER1, value);
+      put.add(FAMILY1, QUALIFIER2, value);
+      put.add(FAMILY1, QUALIFIER3, value);
+      puts.add(put);
+    }
+    ht.put(puts);
+
+    // Delete all the versions of columns cf1:c1 and cf1:c2 falling with the time range
+    // [1000,2000)
+    final Scan scan = new Scan();
+    scan.addColumn(FAMILY1, QUALIFIER1);
+    scan.addColumn(FAMILY1, QUALIFIER2);
+    scan.setTimeRange(1000L, 2000L);
+    scan.setMaxVersions();
+
+    long noOfDeletedRows = 0L;
+    long noOfVersionsDeleted = 0L;
+    Batch.Call<BulkDeleteService, BulkDeleteResponse> callable = 
+      new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
+      ServerRpcController controller = new ServerRpcController();
+      BlockingRpcCallback<BulkDeleteResponse> rpcCallback = 
+        new BlockingRpcCallback<BulkDeleteResponse>();
+
+      public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
+        Builder builder = BulkDeleteRequest.newBuilder();
+        builder.setScan(ProtobufUtil.toScan(scan));
+        builder.setDeleteType(DeleteType.VERSION);
+        builder.setRowBatchSize(500);
+        service.delete(controller, builder.build(), rpcCallback);
+        return rpcCallback.get();
+      }
+    };
+    Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class, scan
+        .getStartRow(), scan.getStopRow(), callable);
+    for (BulkDeleteResponse response : result.values()) {
+      noOfDeletedRows += response.getRowsDeleted();
+      noOfVersionsDeleted += response.getVersionsDeleted();
+    }
+    assertEquals(100, noOfDeletedRows);
+    assertEquals(400, noOfVersionsDeleted);
+
+    int rows = 0;
+    Scan scan1 = new Scan();
+    scan1.setMaxVersions();
+    for (Result res : ht.getScanner(scan1)) {
+      assertEquals(3, res.getFamilyMap(FAMILY1).size());
+      List<KeyValue> column = res.getColumn(FAMILY1, QUALIFIER1);
+      assertEquals(2, column.size());
+      assertTrue(Bytes.equals("v4".getBytes(), column.get(0).getValue()));
+      assertTrue(Bytes.equals("v3".getBytes(), column.get(1).getValue()));
+      column = res.getColumn(FAMILY1, QUALIFIER2);
+      assertEquals(2, column.size());
+      assertTrue(Bytes.equals("v4".getBytes(), column.get(0).getValue()));
+      assertTrue(Bytes.equals("v3".getBytes(), column.get(1).getValue()));
+      assertEquals(4, res.getColumn(FAMILY1, QUALIFIER3).size());
+      rows++;
+    }
+    assertEquals(100, rows);
+  }
+
+  /**
+   * Creates a table with the single family cf1 (max versions set to 10) and returns a new
+   * client handle on it. The table is created by passing start key 0, end key 120 and a region
+   * count of 5 to the admin's {@code createTable}.
+   *
+   * @param tableName name of the table to create
+   * @return a new {@link HTable} for the created table
+   * @throws IOException if creating or opening the table fails
+   */
+  private HTable createTable(byte[] tableName) throws IOException {
+    HTableDescriptor descriptor = new HTableDescriptor(tableName);
+    // 10 versions are enough here: no test in this class stores more than that.
+    HColumnDescriptor family = new HColumnDescriptor(FAMILY1);
+    family.setMaxVersions(10);
+    descriptor.addFamily(family);
+    TEST_UTIL.getHBaseAdmin().createTable(descriptor, Bytes.toBytes(0), Bytes.toBytes(120), 5);
+    return new HTable(TEST_UTIL.getConfiguration(), tableName);
+  }
+
+  /**
+   * Builds a put that writes the same value to columns c1, c2 and c3 of family cf1.
+   *
+   * @param rowkey row to write to
+   * @param value cell value; encoded with {@code Bytes.toBytes} so the stored bytes do not
+   *          depend on the platform default charset and match values the tests compare against
+   * @return the populated put
+   */
+  private Put createPut(byte[] rowkey, String value) throws IOException {
+    byte[] valueBytes = Bytes.toBytes(value);
+    Put put = new Put(rowkey);
+    put.add(FAMILY1, QUALIFIER1, valueBytes);
+    put.add(FAMILY1, QUALIFIER2, valueBytes);
+    put.add(FAMILY1, QUALIFIER3, valueBytes);
+    return put;
+  }
+}
\ No newline at end of file