You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2020/05/11 07:26:08 UTC

[hbase] branch branch-2 updated: HBASE-24321 - Add writable MinVersions and read-only Scan to coproc S… (#1655)

This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 2993a95  HBASE-24321 - Add writable MinVersions and read-only Scan to coproc S… (#1655)
2993a95 is described below

commit 2993a95ee21ba4610bb6c900b467d271f96b28f7
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Mon May 11 00:24:22 2020 -0700

    HBASE-24321 - Add writable MinVersions and read-only Scan to coproc S… (#1655)
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: Lars Hofhansl <la...@apache.org>
    Signed-off-by: Anoop Sam John <an...@gmail.com>
    Signed-off-by: Viraj Jasani <vj...@apache.org>
---
 .../regionserver/CustomizedScanInfoBuilder.java    |  40 ++++++-
 .../apache/hadoop/hbase/regionserver/HStore.java   |   2 +-
 .../hbase/regionserver/RegionCoprocessorHost.java  |   4 +-
 .../apache/hadoop/hbase/regionserver/ScanInfo.java |   9 +-
 .../hadoop/hbase/regionserver/ScanOptions.java     |  15 ++-
 .../hbase/coprocessor/SimpleRegionObserver.java    |  36 ++++++
 .../coprocessor/TestRegionCoprocessorHost.java     | 123 +++++++++++++++++++--
 7 files changed, 209 insertions(+), 20 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java
index b791010..81a43c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
 import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -34,8 +36,22 @@ public class CustomizedScanInfoBuilder implements ScanOptions {
 
   private KeepDeletedCells keepDeletedCells = null;
 
+  private Integer minVersions;
+
+  private final Scan scan;
+
   public CustomizedScanInfoBuilder(ScanInfo scanInfo) {
     this.scanInfo = scanInfo;
+    this.scan = new Scan();
+  }
+  public CustomizedScanInfoBuilder(ScanInfo scanInfo, Scan scan) {
+    this.scanInfo = scanInfo;
+    //copy the scan so no coproc using this ScanOptions can alter the "real" scan
+    try {
+      this.scan = new Scan(scan);
+    } catch (IOException e) {
+      throw new AssertionError("Scan should not throw IOException", e);
+    }
   }
 
   @Override
@@ -62,12 +78,13 @@ public class CustomizedScanInfoBuilder implements ScanOptions {
     if (maxVersions == null && ttl == null && keepDeletedCells == null) {
       return scanInfo;
     }
-    return scanInfo.customize(getMaxVersions(), getTTL(), getKeepDeletedCells());
+    return scanInfo.customize(getMaxVersions(), getTTL(), getKeepDeletedCells(), getMinVersions());
   }
 
   @Override
   public String toString() {
-    return "ScanOptions [maxVersions=" + getMaxVersions() + ", TTL=" + getTTL() + "]";
+    return "ScanOptions [maxVersions=" + getMaxVersions() + ", TTL=" + getTTL() +
+      ", KeepDeletedCells=" + getKeepDeletedCells() + ", MinVersions=" + getMinVersions() + "]";
   }
 
   @Override
@@ -80,4 +97,23 @@ public class CustomizedScanInfoBuilder implements ScanOptions {
     return keepDeletedCells != null ? keepDeletedCells : scanInfo.getKeepDeletedCells();
   }
 
+  @Override
+  public int getMinVersions() {
+    return minVersions != null ? minVersions : scanInfo.getMinVersions();
+  }
+
+  @Override
+  public void setMinVersions(int minVersions) {
+    this.minVersions = minVersions;
+  }
+
+  @Override
+  public Scan getScan() {
+    try {
+      return new Scan(scan);
+    } catch(IOException e) {
+      throw new AssertionError("Scan should not throw IOException anymore", e);
+    }
+  }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 6103819..c0ec662 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2142,7 +2142,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     try {
       ScanInfo scanInfo;
       if (this.getCoprocessorHost() != null) {
-        scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this);
+        scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan);
       } else {
         scanInfo = getScanInfo();
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 331e7cd..93c314d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -1634,9 +1634,9 @@ public class RegionCoprocessorHost
   /**
    * Called before open store scanner for user scan.
    */
-  public ScanInfo preStoreScannerOpen(HStore store) throws IOException {
+  public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException {
     if (coprocEnvironments.isEmpty()) return store.getScanInfo();
-    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
+    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan);
     execOperation(new RegionObserverOperationWithoutResult() {
       @Override
       public void call(RegionObserver observer) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
index 2fde311..7d61618 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
@@ -174,8 +174,13 @@ public class ScanInfo {
    * Used for CP users for customizing max versions, ttl and keepDeletedCells.
    */
   ScanInfo customize(int maxVersions, long ttl, KeepDeletedCells keepDeletedCells) {
+    return customize(maxVersions, ttl, keepDeletedCells, minVersions);
+  }
+
+  ScanInfo customize(int maxVersions, long ttl, KeepDeletedCells keepDeletedCells,
+    int minVersions) {
     return new ScanInfo(family, minVersions, maxVersions, ttl, keepDeletedCells, timeToPurgeDeletes,
-        comparator, tableMaxRowSize, usePread, cellsPerTimeoutCheck, parallelSeekEnabled,
-        preadMaxBytes, newVersionBehavior);
+      comparator, tableMaxRowSize, usePread, cellsPerTimeoutCheck, parallelSeekEnabled,
+      preadMaxBytes, newVersionBehavior);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java
index aca857a..e81f3ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -26,8 +27,9 @@ import org.apache.yetus.audience.InterfaceStability;
  * This class gives you the ability to change the max versions and TTL options before opening a
  * scanner for a Store. And also gives you some information for the scan.
  * <p>
- * Changing max versions and TTL are usually safe even for flush/compaction, so here we provide a
- * way to do it for you. If you want to do other complicated stuffs such as filtering, please wrap
+ * Changing max versions, min versions, KeepDeletedCells, and TTL are usually safe even
+ * for flush/compaction, so here we provide a way to do it for you. If you want to do other
+ * complicated operations such as filtering, please wrap
  * the {@link InternalScanner} in the {@code preCompact} and {@code preFlush} methods in
  * {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}.
  * <p>
@@ -64,4 +66,13 @@ public interface ScanOptions {
   void setKeepDeletedCells(KeepDeletedCells keepDeletedCells);
 
   KeepDeletedCells getKeepDeletedCells();
+
+  int getMinVersions();
+
+  void setMinVersions(int minVersions);
+
+  /**
+   * Returns a copy of the Scan object. Modifying it will have no effect.
+   */
+  Scan getScan();
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index 523466d..ef62901 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -58,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.Region.Operation;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
@@ -686,6 +688,40 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
   }
 
   @Override
+  public void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
+    Store store, ScanOptions options) throws IOException {
+    if (options.getScan().getTimeRange().isAllTime()) {
+      setScanOptions(options);
+    }
+  }
+
+  @Override
+  public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+    ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
+    CompactionRequest request) throws IOException {
+    setScanOptions(options);
+  }
+
+  public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+    ScanOptions options,FlushLifeCycleTracker tracker) throws IOException {
+    setScanOptions(options);
+  }
+
+  public void preMemStoreCompactionCompactScannerOpen(
+    ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
+    throws IOException {
+    setScanOptions(options);
+  }
+
+  private void setScanOptions(ScanOptions options) {
+    options.setMaxVersions(TestRegionCoprocessorHost.MAX_VERSIONS);
+    options.setMinVersions(TestRegionCoprocessorHost.MIN_VERSIONS);
+    options.setKeepDeletedCells(KeepDeletedCells.TRUE);
+    options.setTTL(TestRegionCoprocessorHost.TTL);
+  }
+
+
+  @Override
   public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx,
                                  WALKey key, WALEdit edit) throws IOException {
     ctPreWALAppend.incrementAndGet();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorHost.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorHost.java
index ee6e216..423a412 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorHost.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorHost.java
@@ -22,48 +22,80 @@ import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCE
 import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR;
 import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.USER_COPROCESSORS_ENABLED_CONF_KEY;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import java.io.IOException;
 
 @Category({SmallTests.class})
 public class TestRegionCoprocessorHost {
+  private Configuration conf;
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestRegionCoprocessorHost.class);
 
-  @Test
-  public void testLoadDuplicateCoprocessor() throws Exception {
-    Configuration conf = HBaseConfiguration.create();
+  @Rule
+  public final TestName name = new TestName();
+  private RegionInfo regionInfo;
+  private HRegion region;
+  private RegionServerServices rsServices;
+  public static final int MAX_VERSIONS = 3;
+  public static final int MIN_VERSIONS = 2;
+  public static final int TTL = 1000;
+
+  @Before
+  public void setup() throws IOException {
+    conf = HBaseConfiguration.create();
     conf.setBoolean(COPROCESSORS_ENABLED_CONF_KEY, true);
     conf.setBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, true);
-    conf.setBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR, true);
-    conf.set(REGION_COPROCESSOR_CONF_KEY, SimpleRegionObserver.class.getName());
-    TableName tableName = TableName.valueOf("testDoubleLoadingCoprocessor");
-    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
     // config a same coprocessor with system coprocessor
     TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
-        .setCoprocessor(SimpleRegionObserver.class.getName()).build();
-    HRegion region = mock(HRegion.class);
+      .setCoprocessor(SimpleRegionObserver.class.getName()).build();
+    region = mock(HRegion.class);
     when(region.getRegionInfo()).thenReturn(regionInfo);
     when(region.getTableDescriptor()).thenReturn(tableDesc);
-    RegionServerServices rsServices = mock(RegionServerServices.class);
+    rsServices = mock(RegionServerServices.class);
+  }
+  @Test
+  public void testLoadDuplicateCoprocessor() throws Exception {
+    conf.setBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR, true);
+    conf.set(REGION_COPROCESSOR_CONF_KEY, SimpleRegionObserver.class.getName());
     RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
     // Only one coprocessor SimpleRegionObserver loaded
     assertEquals(1, host.coprocEnvironments.size());
@@ -74,4 +106,73 @@ public class TestRegionCoprocessorHost {
     // Two duplicate coprocessors loaded
     assertEquals(2, host.coprocEnvironments.size());
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testPreStoreScannerOpen() throws IOException {
+
+    RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
+    Scan scan = new Scan();
+    scan.setTimeRange(TimeRange.INITIAL_MIN_TIMESTAMP, TimeRange.INITIAL_MAX_TIMESTAMP);
+    assertTrue("Scan is not for all time", scan.getTimeRange().isAllTime());
+    //SimpleRegionObserver is set to update the ScanInfo parameters if the passed-in scan
+    //is for all time. this lets us exercise both that the Scan is wired up properly in the coproc
+    //and that we can customize the metadata
+
+    ScanInfo oldScanInfo = getScanInfo();
+
+    HStore store = mock(HStore.class);
+    when(store.getScanInfo()).thenReturn(oldScanInfo);
+    ScanInfo newScanInfo = host.preStoreScannerOpen(store, scan);
+
+    verifyScanInfo(newScanInfo);
+  }
+
+  @Test
+  public void testPreCompactScannerOpen() throws IOException {
+    RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
+    ScanInfo oldScanInfo = getScanInfo();
+    HStore store = mock(HStore.class);
+    when(store.getScanInfo()).thenReturn(oldScanInfo);
+    ScanInfo newScanInfo = host.preCompactScannerOpen(store, ScanType.COMPACT_DROP_DELETES,
+      mock(CompactionLifeCycleTracker.class), mock(CompactionRequest.class), mock(User.class));
+    verifyScanInfo(newScanInfo);
+  }
+
+  @Test
+  public void testPreFlushScannerOpen() throws IOException {
+    RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
+    ScanInfo oldScanInfo = getScanInfo();
+    HStore store = mock(HStore.class);
+    when(store.getScanInfo()).thenReturn(oldScanInfo);
+    ScanInfo newScanInfo = host.preFlushScannerOpen(store, mock(FlushLifeCycleTracker.class));
+    verifyScanInfo(newScanInfo);
+  }
+
+  @Test
+  public void testPreMemStoreCompactionCompactScannerOpen() throws IOException {
+    RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
+    ScanInfo oldScanInfo = getScanInfo();
+    HStore store = mock(HStore.class);
+    when(store.getScanInfo()).thenReturn(oldScanInfo);
+    ScanInfo newScanInfo = host.preMemStoreCompactionCompactScannerOpen(store);
+    verifyScanInfo(newScanInfo);
+  }
+
+  private void verifyScanInfo(ScanInfo newScanInfo) {
+    assertEquals(KeepDeletedCells.TRUE, newScanInfo.getKeepDeletedCells());
+    assertEquals(MAX_VERSIONS, newScanInfo.getMaxVersions());
+    assertEquals(MIN_VERSIONS, newScanInfo.getMinVersions());
+    assertEquals(TTL, newScanInfo.getTtl());
+  }
+
+  private ScanInfo getScanInfo() {
+    int oldMaxVersions = 1;
+    int oldMinVersions = 0;
+    long oldTTL = 10000;
+
+    return new ScanInfo(conf, Bytes.toBytes("cf"), oldMinVersions, oldMaxVersions, oldTTL,
+    KeepDeletedCells.FALSE, HConstants.FOREVER, 1000,
+      CellComparator.getInstance(), true);
+  }
+
+}