You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2020/05/11 07:24:33 UTC
[hbase] branch master updated: HBASE-24321 - Add writable MinVersions and read-only Scan to coproc S… (#1655)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new ca81283 HBASE-24321 - Add writable MinVersions and read-only Scan to coproc S… (#1655)
ca81283 is described below
commit ca81283fe561499f119e5a0c5efa97947a6b2e26
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Mon May 11 00:24:22 2020 -0700
HBASE-24321 - Add writable MinVersions and read-only Scan to coproc S… (#1655)
Signed-off-by: Andrew Purtell <ap...@apache.org>
Signed-off-by: Duo Zhang <zh...@apache.org>
Signed-off-by: Lars Hofhansl <la...@apache.org>
Signed-off-by: Anoop Sam John <an...@gmail.com>
Signed-off-by: Viraj Jasani <vj...@apache.org>
---
.../regionserver/CustomizedScanInfoBuilder.java | 40 ++++++-
.../apache/hadoop/hbase/regionserver/HStore.java | 2 +-
.../hbase/regionserver/RegionCoprocessorHost.java | 4 +-
.../apache/hadoop/hbase/regionserver/ScanInfo.java | 9 +-
.../hadoop/hbase/regionserver/ScanOptions.java | 15 ++-
.../hbase/coprocessor/SimpleRegionObserver.java | 36 ++++++
.../coprocessor/TestRegionCoprocessorHost.java | 123 +++++++++++++++++++--
7 files changed, 209 insertions(+), 20 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java
index b791010..81a43c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.io.IOException;
import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -34,8 +36,22 @@ public class CustomizedScanInfoBuilder implements ScanOptions {
private KeepDeletedCells keepDeletedCells = null;
+ private Integer minVersions;
+
+ private final Scan scan;
+
public CustomizedScanInfoBuilder(ScanInfo scanInfo) {
this.scanInfo = scanInfo;
+ this.scan = new Scan();
+ }
+ public CustomizedScanInfoBuilder(ScanInfo scanInfo, Scan scan) {
+ this.scanInfo = scanInfo;
+ //copy the scan so no coproc using this ScanOptions can alter the "real" scan
+ try {
+ this.scan = new Scan(scan);
+ } catch (IOException e) {
+ throw new AssertionError("Scan should not throw IOException", e);
+ }
}
@Override
@@ -62,12 +78,13 @@ public class CustomizedScanInfoBuilder implements ScanOptions {
if (maxVersions == null && ttl == null && keepDeletedCells == null) {
return scanInfo;
}
- return scanInfo.customize(getMaxVersions(), getTTL(), getKeepDeletedCells());
+ return scanInfo.customize(getMaxVersions(), getTTL(), getKeepDeletedCells(), getMinVersions());
}
@Override
public String toString() {
- return "ScanOptions [maxVersions=" + getMaxVersions() + ", TTL=" + getTTL() + "]";
+ return "ScanOptions [maxVersions=" + getMaxVersions() + ", TTL=" + getTTL() +
+ ", KeepDeletedCells=" + getKeepDeletedCells() + ", MinVersions=" + getMinVersions() + "]";
}
@Override
@@ -80,4 +97,23 @@ public class CustomizedScanInfoBuilder implements ScanOptions {
return keepDeletedCells != null ? keepDeletedCells : scanInfo.getKeepDeletedCells();
}
+ @Override
+ public int getMinVersions() {
+ return minVersions != null ? minVersions : scanInfo.getMinVersions();
+ }
+
+ @Override
+ public void setMinVersions(int minVersions) {
+ this.minVersions = minVersions;
+ }
+
+ @Override
+ public Scan getScan() {
+ try {
+ return new Scan(scan);
+ } catch(IOException e) {
+ throw new AssertionError("Scan should not throw IOException anymore", e);
+ }
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index f853155..ae37e85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2128,7 +2128,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
try {
ScanInfo scanInfo;
if (this.getCoprocessorHost() != null) {
- scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this);
+ scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan);
} else {
scanInfo = getScanInfo();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 1c41a64..af1478f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -1625,9 +1625,9 @@ public class RegionCoprocessorHost
/**
* Called before open store scanner for user scan.
*/
- public ScanInfo preStoreScannerOpen(HStore store) throws IOException {
+ public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException {
if (coprocEnvironments.isEmpty()) return store.getScanInfo();
- CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
+ CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan);
execOperation(new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
index 2fde311..7d61618 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
@@ -174,8 +174,13 @@ public class ScanInfo {
* Used for CP users for customizing max versions, ttl and keepDeletedCells.
*/
ScanInfo customize(int maxVersions, long ttl, KeepDeletedCells keepDeletedCells) {
+ return customize(maxVersions, ttl, keepDeletedCells, minVersions);
+ }
+
+ ScanInfo customize(int maxVersions, long ttl, KeepDeletedCells keepDeletedCells,
+ int minVersions) {
return new ScanInfo(family, minVersions, maxVersions, ttl, keepDeletedCells, timeToPurgeDeletes,
- comparator, tableMaxRowSize, usePread, cellsPerTimeoutCheck, parallelSeekEnabled,
- preadMaxBytes, newVersionBehavior);
+ comparator, tableMaxRowSize, usePread, cellsPerTimeoutCheck, parallelSeekEnabled,
+ preadMaxBytes, newVersionBehavior);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java
index aca857a..e81f3ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -26,8 +27,9 @@ import org.apache.yetus.audience.InterfaceStability;
* This class gives you the ability to change the max versions and TTL options before opening a
* scanner for a Store. And also gives you some information for the scan.
* <p>
- * Changing max versions and TTL are usually safe even for flush/compaction, so here we provide a
- * way to do it for you. If you want to do other complicated stuffs such as filtering, please wrap
+ * Changing max versions, min versions, KeepDeletedCells, and TTL are usually safe even
+ * for flush/compaction, so here we provide a way to do it for you. If you want to do other
+ * complicated operations such as filtering, please wrap
* the {@link InternalScanner} in the {@code preCompact} and {@code preFlush} methods in
* {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}.
* <p>
@@ -64,4 +66,13 @@ public interface ScanOptions {
void setKeepDeletedCells(KeepDeletedCells keepDeletedCells);
KeepDeletedCells getKeepDeletedCells();
+
+ int getMinVersions();
+
+ void setMinVersions(int minVersions);
+
+ /**
+ * Returns a copy of the Scan object. Modifying it will have no effect.
+ */
+ Scan getScan();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index 523466d..ef62901 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -58,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
@@ -686,6 +688,40 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
}
@Override
+ public void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
+ Store store, ScanOptions options) throws IOException {
+ if (options.getScan().getTimeRange().isAllTime()) {
+ setScanOptions(options);
+ }
+ }
+
+ @Override
+ public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
+ CompactionRequest request) throws IOException {
+ setScanOptions(options);
+ }
+
+ public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ ScanOptions options,FlushLifeCycleTracker tracker) throws IOException {
+ setScanOptions(options);
+ }
+
+ public void preMemStoreCompactionCompactScannerOpen(
+ ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
+ throws IOException {
+ setScanOptions(options);
+ }
+
+ private void setScanOptions(ScanOptions options) {
+ options.setMaxVersions(TestRegionCoprocessorHost.MAX_VERSIONS);
+ options.setMinVersions(TestRegionCoprocessorHost.MIN_VERSIONS);
+ options.setKeepDeletedCells(KeepDeletedCells.TRUE);
+ options.setTTL(TestRegionCoprocessorHost.TTL);
+ }
+
+
+ @Override
public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx,
WALKey key, WALEdit edit) throws IOException {
ctPreWALAppend.incrementAndGet();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorHost.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorHost.java
index ee6e216..423a412 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorHost.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorHost.java
@@ -22,48 +22,80 @@ import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCE
import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR;
import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.USER_COPROCESSORS_ENABLED_CONF_KEY;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import java.io.IOException;
@Category({SmallTests.class})
public class TestRegionCoprocessorHost {
+ private Configuration conf;
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionCoprocessorHost.class);
- @Test
- public void testLoadDuplicateCoprocessor() throws Exception {
- Configuration conf = HBaseConfiguration.create();
+ @Rule
+ public final TestName name = new TestName();
+ private RegionInfo regionInfo;
+ private HRegion region;
+ private RegionServerServices rsServices;
+ public static final int MAX_VERSIONS = 3;
+ public static final int MIN_VERSIONS = 2;
+ public static final int TTL = 1000;
+
+ @Before
+ public void setup() throws IOException {
+ conf = HBaseConfiguration.create();
conf.setBoolean(COPROCESSORS_ENABLED_CONF_KEY, true);
conf.setBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, true);
- conf.setBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR, true);
- conf.set(REGION_COPROCESSOR_CONF_KEY, SimpleRegionObserver.class.getName());
- TableName tableName = TableName.valueOf("testDoubleLoadingCoprocessor");
- RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
// config a same coprocessor with system coprocessor
TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
- .setCoprocessor(SimpleRegionObserver.class.getName()).build();
- HRegion region = mock(HRegion.class);
+ .setCoprocessor(SimpleRegionObserver.class.getName()).build();
+ region = mock(HRegion.class);
when(region.getRegionInfo()).thenReturn(regionInfo);
when(region.getTableDescriptor()).thenReturn(tableDesc);
- RegionServerServices rsServices = mock(RegionServerServices.class);
+ rsServices = mock(RegionServerServices.class);
+ }
+ @Test
+ public void testLoadDuplicateCoprocessor() throws Exception {
+ conf.setBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR, true);
+ conf.set(REGION_COPROCESSOR_CONF_KEY, SimpleRegionObserver.class.getName());
RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
// Only one coprocessor SimpleRegionObserver loaded
assertEquals(1, host.coprocEnvironments.size());
@@ -74,4 +106,73 @@ public class TestRegionCoprocessorHost {
// Two duplicate coprocessors loaded
assertEquals(2, host.coprocEnvironments.size());
}
-}
\ No newline at end of file
+
+ @Test
+ public void testPreStoreScannerOpen() throws IOException {
+
+ RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
+ Scan scan = new Scan();
+ scan.setTimeRange(TimeRange.INITIAL_MIN_TIMESTAMP, TimeRange.INITIAL_MAX_TIMESTAMP);
+ assertTrue("Scan is not for all time", scan.getTimeRange().isAllTime());
+ //SimpleRegionObserver is set to update the ScanInfo parameters if the passed-in scan
+ //is for all time. This lets us exercise both that the Scan is wired up properly in the coproc
+ //and that we can customize the metadata
+
+ ScanInfo oldScanInfo = getScanInfo();
+
+ HStore store = mock(HStore.class);
+ when(store.getScanInfo()).thenReturn(oldScanInfo);
+ ScanInfo newScanInfo = host.preStoreScannerOpen(store, scan);
+
+ verifyScanInfo(newScanInfo);
+ }
+
+ @Test
+ public void testPreCompactScannerOpen() throws IOException {
+ RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
+ ScanInfo oldScanInfo = getScanInfo();
+ HStore store = mock(HStore.class);
+ when(store.getScanInfo()).thenReturn(oldScanInfo);
+ ScanInfo newScanInfo = host.preCompactScannerOpen(store, ScanType.COMPACT_DROP_DELETES,
+ mock(CompactionLifeCycleTracker.class), mock(CompactionRequest.class), mock(User.class));
+ verifyScanInfo(newScanInfo);
+ }
+
+ @Test
+ public void testPreFlushScannerOpen() throws IOException {
+ RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
+ ScanInfo oldScanInfo = getScanInfo();
+ HStore store = mock(HStore.class);
+ when(store.getScanInfo()).thenReturn(oldScanInfo);
+ ScanInfo newScanInfo = host.preFlushScannerOpen(store, mock(FlushLifeCycleTracker.class));
+ verifyScanInfo(newScanInfo);
+ }
+
+ @Test
+ public void testPreMemStoreCompactionCompactScannerOpen() throws IOException {
+ RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
+ ScanInfo oldScanInfo = getScanInfo();
+ HStore store = mock(HStore.class);
+ when(store.getScanInfo()).thenReturn(oldScanInfo);
+ ScanInfo newScanInfo = host.preMemStoreCompactionCompactScannerOpen(store);
+ verifyScanInfo(newScanInfo);
+ }
+
+ private void verifyScanInfo(ScanInfo newScanInfo) {
+ assertEquals(KeepDeletedCells.TRUE, newScanInfo.getKeepDeletedCells());
+ assertEquals(MAX_VERSIONS, newScanInfo.getMaxVersions());
+ assertEquals(MIN_VERSIONS, newScanInfo.getMinVersions());
+ assertEquals(TTL, newScanInfo.getTtl());
+ }
+
+ private ScanInfo getScanInfo() {
+ int oldMaxVersions = 1;
+ int oldMinVersions = 0;
+ long oldTTL = 10000;
+
+ return new ScanInfo(conf, Bytes.toBytes("cf"), oldMinVersions, oldMaxVersions, oldTTL,
+ KeepDeletedCells.FALSE, HConstants.FOREVER, 1000,
+ CellComparator.getInstance(), true);
+ }
+
+}