You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ni...@apache.org on 2021/08/21 02:23:11 UTC
[hbase] branch HBASE-25714 updated: HBASE-26089 Support
RegionCoprocessor on CompactionServer (#3580)
This is an automated email from the ASF dual-hosted git repository.
niuyulin pushed a commit to branch HBASE-25714
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-25714 by this push:
new 20b7c8b HBASE-26089 Support RegionCoprocessor on CompactionServer (#3580)
20b7c8b is described below
commit 20b7c8b286cf43d14928687bbbb4f510ff5f96d6
Author: niuyulin <yu...@gmail.com>
AuthorDate: Sat Aug 21 10:22:26 2021 +0800
HBASE-26089 Support RegionCoprocessor on CompactionServer (#3580)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../org/apache/hadoop/hbase/AbstractServer.java | 18 +
.../java/org/apache/hadoop/hbase/ServerType.java | 29 +
.../compactionserver/CompactionThreadManager.java | 17 +-
.../hbase/compactionserver/HCompactionServer.java | 23 +-
.../RegionCompactionCoprocessorHost.java | 773 +++++++++++++++++++++
.../hadoop/hbase/coprocessor/CoprocessorHost.java | 16 +-
.../coprocessor/RegionCoprocessorEnvironment.java | 12 +
.../coprocessor/RegionCoprocessorService.java | 33 +
.../apache/hadoop/hbase/regionserver/HRegion.java | 57 +-
.../apache/hadoop/hbase/regionserver/HStore.java | 20 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 4 +-
.../hbase/regionserver/RegionCoprocessorHost.java | 25 +-
.../hbase/regionserver/RegionServerServices.java | 5 +-
.../hadoop/hbase/MockRegionServerServices.java | 5 +
.../compactionserver/TestCompactionServer.java | 102 +--
.../compactionserver/TestCompactionServerBase.java | 119 ++++
.../TestRegionCoprocessorOnCompactionServer.java | 305 ++++++++
.../hadoop/hbase/master/MockRegionServer.java | 7 +
18 files changed, 1420 insertions(+), 150 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java
index 3e64c85..e59d8b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase;
+import static org.apache.hadoop.hbase.compactionserver.HCompactionServer.COMPACTIONSERVER;
+import static org.apache.hadoop.hbase.master.HMaster.MASTER;
+import static org.apache.hadoop.hbase.regionserver.HRegionServer.REGIONSERVER;
+
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -431,4 +435,18 @@ public abstract class AbstractServer extends Thread implements Server {
protected abstract AbstractRpcServices getRpcService();
protected abstract String getProcessName();
+
+ public ServerType getServerType() {
+ String processName = getProcessName();
+ if (processName.equals(MASTER)) {
+ return ServerType.Master;
+ }
+ if (processName.equals(REGIONSERVER)) {
+ return ServerType.RegionServer;
+ }
+ if (processName.equals(COMPACTIONSERVER)) {
+ return ServerType.CompactionServer;
+ }
+ return ServerType.ReplicationServer;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ServerType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ServerType.java
new file mode 100644
index 0000000..dbd8269
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ServerType.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Enum describing Server Type
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+public enum ServerType {
+ Master, RegionServer, CompactionServer, ReplicationServer
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
index f5fcfe2..9d67e29 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.compactionserver;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -240,7 +239,7 @@ public class CompactionThreadManager implements ThroughputControllerService {
excludeFiles.addAll(compactedFiles);
// Convert files names to store files
status.setStatus("Convert current compacting and compacted files to store files");
- List<HStoreFile> excludeStoreFiles = getExcludedStoreFiles(store, excludeFiles);
+ List<HStoreFile> excludeStoreFiles = store.getStoreFilesBaseOnFileNames(excludeFiles);
LOG.info(
"Start select store: {}, excludeFileNames: {}, excluded: {}, compacting: {}, compacted: {}",
logStr, excludeFiles.size(), excludeStoreFiles.size(), compactingFiles.size(),
@@ -354,23 +353,11 @@ public class CompactionThreadManager implements ThroughputControllerService {
}
}
- private List<HStoreFile> getExcludedStoreFiles(HStore store, Set<String> excludeFileNames) {
- Collection<HStoreFile> storefiles = store.getStorefiles();
- List<HStoreFile> storeFiles = new ArrayList<>();
- for (HStoreFile storefile : storefiles) {
- String name = storefile.getPath().getName();
- if (excludeFileNames.contains(name)) {
- storeFiles.add(storefile);
- }
- }
- return storeFiles;
- }
-
private HStore getStore(final Configuration conf, final FileSystem fs, final Path rootDir,
final TableDescriptor htd, final RegionInfo hri, final String familyName) throws IOException {
HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs,
CommonFSUtils.getTableDir(rootDir, htd.getTableName()), hri);
- HRegion region = new HRegion(regionFs, null, conf, htd, null);
+ HRegion region = new HRegion(regionFs, conf, htd, server);
ColumnFamilyDescriptor columnFamilyDescriptor = htd.getColumnFamily(Bytes.toBytes(familyName));
HStore store;
if (columnFamilyDescriptor.isMobEnabled()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
index f78e58a..578925f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.compactionserver;
import java.io.IOException;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
@@ -28,9 +29,12 @@ import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorService;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
@@ -54,7 +58,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusP
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusProtos.CompactionServerStatusService;
@InterfaceAudience.Private
-public class HCompactionServer extends AbstractServer {
+public class HCompactionServer extends AbstractServer implements RegionCoprocessorService {
/** compaction server process name */
public static final String COMPACTIONSERVER = "compactionserver";
@@ -312,4 +316,21 @@ public class HCompactionServer extends AbstractServer {
}
}
+ @Override
+ public HRegion getRegion(final String encodedRegionName) {
+ throw new UnsupportedOperationException(
+ "Method getRegion is not supported in HCompactionServer");
+ }
+
+ @Override
+ public List<HRegion> getRegions(TableName tableName) {
+ throw new UnsupportedOperationException(
+ "Method getRegions is not supported in HCompactionServer");
+ }
+
+ @Override
+ public List<HRegion> getRegions() {
+ throw new UnsupportedOperationException(
+ "Method getRegions is not supported in HCompactionServer");
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/RegionCompactionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/RegionCompactionCoprocessorHost.java
new file mode 100644
index 0000000..4057c59
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/RegionCompactionCoprocessorHost.java
@@ -0,0 +1,773 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.compactionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.lang.reflect.Method;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.RawCellBuilder;
+import org.apache.hadoop.hbase.RawCellBuilderFactory;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.SharedConnection;
+import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorService;
+import org.apache.hadoop.hbase.metrics.MetricRegistry;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.OnlineRegions;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.Region.Operation;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.Service;
+
+/**
+ * 1. Inherits from {@link RegionCoprocessorHost} and is only used on CompactionServer. This host
+ * only loads coprocessors that are involved in compaction.
+ * 2. Other methods of the host, like preFlush, postFlush, prePut, postPut, etc. are not supported.
+ * 3. Four methods: preOpen, postOpen, preClose, postClose are overridden as blank implementations.
+ * 4. The methods preCompactSelection, postCompactSelection, preCompactScannerOpen, preCompact,
+ * postCompact, preStoreFileReaderOpen, postStoreFileReaderOpen, postInstantiateDeleteTracker
+ * are retained.
+ */
+@InterfaceAudience.Private
+public class RegionCompactionCoprocessorHost extends RegionCoprocessorHost {
+ private static final Logger LOG = LoggerFactory.getLogger(RegionCompactionCoprocessorHost.class);
+ // postCompact will be executed on HRegionServer, so we don't check here
+ private static final Set<String> compactionCoprocessor = ImmutableSet.of("preCompactSelection",
+ "postCompactSelection", "preCompactScannerOpen", "preCompact", "preStoreFileReaderOpen",
+ "postStoreFileReaderOpen", "postInstantiateDeleteTracker");
+
+ private boolean IsCompactionRelatedCoprocessor(Class<?> cpClass) {
+ while (cpClass != null) {
+ for (Method method : cpClass.getDeclaredMethods()) {
+ if (compactionCoprocessor.contains(method.getName())) {
+ return true;
+ }
+ }
+ cpClass = cpClass.getSuperclass();
+ }
+ return false;
+ }
+
+ /**
+ * This environment is only used on CompactionServer for non-core coprocessors; the methods
+ * getRegion, getOnlineRegions and getSharedData are not supported.
+ */
+ private static class RegionCompactionEnvironment extends RegionCoprocessorHost.RegionEnvironment {
+
+ /**
+ * Constructor
+ * @param impl the coprocessor instance
+ * @param priority chaining priority
+ */
+ public RegionCompactionEnvironment(final RegionCoprocessor impl, final int priority,
+ final int seq, final Configuration conf, final Region region,
+ final RegionCoprocessorService services) {
+ super(impl, priority, seq, conf, region, services, null);
+ }
+
+ /** Not supported on CompactionServer; always throws UnsupportedOperationException. */
+ @Override
+ public Region getRegion() {
+ throw new UnsupportedOperationException(
+ "Method getRegion is not supported when loaded CP on compactionServer");
+ }
+
+ @Override
+ public OnlineRegions getOnlineRegions() {
+ throw new UnsupportedOperationException(
+ "Method getOnlineRegions is not supported when loaded CP on compactionServer");
+ }
+
+ @Override
+ public Connection getConnection() {
+ // Mocks may have services as null at test time.
+ return services != null ? new SharedConnection(services.getConnection()) : null;
+ }
+
+ @Override
+ public Connection createConnection(Configuration conf) throws IOException {
+ return services != null ? this.services.createConnection(conf) : null;
+ }
+
+ @Override
+ public ServerName getServerName() {
+ return services != null? services.getServerName(): null;
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ MetricsCoprocessor.removeRegistry(this.metricRegistry);
+ }
+
+ @Override
+ public ConcurrentMap<String, Object> getSharedData() {
+ throw new UnsupportedOperationException(
+ "Method getSharedData is not supported when loaded CP on compactionServer");
+ }
+
+ @Override
+ public RegionInfo getRegionInfo() {
+ return region.getRegionInfo();
+ }
+
+ @Override
+ public MetricRegistry getMetricRegistryForRegionServer() {
+ return metricRegistry;
+ }
+
+ @Override
+ public RawCellBuilder getCellBuilder() {
+ // We always do a DEEP_COPY only
+ return RawCellBuilderFactory.create();
+ }
+ }
+
+ /**
+ * Constructor
+ * @param region the region
+ * @param coprocessorService interface to available RegionServer/CompactionServer functionality
+ * @param conf the configuration
+ */
+ public RegionCompactionCoprocessorHost(final HRegion region,
+ final RegionCoprocessorService coprocessorService, final Configuration conf) {
+ super(region, coprocessorService, conf);
+ }
+
+ @Override
+ public RegionCompactionEnvironment createEnvironment(RegionCoprocessor instance, int priority,
+ int seq, Configuration conf) {
+ if (instance.getClass().isAnnotationPresent(CoreCoprocessor.class)) {
+ LOG.info("skip load core coprocessor {} on CompactionServer", instance.getClass().getName());
+ return null;
+ }
+ if (!IsCompactionRelatedCoprocessor(instance.getClass())) {
+ LOG.info("skip load compaction no-related coprocessor {} on CompactionServer",
+ instance.getClass().getName());
+ return null;
+ }
+ // If coprocessor exposes any services, register them.
+ for (Service service : instance.getServices()) {
+ region.registerService(service);
+ }
+ return new RegionCompactionEnvironment(instance, priority, seq, conf, region, rsServices);
+ }
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+ // Observer operations
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Invoked before a region is opened.
+ * To run a compaction, a fake region is opened on the compaction server,
+ * so preOpen is implemented as a no-op.
+ *
+ */
+ public void preOpen() throws IOException {
+ }
+
+ /**
+ * Invoked after a region is opened.
+ * To run a compaction, a fake region is opened on the compaction server,
+ * so postOpen is implemented as a no-op.
+ */
+ public void postOpen() {
+ }
+
+ /**
+ * Invoked before a region is closed.
+ * To run a compaction, a fake region is opened on the compaction server,
+ * so preClose is implemented as a no-op.
+ */
+ public void preClose(final boolean abortRequested) throws IOException {
+ }
+
+ /**
+ * Invoked after a region is closed.
+ * To run a compaction, a fake region is opened on the compaction server,
+ * so postClose is implemented as a no-op.
+ */
+ public void postClose(final boolean abortRequested) {
+ }
+
+ /**
+ * Invoked before create StoreScanner for flush.
+ */
+ public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "Method preFlushScannerOpen is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Invoked before a memstore flush
+ * @return Scanner to use (cannot be null!)
+ * @throws IOException
+ */
+ public InternalScanner preFlush(HStore store, InternalScanner scanner,
+ FlushLifeCycleTracker tracker) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method preFlush is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Invoked before a memstore flush
+ * @throws IOException
+ */
+ public void preFlush(FlushLifeCycleTracker tracker) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method preFlush is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Invoked after a memstore flush
+ * @throws IOException
+ */
+ public void postFlush(FlushLifeCycleTracker tracker) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method postFlush is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Invoked before in memory compaction.
+ */
+ public void preMemStoreCompaction(HStore store) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method preMemStoreCompaction is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Invoked before create StoreScanner for in memory compaction.
+ */
+ public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
+ throw new UnsupportedOperationException("Method preMemStoreCompactionCompactScannerOpen "
+ + "is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Invoked before compacting memstore.
+ */
+ public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "Method preMemStoreCompactionCompact is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Invoked after in memory compaction.
+ */
+ public void postMemStoreCompaction(HStore store) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method postMemStoreCompaction is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Invoked after a memstore flush
+ * @throws IOException
+ */
+ public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "Method postFlush is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Supports Coprocessor 'bypass'.
+ * @param get the Get request
+ * @param results What to return if return is true/'bypass'.
+ * @return true if default processing should be bypassed.
+ * @exception IOException Exception
+ */
+ public boolean preGet(final Get get, final List<Cell> results) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method preGet is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * @param get the Get request
+ * @param results the result set
+ * @exception IOException Exception
+ */
+ public void postGet(final Get get, final List<Cell> results) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method postGet is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Supports Coprocessor 'bypass'.
+ * @param get the Get request
+ * @return true or false to return to client if bypassing normal operation, or null otherwise
+ * @exception IOException Exception
+ */
+ public Boolean preExists(final Get get) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method preExists is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * @param get the Get request
+ * @param result the result returned by the region server
+ * @return the result to return to the client
+ * @exception IOException Exception
+ */
+ public boolean postExists(final Get get, boolean result)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "Method postExists is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Supports Coprocessor 'bypass'.
+ * @param put The Put object
+ * @param edit The WALEdit object.
+ * @return true if default processing should be bypassed
+ * @exception IOException Exception
+ */
+ public boolean prePut(final Put put, final WALEdit edit) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method prePut is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Supports Coprocessor 'bypass'.
+ * @param mutation - the current mutation
+ * @param kv - the current cell
+ * @param byteNow - current timestamp in bytes
+ * @param get - the get that could be used
+ * Note that the get only does not specify the family and qualifier that should be used
+ * @return true if default processing should be bypassed
+ * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single
+ * Coprocessor for its needs only. Will be removed.
+ */
+ @Deprecated
+ public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, final Cell kv,
+ final byte[] byteNow, final Get get) throws IOException {
+ throw new UnsupportedOperationException("Method prePrepareTimeStampForDeleteVersion is "
+ + "not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * @param put The Put object
+ * @param edit The WALEdit object.
+ * @exception IOException Exception
+ */
+ public void postPut(final Put put, final WALEdit edit) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method postPut is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Supports Coprocessor 'bypass'.
+ * @param delete The Delete object
+ * @param edit The WALEdit object.
+ * @return true if default processing should be bypassed
+ * @exception IOException Exception
+ */
+ public boolean preDelete(final Delete delete, final WALEdit edit) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method preDelete is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * @param delete The Delete object
+ * @param edit The WALEdit object.
+ * @exception IOException Exception
+ */
+ public void postDelete(final Delete delete, final WALEdit edit) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method postDelete is not supported when loaded CP on compactionServer");
+ }
+
+ public void preBatchMutate(
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method preBatchMutate is not supported when loaded CP on compactionServer");
+ }
+
+ public void postBatchMutate(
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method postBatchMutate is not supported when loaded CP on compactionServer");
+ }
+
+ public void postBatchMutateIndispensably(
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "Method postBatchMutateIndispensably is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Supports Coprocessor 'bypass'.
+ * @param checkAndMutate the CheckAndMutate object
+ * @return true or false to return to client if default processing should be bypassed, or null
+ * otherwise
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "Method preCheckAndMutate is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Supports Coprocessor 'bypass'.
+ * @param checkAndMutate the CheckAndMutate object
+ * @return true or false to return to client if default processing should be bypassed, or null
+ * otherwise
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "Method preCheckAndMutateAfterRowLock is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * @param checkAndMutate the CheckAndMutate object
+ * @param result the result returned by the checkAndMutate
+ * @return true or false to return to client if default processing should be bypassed, or null
+ * otherwise
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate,
+ CheckAndMutateResult result) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method postCheckAndMutate is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Supports Coprocessor 'bypass'.
+ * @param append append object
+ * @param edit The WALEdit object.
+ * @return result to return to client if default operation should be bypassed, null otherwise
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ public Result preAppend(final Append append, final WALEdit edit) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method preAppend is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Supports Coprocessor 'bypass'.
+ * @param append append object
+ * @return result to return to client if default operation should be bypassed, null otherwise
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ public Result preAppendAfterRowLock(final Append append) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method preAppendAfterRowLock is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Supports Coprocessor 'bypass'.
+ * @param increment increment object
+ * @param edit The WALEdit object.
+ * @return result to return to client if default operation should be bypassed, null otherwise
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ public Result preIncrement(final Increment increment, final WALEdit edit) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method preIncrement is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Supports Coprocessor 'bypass'.
+ * @param increment increment object
+ * @return result to return to client if default operation should be bypassed, null otherwise
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method preIncrementAfterRowLock is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * @param append Append object
+ * @param result the result returned by the append
+ * @param edit The WALEdit object.
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ public Result postAppend(final Append append, final Result result, final WALEdit edit)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "Method postAppend is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * @param increment increment object
+ * @param result the result returned by postIncrement
+ * @param edit The WALEdit object.
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ public Result postIncrement(final Increment increment, Result result, final WALEdit edit)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "Method postIncrement is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * @param scan the Scan specification
+ * @exception IOException Exception
+ */
+ public void preScannerOpen(final Scan scan) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method preScannerOpen is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * @param scan the Scan specification
+ * @param s the scanner
+ * @return the scanner instance to use
+ * @exception IOException Exception
+ */
+ public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
+ throw new UnsupportedOperationException(
+ "Method postScannerOpen is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param s the scanner
+ * @param results the result set returned by the region server
+ * @param limit the maximum number of results to return
+ * @return never returns normally
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public Boolean preScannerNext(final InternalScanner s,
+ final List<Result> results, final int limit) throws IOException {
+ final String reason =
+ "Method preScannerNext is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param s the scanner
+ * @param results the result set returned by the region server
+ * @param limit the maximum number of results to return
+ * @param hasMore ignored; the call always fails
+ * @return never returns normally
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public boolean postScannerNext(final InternalScanner s,
+ final List<Result> results, final int limit, boolean hasMore)
+ throws IOException {
+ final String reason =
+ "Method postScannerNext is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Hook for the scan flow when the current scanned row is being filtered out by the filter.
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param s the scanner
+ * @param curRowCell The cell in the current row which got filtered out
+ * @return never returns normally
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell)
+ throws IOException {
+ final String reason =
+ "Method postScannerFilterRow is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param s the scanner
+ * @return never returns normally
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public boolean preScannerClose(final InternalScanner s) throws IOException {
+ final String reason =
+ "Method preScannerClose is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param s ignored; the call always fails
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public void postScannerClose(final InternalScanner s) throws IOException {
+ final String reason =
+ "Method postScannerClose is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Called before open store scanner for user scan. Not supported on the compaction server; this
+ * hook unconditionally fails.
+ * @param store ignored; the call always fails
+ * @param scan ignored; the call always fails
+ * @return never returns normally
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException {
+ // Name this hook in the message so the failing call site can be identified.
+ throw new UnsupportedOperationException(
+ "Method preStoreScannerOpen is not supported when loaded CP on compactionServer");
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param info the RegionInfo for this region
+ * @param edits the file of recovered edits
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException {
+ final String reason =
+ "Method preReplayWALs is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param info the RegionInfo for this region
+ * @param edits the file of recovered edits
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException {
+ final String reason =
+ "Method postReplayWALs is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param info ignored; the call always fails
+ * @param logKey ignored; the call always fails
+ * @param logEdit ignored; the call always fails
+ * @return never returns normally
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
+ * with something that doesn't expose InterfaceAudience.Private classes.
+ */
+ @Deprecated
+ public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
+ throws IOException {
+ final String reason =
+ "Method preWALRestore is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param info ignored; the call always fails
+ * @param logKey ignored; the call always fails
+ * @param logEdit ignored; the call always fails
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
+ * with something that doesn't expose InterfaceAudience.Private classes.
+ */
+ @Deprecated
+ public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
+ throws IOException {
+ final String reason =
+ "Method postWALRestore is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param familyPaths pairs of { CF, file path } submitted for bulk load
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
+ final String reason =
+ "Method preBulkLoadHFile is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param family ignored; the call always fails
+ * @param pairs ignored; the call always fails
+ * @return never returns normally
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
+ throws IOException {
+ final String reason =
+ "Method preCommitStoreFile is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param family ignored; the call always fails
+ * @param srcPath ignored; the call always fails
+ * @param dstPath ignored; the call always fails
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath)
+ throws IOException {
+ final String reason =
+ "Method postCommitStoreFile is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param familyPaths pairs of { CF, file path } submitted for bulk load
+ * @param map Map of CF to List of file paths for the final loaded files
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
+ Map<byte[], List<Path>> map) throws IOException {
+ final String reason =
+ "Method postBulkLoadHFile is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param op ignored; the call always fails
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public void postStartRegionOperation(final Operation op) throws IOException {
+ final String reason =
+ "Method postStartRegionOperation is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param op ignored; the call always fails
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public void postCloseRegionOperation(final Operation op) throws IOException {
+ final String reason =
+ "Method postCloseRegionOperation is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param mutation ignored; the call always fails
+ * @param cellPairs ignored; the call always fails
+ * @return never returns normally
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
+ final List<Pair<Cell, Cell>> cellPairs) throws IOException {
+ final String reason =
+ "Method postIncrementBeforeWAL is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param mutation ignored; the call always fails
+ * @param cellPairs ignored; the call always fails
+ * @return never returns normally
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
+ final List<Pair<Cell, Cell>> cellPairs) throws IOException {
+ final String reason =
+ "Method postAppendBeforeWAL is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param key ignored; the call always fails
+ * @param edit ignored; the call always fails
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
+ final String reason =
+ "Method preWALAppend is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param service ignored; the call always fails
+ * @param methodName ignored; the call always fails
+ * @param request ignored; the call always fails
+ * @return never returns normally
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public Message preEndpointInvocation(final Service service, final String methodName,
+ Message request) throws IOException {
+ final String reason =
+ "Method preEndpointInvocation is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param service ignored; the call always fails
+ * @param methodName ignored; the call always fails
+ * @param request ignored; the call always fails
+ * @param responseBuilder ignored; the call always fails
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public void postEndpointInvocation(final Service service, final String methodName,
+ final Message request, final Message.Builder responseBuilder) throws IOException {
+ final String reason =
+ "Method postEndpointInvocation is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+ // BulkLoadObserver hooks
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param user ignored; the call always fails
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public void prePrepareBulkLoad(User user) throws IOException {
+ final String reason =
+ "Method prePrepareBulkLoad is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+ * Not supported on the compaction server; this hook unconditionally fails.
+ * @param user ignored; the call always fails
+ * @throws IOException never thrown by this implementation
+ * @throws UnsupportedOperationException always
+ */
+ public void preCleanupBulkLoad(User user) throws IOException {
+ final String reason =
+ "Method preCleanupBulkLoad is not supported when loaded CP on compactionServer";
+ throw new UnsupportedOperationException(reason);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
index 4425076..414c6e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
@@ -255,7 +255,9 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
public void load(Class<? extends C> implClass, int priority, Configuration conf)
throws IOException {
E env = checkAndLoadInstance(implClass, priority, conf);
- coprocEnvironments.add(env);
+ if (env != null) {
+ coprocEnvironments.add(env);
+ }
}
/**
@@ -279,11 +281,13 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
}
// create the environment
E env = createEnvironment(impl, priority, loadSequence.incrementAndGet(), conf);
- assert env instanceof BaseEnvironment;
- ((BaseEnvironment<C>) env).startup();
- // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
- // if server (master or regionserver) aborts.
- coprocessorNames.add(implClass.getName());
+ if (env != null) {
+ assert env instanceof BaseEnvironment;
+ ((BaseEnvironment<C>) env).startup();
+ // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
+ // if server (master or regionserver) aborts.
+ coprocessorNames.add(implClass.getName());
+ }
return env;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
index 84e6d25..bb72559 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.RawCellBuilder;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ServerType;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
@@ -130,4 +131,15 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment<Reg
* @return the RawCellBuilder
*/
RawCellBuilder getCellBuilder();
+
+ /**
+ * Returns the type of server this coprocessor is running on (a region server, a compaction
+ * server or a replication server), so that users know the current context and can choose a
+ * different implementation for each. For example, on a compaction server you cannot get the
+ * region instance, so if you want to get something from a region you should probably go through
+ * getConnection() and communicate with the actual region instance over RPC. On a region server
+ * you can simply use getRegion or getOnlineRegions to get the region instance directly.
+ * @return the type of the hosting server
+ */
+ ServerType getServerType();
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorService.java
new file mode 100644
index 0000000..1a1616b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorService.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.AbstractServer;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerType;
+import org.apache.hadoop.hbase.regionserver.OnlineRegions;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Server-side services handed to a region coprocessor host: everything in {@link Server} and
+ * {@link OnlineRegions}, plus the type of the hosting server.
+ */
+@InterfaceAudience.Private
+public interface RegionCoprocessorService extends Server, OnlineRegions {
+ /**
+ * Returns the type of the hosting server. This method backs
+ * {@link RegionCoprocessorEnvironment#getServerType()}. HRegionServer and HCompactionServer
+ * provide it through {@link AbstractServer#getServerType()}.
+ * @return the type of the hosting server
+ */
+ ServerType getServerType();
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9da7f7c..5985160 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -115,6 +115,8 @@ import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.compactionserver.HCompactionServer;
+import org.apache.hadoop.hbase.compactionserver.RegionCompactionCoprocessorHost;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -418,7 +420,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final CellComparator cellComparator;
/**
- * @return The smallest mvcc readPoint across all the scanners in this
+ * Get the smallest mvcc readPoint across all the scanners in this
* region. Writes older than this readPoint, are included in every
* read operation.
*/
@@ -731,10 +733,53 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
@Deprecated
public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
- final Configuration confParam, final RegionInfo regionInfo,
- final TableDescriptor htd, final RegionServerServices rsServices) {
- this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
- wal, confParam, htd, rsServices);
+ final Configuration confParam, final RegionInfo regionInfo, final TableDescriptor htd,
+ final RegionServerServices rsServices) {
+ this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo), wal, confParam, htd,
+ rsServices);
+ }
+
+ /**
+ * HRegion constructor. This constructor should only be used for compaction offload; see
+ * {@link org.apache.hadoop.hbase.compactionserver.CompactionThreadManager#getStore}.
+ * @param fs the region file system, stored as-is
+ * @param confParam the base configuration; must not be a CompoundConfiguration
+ * @param htd the table descriptor; must not be null
+ * @param csServices the compaction server handed to the RegionCompactionCoprocessorHost
+ * @throws IllegalArgumentException if htd is null or confParam is a CompoundConfiguration
+ */
+ public HRegion(final HRegionFileSystem fs, final Configuration confParam,
+ final TableDescriptor htd, final HCompactionServer csServices) {
+ if (htd == null) {
+ throw new IllegalArgumentException("Need table descriptor");
+ }
+ if (confParam instanceof CompoundConfiguration) {
+ throw new IllegalArgumentException("Need original base configuration");
+ }
+ this.fs = fs;
+ this.baseConf = confParam;
+ // Combine the base configuration with the values carried by the table descriptor.
+ this.conf = new CompoundConfiguration().add(confParam).addBytesMap(htd.getValues());
+ // Use the meta comparator for the meta table, or when USE_META_CELL_COMPARATOR is set.
+ this.cellComparator = htd.isMetaTable()
+ || conf.getBoolean(USE_META_CELL_COMPARATOR, DEFAULT_USE_META_CELL_COMPARATOR)
+ ? MetaCellComparator.META_COMPARATOR
+ : CellComparatorImpl.COMPARATOR;
+ this.regionServicesForStores = new RegionServicesForStores(this, null);
+ this.htableDescriptor = htd;
+ this.scannerReadPoints = new ConcurrentHashMap<>();
+ // NOTE(review): getRegionInfo() presumably reads this.fs assigned above - confirm before
+ // reordering these assignments.
+ this.mvcc = new MultiVersionConcurrencyControl(getRegionInfo().getShortNameToLog());
+ this.coprocessorHost = new RegionCompactionCoprocessorHost(this, csServices, conf);
+ // The remaining final fields must still be assigned, so they get null or default values here.
+ // NOTE(review): presumably none of them is used on the compaction-offload path - confirm.
+ this.isLoadingCfsOnDemandDefault = true;
+ this.wal = null;
+ this.timestampSlop = HConstants.LATEST_TIMESTAMP;
+ this.lock = null;
+ this.regionLockHolders = null;
+ this.metricsRegion = null;
+ this.metricsRegionWrapper = null;
+ this.regionDurability = null;
+ this.regionStatsEnabled = false;
+ this.storeHotnessProtector = null;
+ this.rowLockWaitDuration = DEFAULT_ROWLOCK_WAIT_DURATION;
+ this.busyWaitDuration = DEFAULT_BUSY_WAIT_DURATION;
+ this.maxCellSize = DEFAULT_MAX_CELL_SIZE;
+ this.miniBatchSize = DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE;
+ this.maxBusyWaitMultiplier = 2;
+ this.maxBusyWaitDuration = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
}
/**
@@ -5149,9 +5194,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Check the collection of families for valid timestamps
- * @param familyMap
* @param now current timestamp
- * @throws FailedSanityCheckException
*/
public void checkTimestamps(final Map<byte[], List<Cell>> familyMap, long now)
throws FailedSanityCheckException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index cacea25..ad28dc1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1541,8 +1541,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
}
- protected boolean completeCompaction(CompactionRequestImpl cr, List<String> filesToCompact,
- User user, List<String> newFiles) throws IOException {
+ protected boolean completeCompaction(List<String> filesToCompact, User user,
+ List<String> newFiles) throws IOException {
Collection<HStoreFile> selectedStoreFiles = new ArrayList<>();
for (String selectedFile : filesToCompact) {
HStoreFile storeFile = getStoreFileBasedOnFileName(selectedFile);
@@ -1559,6 +1559,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
for (String newFile : newFiles) {
newFilePaths.add(new Path(storeTmpDir, newFile));
}
+ CompactionRequestImpl cr = new CompactionRequestImpl(selectedStoreFiles);
+ cr.setIsMajor(getForceMajor(), getForceMajor());
completeCompaction(cr, selectedStoreFiles, user, newFilePaths);
return true;
}
@@ -1574,17 +1576,19 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
return null;
}
- private HStoreFile getStoreFile(Path path) {
- for (HStoreFile storefile : getStorefiles()) {
- if (storefile.getPath().equals(path)) {
- return storefile;
+ public List<HStoreFile> getStoreFilesBaseOnFileNames(Collection<String> fileNames) {
+ List<HStoreFile> storeFiles = new ArrayList<>();
+ for (HStoreFile storeFile : getStorefiles()) {
+ String name = storeFile.getPath().getName();
+ if (fileNames.contains(name)) {
+ storeFiles.add(storeFile);
}
}
- return null;
+ return storeFiles;
}
private synchronized List<HStoreFile> completeCompaction(CompactionRequestImpl cr,
- Collection<HStoreFile> filesToCompact, User user, List<Path> newFiles) throws IOException {
+ Collection<HStoreFile> filesToCompact, User user, List<Path> newFiles) throws IOException {
// TODO check store contains files to compact
// Do the steps necessary to complete the compaction.
setStoragePolicyFromFileName(newFiles);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 3910ab4..53658ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -3787,8 +3787,8 @@ public class RSRpcServices extends AbstractRpcServices implements
List<String> newFiles = request.getNewFilesList().stream().collect(Collectors.toList());
try {
// TODO: If we could write HFile directly into the data directory, here the completion
- // will be easier
- success = store.completeCompaction(null, selectedFiles, null, newFiles);
+ // will be easier
+ success = store.completeCompaction(selectedFiles, null, newFiles);
LOG.debug("Complete compaction result: {} for region: {}, store {}", success,
regionInfo.getRegionNameAsString(),
store.getColumnFamilyDescriptor().getNameAsString());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index a916d0d..658819c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RawCellBuilder;
import org.apache.hadoop.hbase.RawCellBuilderFactory;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ServerType;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
@@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
@@ -111,12 +113,12 @@ public class RegionCoprocessorHost
*
* Encapsulation of the environment of each coprocessor
*/
- private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
+ public static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
implements RegionCoprocessorEnvironment {
- private Region region;
+ protected Region region;
ConcurrentMap<String, Object> sharedData;
- private final MetricRegistry metricRegistry;
- private final RegionServerServices services;
+ protected final MetricRegistry metricRegistry;
+ protected final RegionCoprocessorService services;
/**
* Constructor
@@ -125,7 +127,7 @@ public class RegionCoprocessorHost
*/
public RegionEnvironment(final RegionCoprocessor impl, final int priority,
final int seq, final Configuration conf, final Region region,
- final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
+ final RegionCoprocessorService services, final ConcurrentMap<String, Object> sharedData) {
super(impl, priority, seq, conf);
this.region = region;
this.sharedData = sharedData;
@@ -187,6 +189,11 @@ public class RegionCoprocessorHost
// We always do a DEEP_COPY only
return RawCellBuilderFactory.create();
}
+
+ // Reports the type of the hosting server by asking the services object.
+ @Override
+ public ServerType getServerType() {
+ final ServerType hostType = this.services.getServerType();
+ return hostType;
+ }
}
/**
@@ -246,9 +253,9 @@ public class RegionCoprocessorHost
}
/** The region server services */
- RegionServerServices rsServices;
+ protected RegionCoprocessorService rsServices;
/** The region */
- HRegion region;
+ protected HRegion region;
/**
* Constructor
@@ -257,7 +264,7 @@ public class RegionCoprocessorHost
* @param conf the configuration
*/
public RegionCoprocessorHost(final HRegion region,
- final RegionServerServices rsServices, final Configuration conf) {
+ final RegionCoprocessorService rsServices, final Configuration conf) {
super(rsServices);
this.conf = conf;
this.rsServices = rsServices;
@@ -425,7 +432,7 @@ public class RegionCoprocessorHost
// If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)?
new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region,
- rsServices, classData):
+ (RegionServerServices) rsServices, classData):
new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 1f51fe8..a4ceab0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.locking.EntityLock;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorService;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@@ -57,8 +58,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
* the code base.
*/
@InterfaceAudience.Private
-public interface RegionServerServices
- extends Server, MutableOnlineRegions, FavoredNodesForRegion, ThroughputControllerService {
+public interface RegionServerServices extends Server, MutableOnlineRegions, FavoredNodesForRegion,
+ ThroughputControllerService, RegionCoprocessorService {
/**
* @return the WAL for a particular region. Pass null for getting the default (common) WAL
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index c7e172a..b692ec6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -397,4 +397,9 @@ public class MockRegionServerServices implements RegionServerServices {
boolean major, int priority) {
return false;
}
+
+ // This mock always reports itself as a region server.
+ @Override
+ public ServerType getServerType() {
+ return ServerType.RegionServer;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
index 18b07d9..b246d21 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
@@ -17,134 +17,36 @@
*/
package org.apache.hadoop.hbase.compactionserver;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-import java.io.IOException;
import java.util.ArrayList;
-
import org.apache.commons.lang3.RandomUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.StartMiniClusterOption;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.compaction.CompactionOffloadManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.testclassification.CompactionServerTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
-
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
+
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
@Category({CompactionServerTests.class, MediumTests.class})
-public class TestCompactionServer {
+public class TestCompactionServer extends TestCompactionServerBase{
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCompactionServer.class);
- private static final Logger LOG = LoggerFactory.getLogger(TestCompactionServer.class);
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static Configuration CONF = TEST_UTIL.getConfiguration();
- private static HMaster MASTER;
- private static HCompactionServer COMPACTION_SERVER;
- private static ServerName COMPACTION_SERVER_NAME;
- private static TableName TABLENAME = TableName.valueOf("t");
- private static String FAMILY = "C";
- private static String COL ="c0";
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numCompactionServers(1).build());
- TEST_UTIL.getAdmin().switchCompactionOffload(true);
- MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster();
- TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
- COMPACTION_SERVER = TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0)
- .getCompactionServer();
- COMPACTION_SERVER_NAME = COMPACTION_SERVER.getServerName();
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
- }
-
- @Before
- public void before() throws Exception {
- TableDescriptor tableDescriptor =
- TableDescriptorBuilder.newBuilder(TABLENAME).setCompactionOffloadEnabled(true).build();
- TEST_UTIL.createTable(tableDescriptor, Bytes.toByteArrays(FAMILY),
- TEST_UTIL.getConfiguration());
- TEST_UTIL.waitTableAvailable(TABLENAME);
- COMPACTION_SERVER.requestCount.reset();
- }
-
- @After
- public void after() throws IOException {
- TEST_UTIL.deleteTableIfAny(TABLENAME);
- }
-
- private void doPutRecord(int start, int end, boolean flush) throws Exception {
- Table h = TEST_UTIL.getConnection().getTable(TABLENAME);
- for (int i = start; i <= end; i++) {
- Put p = new Put(Bytes.toBytes(i));
- p.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COL), Bytes.toBytes(i));
- h.put(p);
- if (i % 100 == 0 && flush) {
- TEST_UTIL.flush(TABLENAME);
- }
- }
- h.close();
- }
-
- private void doFillRecord(int start, int end, byte[] value) throws Exception {
- Table h = TEST_UTIL.getConnection().getTable(TABLENAME);
- for (int i = start; i <= end; i++) {
- Put p = new Put(Bytes.toBytes(i));
- p.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COL), value);
- h.put(p);
- }
- h.close();
- }
-
- private void verifyRecord(int start, int end, boolean exist) throws Exception {
- Table h = TEST_UTIL.getConnection().getTable(TABLENAME);
- for (int i = start; i <= end; i++) {
- Get get = new Get(Bytes.toBytes(i));
- Result r = h.get(get);
- if (exist) {
- assertArrayEquals(Bytes.toBytes(i), r.getValue(Bytes.toBytes(FAMILY), Bytes.toBytes(COL)));
- } else {
- assertNull(r.getValue(Bytes.toBytes(FAMILY), Bytes.toBytes(COL)));
- }
- }
- h.close();
- }
-
@Test
public void testCompaction() throws Exception {
TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServerBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServerBase.java
new file mode 100644
index 0000000..7a112e1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServerBase.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+/**
+ * Shared fixture for compaction server tests. Starts a mini cluster with one compaction server
+ * once per class, creates {@link #TABLENAME} with compaction offload enabled before each test and
+ * deletes it afterwards, and provides helpers to write and verify rows.
+ */
+public class TestCompactionServerBase {
+  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected static Configuration CONF = TEST_UTIL.getConfiguration();
+  protected static HMaster MASTER;
+  protected static HCompactionServer COMPACTION_SERVER;
+  protected static ServerName COMPACTION_SERVER_NAME;
+  protected static TableName TABLENAME = TableName.valueOf("t");
+  protected static String FAMILY = "C";
+  protected static String COL = "c0";
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numCompactionServers(1).build());
+    TEST_UTIL.getAdmin().switchCompactionOffload(true);
+    MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster();
+    TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
+    COMPACTION_SERVER = TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0)
+      .getCompactionServer();
+    COMPACTION_SERVER_NAME = COMPACTION_SERVER.getServerName();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void before() throws Exception {
+    TableDescriptor tableDescriptor =
+      TableDescriptorBuilder.newBuilder(TABLENAME).setCompactionOffloadEnabled(true).build();
+    TEST_UTIL.createTable(tableDescriptor, Bytes.toByteArrays(FAMILY),
+      TEST_UTIL.getConfiguration());
+    TEST_UTIL.waitTableAvailable(TABLENAME);
+    COMPACTION_SERVER.requestCount.reset();
+  }
+
+  @After
+  public void after() throws IOException {
+    TEST_UTIL.deleteTableIfAny(TABLENAME);
+  }
+
+  /**
+   * Writes rows {@code start..end} (inclusive), using the row number as both key and value, and
+   * flushes the table after every row number divisible by 100 when {@code flush} is true.
+   */
+  void doPutRecord(int start, int end, boolean flush) throws Exception {
+    // try-with-resources so the Table is released even when a put or flush throws
+    try (Table h = TEST_UTIL.getConnection().getTable(TABLENAME)) {
+      for (int i = start; i <= end; i++) {
+        Put p = new Put(Bytes.toBytes(i));
+        p.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COL), Bytes.toBytes(i));
+        h.put(p);
+        if (i % 100 == 0 && flush) {
+          TEST_UTIL.flush(TABLENAME);
+        }
+      }
+    }
+  }
+
+  /**
+   * Writes the same {@code value} into rows {@code start..end} (inclusive) without flushing.
+   */
+  void doFillRecord(int start, int end, byte[] value) throws Exception {
+    try (Table h = TEST_UTIL.getConnection().getTable(TABLENAME)) {
+      for (int i = start; i <= end; i++) {
+        Put p = new Put(Bytes.toBytes(i));
+        p.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COL), value);
+        h.put(p);
+      }
+    }
+  }
+
+  /**
+   * Reads rows {@code start..end} (inclusive) and asserts that each one holds its own row number
+   * as value when {@code exist} is true, or has no value in the test column otherwise.
+   */
+  void verifyRecord(int start, int end, boolean exist) throws Exception {
+    try (Table h = TEST_UTIL.getConnection().getTable(TABLENAME)) {
+      for (int i = start; i <= end; i++) {
+        Result r = h.get(new Get(Bytes.toBytes(i)));
+        byte[] value = r.getValue(Bytes.toBytes(FAMILY), Bytes.toBytes(COL));
+        if (exist) {
+          assertArrayEquals(Bytes.toBytes(i), value);
+        } else {
+          assertNull(value);
+        }
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestRegionCoprocessorOnCompactionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestRegionCoprocessorOnCompactionServer.java
new file mode 100644
index 0000000..f553602
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestRegionCoprocessorOnCompactionServer.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerType;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileReader;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
+import org.apache.hadoop.hbase.testclassification.CompactionServerTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+
+/**
+ * Verifies which {@link RegionObserver} hooks run on the compaction server. Each hook invocation
+ * is recorded as a row in a separate verify table; on the compaction server the row is written
+ * through the environment's connection, which also exercises
+ * {@link RegionCoprocessorHost.RegionEnvironment#getConnection()}. The hooks expected on the
+ * compaction server are {@link RegionObserver#preCompactSelection},
+ * {@link RegionObserver#postCompactSelection}, {@link RegionObserver#preCompact},
+ * {@link RegionObserver#preCompactScannerOpen}, {@link RegionObserver#preStoreFileReaderOpen},
+ * {@link RegionObserver#postStoreFileReaderOpen} and
+ * {@link RegionObserver#postInstantiateDeleteTracker}.
+ */
+@Category({ CompactionServerTests.class, MediumTests.class })
+public class TestRegionCoprocessorOnCompactionServer extends TestCompactionServerBase {
+  private static final TableName VERIFY_TABLE_NAME = TableName.valueOf("verifyTable");
+  // Column qualifiers that tell which kind of server recorded a hook invocation
+  protected static String RS = "RS";
+  protected static String CS = "CS";
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestRegionCoprocessorOnCompactionServer.class);
+  private static final Set<String> compactionCoprocessor = ImmutableSet.of("preCompactSelection",
+    "postCompactSelection", "preCompactScannerOpen", "preCompact", "preStoreFileReaderOpen",
+    "postStoreFileReaderOpen", "postInstantiateDeleteTracker");
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionCoprocessorOnCompactionServer.class);
+
+  /**
+   * Records an invocation of {@code methodName}: puts a row keyed by the method name into the
+   * verify table, using qualifier {@link #CS} when the environment's server name equals the
+   * compaction server's and {@link #RS} otherwise.
+   */
+  private static void recordCoprocessorCall(ObserverContext<RegionCoprocessorEnvironment> c,
+    String methodName) throws IOException {
+    RegionCoprocessorEnvironment env = c.getEnvironment();
+    String qualifier = env.getServerName().equals(COMPACTION_SERVER_NAME) ? CS : RS;
+    // Bytes.toBytes(String) rather than String.getBytes(): independent of the platform default
+    // charset and consistent with the Bytes.toBytes(FAMILY) used when the record is read back
+    Put put = new Put(Bytes.toBytes(methodName));
+    put.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(qualifier), Bytes.toBytes(1));
+    // Here user could choose different implementations for different context
+    if (env.getServerType() == ServerType.CompactionServer) {
+      // close the Table after the put instead of leaking one handle per hook invocation
+      try (Table verifyTable = env.getConnection().getTable(VERIFY_TABLE_NAME)) {
+        verifyTable.put(put);
+      }
+    } else if (env.getServerType() == ServerType.RegionServer) {
+      env.getOnlineRegions().getRegions(VERIFY_TABLE_NAME).get(0).put(put);
+    }
+  }
+
+  /**
+   * Asserts that the verify table cell {@code row}/{@code col} holds the int 1 when
+   * {@code exist} is true, or has no value otherwise.
+   */
+  private void verifyRecord(byte[] row, byte[] col, boolean exist) throws Exception {
+    // try-with-resources: a failing assertion must not skip closing the Table
+    try (Table h = TEST_UTIL.getConnection().getTable(VERIFY_TABLE_NAME)) {
+      Result r = h.get(new Get(row));
+      byte[] value = r.getValue(Bytes.toBytes(FAMILY), col);
+      if (exist) {
+        // fail with a readable message instead of handing a null value to Bytes.toInt
+        assertNotNull("missing record " + Bytes.toString(row) + ":" + Bytes.toString(col), value);
+        assertEquals(1, Bytes.toInt(value));
+      } else {
+        assertNull(value);
+      }
+    }
+  }
+
+  /** Observer that records every hook it implements; all but postCompact are expected on CS. */
+  public static class TestCompactionCoprocessor implements RegionObserver, RegionCoprocessor {
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker)
+      throws IOException {
+      recordCoprocessorCall(c, "preCompactSelection");
+    }
+
+    @Override
+    public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      List<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
+      CompactionRequest request) {
+      try {
+        recordCoprocessorCall(c, "postCompactSelection");
+      } catch (IOException e) {
+        // this override declares no checked exception, so a failed write can only be logged
+        LOG.error("postCompactSelection catch IOException:", e);
+      }
+    }
+
+    @Override
+    public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
+      CompactionRequest request) throws IOException {
+      recordCoprocessorCall(c, "preCompactScannerOpen");
+    }
+
+    @Override
+    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
+      CompactionRequest request) throws IOException {
+      recordCoprocessorCall(c, "preCompact");
+      return scanner;
+    }
+
+    @Override
+    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request)
+      throws IOException {
+      recordCoprocessorCall(c, "postCompact");
+    }
+
+    @Override
+    public StoreFileReader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
+      FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
+      Reference r, StoreFileReader reader) throws IOException {
+      recordCoprocessorCall(ctx, "preStoreFileReaderOpen");
+      return reader;
+    }
+
+    @Override
+    public StoreFileReader postStoreFileReaderOpen(
+      ObserverContext<RegionCoprocessorEnvironment> ctx, FileSystem fs, Path p,
+      FSDataInputStreamWrapper in, long size, CacheConfig cacheConf, Reference r,
+      StoreFileReader reader) throws IOException {
+      recordCoprocessorCall(ctx, "postStoreFileReaderOpen");
+      return reader;
+    }
+
+    @Override
+    public DeleteTracker postInstantiateDeleteTracker(
+      ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker)
+      throws IOException {
+      recordCoprocessorCall(ctx, "postInstantiateDeleteTracker");
+      return delTracker;
+    }
+  }
+
+  /** The same recording observer, annotated as a core coprocessor. */
+  @CoreCoprocessor
+  public static class TestCompactionCoreCoprocessor extends TestCompactionCoprocessor {
+  }
+
+  /** Observer whose only recorded hook, preOpen, is not one of the compaction hooks above. */
+  public static class TestCoprocessorNotCompactionRelated
+    implements RegionObserver, RegionCoprocessor {
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
+      recordCoprocessorCall(c, "preOpen");
+    }
+  }
+
+  @Before
+  @Override
+  public void before() throws Exception {
+    super.before();
+    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(VERIFY_TABLE_NAME).build();
+    TEST_UTIL.createTable(tableDescriptor, Bytes.toByteArrays(FAMILY),
+      TEST_UTIL.getConfiguration());
+    TEST_UTIL.waitTableAvailable(VERIFY_TABLE_NAME);
+  }
+
+  @After
+  @Override
+  public void after() throws IOException {
+    try {
+      super.after();
+    } finally {
+      // drop the verify table even when deleting the data table failed
+      TEST_UTIL.deleteTableIfAny(VERIFY_TABLE_NAME);
+    }
+  }
+
+  /**
+   * Attaches {@code coprocessor} to {@link #TABLENAME} while compaction is switched off, writes
+   * rows 1..1000 with flushes, switches compaction back on, triggers a compaction and waits
+   * until the compaction server has counted a request and has no running compaction task.
+   */
+  private void compactWithCoprocessor(Class<? extends RegionCoprocessor> coprocessor)
+    throws Exception {
+    TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
+    ColumnFamilyDescriptor cfd =
+      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).build();
+    TableDescriptor modifiedTableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
+      .setColumnFamily(cfd).setCompactionOffloadEnabled(true)
+      .setCoprocessor(coprocessor.getName()).build();
+    TEST_UTIL.getAdmin().modifyTable(modifiedTableDescriptor);
+    TEST_UTIL.waitTableAvailable(TABLENAME);
+    doPutRecord(1, 1000, true);
+    TEST_UTIL.getAdmin().compactionSwitch(true, new ArrayList<>());
+    TEST_UTIL.compact(TABLENAME, true);
+    // NOTE(review): fixed pause kept from the original tests; presumably it gives the request
+    // time to reach the compaction server before polling starts -- confirm
+    Thread.sleep(5000);
+    TEST_UTIL.waitFor(60000,
+      () -> COMPACTION_SERVER.requestCount.sum() > 0 && COMPACTION_SERVER.compactionThreadManager
+        .getRunningCompactionTasks().values().size() == 0);
+  }
+
+  /**
+   * A compaction-related coprocessor is loaded on the compaction server, where its compaction
+   * hooks run, while postCompact is executed on the region server.
+   */
+  @Test
+  public void testLoadCoprocessor() throws Exception {
+    compactWithCoprocessor(TestCompactionCoprocessor.class);
+    for (String methodName : compactionCoprocessor) {
+      verifyRecord(Bytes.toBytes(methodName), Bytes.toBytes(CS), true);
+    }
+    verifyRecord(Bytes.toBytes("postCompact"), Bytes.toBytes(CS), false);
+    verifyRecord(Bytes.toBytes("postCompact"), Bytes.toBytes(RS), true);
+  }
+
+  /**
+   * A core coprocessor is not loaded on the compaction server.
+   */
+  @Test
+  public void testNotLoadCoreCoprocessor() throws Exception {
+    compactWithCoprocessor(TestCompactionCoreCoprocessor.class);
+    for (String methodName : compactionCoprocessor) {
+      verifyRecord(Bytes.toBytes(methodName), Bytes.toBytes(CS), false);
+    }
+    verifyRecord(Bytes.toBytes("postCompact"), Bytes.toBytes(CS), false);
+    verifyRecord(Bytes.toBytes("postCompact"), Bytes.toBytes(RS), true);
+  }
+
+  /**
+   * A coprocessor without compaction-related hooks is not loaded on the compaction server.
+   */
+  @Test
+  public void testNotLoadCompactionNotRelatedCoprocessor() throws Exception {
+    compactWithCoprocessor(TestCoprocessorNotCompactionRelated.class);
+    verifyRecord(Bytes.toBytes("preOpen"), Bytes.toBytes(CS), false);
+    verifyRecord(Bytes.toBytes("preOpen"), Bytes.toBytes(RS), true);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index db4a5eb..43a7cd6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ServerType;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@@ -777,4 +778,10 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
boolean major, int priority) {
return false;
}
+
+ @Override
+ public ServerType getServerType() {
+ return ServerType.RegionServer;
+ }
+
}