Posted to commits@hbase.apache.org by ni...@apache.org on 2021/08/21 02:23:11 UTC

[hbase] branch HBASE-25714 updated: HBASE-26089 Support RegionCoprocessor on CompactionServer (#3580)

This is an automated email from the ASF dual-hosted git repository.

niuyulin pushed a commit to branch HBASE-25714
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-25714 by this push:
     new 20b7c8b  HBASE-26089 Support RegionCoprocessor on CompactionServer (#3580)
20b7c8b is described below

commit 20b7c8b286cf43d14928687bbbb4f510ff5f96d6
Author: niuyulin <yu...@gmail.com>
AuthorDate: Sat Aug 21 10:22:26 2021 +0800

    HBASE-26089 Support RegionCoprocessor on CompactionServer (#3580)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../org/apache/hadoop/hbase/AbstractServer.java    |  18 +
 .../java/org/apache/hadoop/hbase/ServerType.java   |  29 +
 .../compactionserver/CompactionThreadManager.java  |  17 +-
 .../hbase/compactionserver/HCompactionServer.java  |  23 +-
 .../RegionCompactionCoprocessorHost.java           | 773 +++++++++++++++++++++
 .../hadoop/hbase/coprocessor/CoprocessorHost.java  |  16 +-
 .../coprocessor/RegionCoprocessorEnvironment.java  |  12 +
 .../coprocessor/RegionCoprocessorService.java      |  33 +
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  57 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   |  20 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |   4 +-
 .../hbase/regionserver/RegionCoprocessorHost.java  |  25 +-
 .../hbase/regionserver/RegionServerServices.java   |   5 +-
 .../hadoop/hbase/MockRegionServerServices.java     |   5 +
 .../compactionserver/TestCompactionServer.java     | 102 +--
 .../compactionserver/TestCompactionServerBase.java | 119 ++++
 .../TestRegionCoprocessorOnCompactionServer.java   | 305 ++++++++
 .../hadoop/hbase/master/MockRegionServer.java      |   7 +
 18 files changed, 1420 insertions(+), 150 deletions(-)
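
In coprocessor terms, the change means a RegionCoprocessor whose observer overrides a
compaction hook (preCompactSelection, preCompact, etc.) is now also loaded and invoked on the
CompactionServer, while its non-compaction hooks keep running on the RegionServer alone. A
minimal sketch of such a coprocessor, assuming the standard RegionObserver hook signatures
(the class name is illustrative, not part of this commit):

  import java.io.IOException;
  import java.util.Optional;
  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
  import org.apache.hadoop.hbase.regionserver.InternalScanner;
  import org.apache.hadoop.hbase.regionserver.ScanType;
  import org.apache.hadoop.hbase.regionserver.Store;
  import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

  public class CompactionAuditObserver implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    // Declaring preCompact is what makes the new RegionCompactionCoprocessorHost treat
    // this class as compaction-related and load it on the CompactionServer.
    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c,
        Store store, InternalScanner scanner, ScanType scanType,
        CompactionLifeCycleTracker tracker, CompactionRequest request) throws IOException {
      // Wrap or replace the scanner here; returning it unchanged keeps default behavior.
      return scanner;
    }
  }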

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java
index 3e64c85..e59d8b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/AbstractServer.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase;
 
+import static org.apache.hadoop.hbase.compactionserver.HCompactionServer.COMPACTIONSERVER;
+import static org.apache.hadoop.hbase.master.HMaster.MASTER;
+import static org.apache.hadoop.hbase.regionserver.HRegionServer.REGIONSERVER;
+
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -431,4 +435,18 @@ public abstract class AbstractServer extends Thread implements Server {
   protected abstract AbstractRpcServices getRpcService();
 
   protected abstract String getProcessName();
+
+  public ServerType getServerType() {
+    String processName = getProcessName();
+    if (processName.equals(MASTER)) {
+      return ServerType.Master;
+    }
+    if (processName.equals(REGIONSERVER)) {
+      return ServerType.RegionServer;
+    }
+    if (processName.equals(COMPACTIONSERVER)) {
+      return ServerType.CompactionServer;
+    }
+    return ServerType.ReplicationServer;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ServerType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ServerType.java
new file mode 100644
index 0000000..dbd8269
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ServerType.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Enum describing Server Type
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+public enum ServerType {
+  Master, RegionServer, CompactionServer, ReplicationServer
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
index f5fcfe2..9d67e29 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.compactionserver;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -240,7 +239,7 @@ public class CompactionThreadManager implements ThroughputControllerService {
     excludeFiles.addAll(compactedFiles);
     // Convert files names to store files
     status.setStatus("Convert current compacting and compacted files to store files");
-    List<HStoreFile> excludeStoreFiles = getExcludedStoreFiles(store, excludeFiles);
+    List<HStoreFile> excludeStoreFiles = store.getStoreFilesBaseOnFileNames(excludeFiles);
     LOG.info(
       "Start select store: {}, excludeFileNames: {}, excluded: {}, compacting: {}, compacted: {}",
       logStr, excludeFiles.size(), excludeStoreFiles.size(), compactingFiles.size(),
@@ -354,23 +353,11 @@ public class CompactionThreadManager implements ThroughputControllerService {
     }
   }
 
-  private List<HStoreFile> getExcludedStoreFiles(HStore store, Set<String> excludeFileNames) {
-    Collection<HStoreFile> storefiles = store.getStorefiles();
-    List<HStoreFile> storeFiles = new ArrayList<>();
-    for (HStoreFile storefile : storefiles) {
-      String name = storefile.getPath().getName();
-      if (excludeFileNames.contains(name)) {
-        storeFiles.add(storefile);
-      }
-    }
-    return storeFiles;
-  }
-
   private HStore getStore(final Configuration conf, final FileSystem fs, final Path rootDir,
       final TableDescriptor htd, final RegionInfo hri, final String familyName) throws IOException {
     HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs,
         CommonFSUtils.getTableDir(rootDir, htd.getTableName()), hri);
-    HRegion region = new HRegion(regionFs, null, conf, htd, null);
+    HRegion region = new HRegion(regionFs, conf, htd, server);
     ColumnFamilyDescriptor columnFamilyDescriptor = htd.getColumnFamily(Bytes.toBytes(familyName));
     HStore store;
     if (columnFamilyDescriptor.isMobEnabled()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
index f78e58a..578925f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.compactionserver;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.hadoop.conf.Configuration;
@@ -28,9 +29,12 @@ import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorService;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.security.SecurityConstants;
 import org.apache.hadoop.hbase.security.Superusers;
@@ -54,7 +58,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusP
 import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusProtos.CompactionServerStatusService;
 
 @InterfaceAudience.Private
-public class HCompactionServer extends AbstractServer {
+public class HCompactionServer extends AbstractServer implements RegionCoprocessorService {
 
   /** compaction server process name */
   public static final String COMPACTIONSERVER = "compactionserver";
@@ -312,4 +316,21 @@ public class HCompactionServer extends AbstractServer {
     }
   }
 
+  @Override
+  public HRegion getRegion(final String encodedRegionName) {
+    throw new UnsupportedOperationException(
+        "Method getRegion is not supported in HCompactionServer");
+  }
+
+  @Override
+  public List<HRegion> getRegions(TableName tableName) {
+    throw new UnsupportedOperationException(
+        "Method getRegions is not supported in HCompactionServer");
+  }
+
+  @Override
+  public List<HRegion> getRegions() {
+    throw new UnsupportedOperationException(
+        "Method getRegions is not supported in HCompactionServer");
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/RegionCompactionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/RegionCompactionCoprocessorHost.java
new file mode 100644
index 0000000..4057c59
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/RegionCompactionCoprocessorHost.java
@@ -0,0 +1,773 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.compactionserver;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.RawCellBuilder;
+import org.apache.hadoop.hbase.RawCellBuilderFactory;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.SharedConnection;
+import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorService;
+import org.apache.hadoop.hbase.metrics.MetricRegistry;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.OnlineRegions;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Region.Operation;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.Service;
+
+/**
+ * 1. Inherits from {@link RegionCoprocessorHost}; only used on the CompactionServer. This host
+ * only loads coprocessors that are involved in compaction.
+ * 2. Other methods of the host, like preFlush, postFlush, prePut, postPut, etc., are not
+ * supported and throw UnsupportedOperationException.
+ * 3. Four methods, preOpen, postOpen, preClose, and postClose, are overridden as no-ops.
+ * 4. The methods preCompactSelection, postCompactSelection, preCompactScannerOpen, preCompact,
+ * postCompact, preStoreFileReaderOpen, postStoreFileReaderOpen, and postInstantiateDeleteTracker
+ * are retained.
+ */
+@InterfaceAudience.Private
+public class RegionCompactionCoprocessorHost extends RegionCoprocessorHost {
+  private static final Logger LOG = LoggerFactory.getLogger(RegionCompactionCoprocessorHost.class);
+  // postCompact will be executed on HRegionServer, so we don't check here
+  private static final Set<String> compactionCoprocessor = ImmutableSet.of("preCompactSelection",
+    "postCompactSelection", "preCompactScannerOpen", "preCompact", "preStoreFileReaderOpen",
+    "postStoreFileReaderOpen", "postInstantiateDeleteTracker");
+
+  private boolean isCompactionRelatedCoprocessor(Class<?> cpClass) {
+    while (cpClass != null) {
+      for (Method method : cpClass.getDeclaredMethods()) {
+        if (compactionCoprocessor.contains(method.getName())) {
+          return true;
+        }
+      }
+      cpClass = cpClass.getSuperclass();
+    }
+    return false;
+  }
+
+  /**
+   * This environment is only used on the compactionServer, for non-core coprocessors; the
+   * methods getRegion, getOnlineRegions, and getSharedData are not supported.
+   */
+  private static class RegionCompactionEnvironment extends RegionCoprocessorHost.RegionEnvironment {
+
+    /**
+     * Constructor
+     * @param impl the coprocessor instance
+     * @param priority chaining priority
+     */
+    public RegionCompactionEnvironment(final RegionCoprocessor impl, final int priority,
+        final int seq, final Configuration conf, final Region region,
+        final RegionCoprocessorService services) {
+      super(impl, priority, seq, conf, region, services, null);
+    }
+
+    /** Not supported on the compactionServer; always throws UnsupportedOperationException. */
+    @Override
+    public Region getRegion() {
+      throw new UnsupportedOperationException(
+          "Method getRegion is not supported when loaded CP on compactionServer");
+    }
+
+    @Override
+    public OnlineRegions getOnlineRegions() {
+      throw new UnsupportedOperationException(
+        "Method getOnlineRegions is not supported when loaded CP on compactionServer");
+    }
+
+    @Override
+    public Connection getConnection() {
+      // Mocks may have services as null at test time.
+      return services != null ? new SharedConnection(services.getConnection()) : null;
+    }
+
+    @Override
+    public Connection createConnection(Configuration conf) throws IOException {
+      return services != null ? this.services.createConnection(conf) : null;
+    }
+
+    @Override
+    public ServerName getServerName() {
+      return services != null ? services.getServerName() : null;
+    }
+
+    @Override
+    public void shutdown() {
+      super.shutdown();
+      MetricsCoprocessor.removeRegistry(this.metricRegistry);
+    }
+
+    @Override
+    public ConcurrentMap<String, Object> getSharedData() {
+      throw new UnsupportedOperationException(
+        "Method getSharedData is not supported when loaded CP on compactionServer");
+    }
+
+    @Override
+    public RegionInfo getRegionInfo() {
+      return region.getRegionInfo();
+    }
+
+    @Override
+    public MetricRegistry getMetricRegistryForRegionServer() {
+      return metricRegistry;
+    }
+
+    @Override
+    public RawCellBuilder getCellBuilder() {
+      // We always do a DEEP_COPY only
+      return RawCellBuilderFactory.create();
+    }
+  }
+
+  /**
+   * Constructor
+   * @param region the region
+   * @param coprocessorService interface to available RegionServer/CompactionServer functionality
+   * @param conf the configuration
+   */
+  public RegionCompactionCoprocessorHost(final HRegion region,
+      final RegionCoprocessorService coprocessorService, final Configuration conf) {
+    super(region, coprocessorService, conf);
+  }
+
+  @Override
+  public RegionCompactionEnvironment createEnvironment(RegionCoprocessor instance, int priority,
+      int seq, Configuration conf) {
+    if (instance.getClass().isAnnotationPresent(CoreCoprocessor.class)) {
+      LOG.info("skip load core coprocessor {} on CompactionServer", instance.getClass().getName());
+      return null;
+    }
+    if (!isCompactionRelatedCoprocessor(instance.getClass())) {
+      LOG.info("Skip loading non-compaction-related coprocessor {} on CompactionServer",
+        instance.getClass().getName());
+      return null;
+    }
+    // If coprocessor exposes any services, register them.
+    for (Service service : instance.getServices()) {
+      region.registerService(service);
+    }
+    return new RegionCompactionEnvironment(instance, priority, seq, conf, region, rsServices);
+  }
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+  // Observer operations
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Invoked before a region open.
+   * To invoke compactions we open a fake region on the compaction server,
+   * so preOpen is implemented as a no-op.
+   */
+  public void preOpen() throws IOException {
+  }
+
+  /**
+   * Invoked after a region open.
+   * To invoke compactions we open a fake region on the compaction server,
+   * so postOpen is implemented as a no-op.
+   */
+  public void postOpen() {
+  }
+
+  /**
+   * Invoked before a region is closed.
+   * To invoke compactions we open a fake region on the compaction server,
+   * so preClose is implemented as a no-op.
+   */
+  public void preClose(final boolean abortRequested) throws IOException {
+  }
+
+  /**
+   * Invoked after a region is closed.
+   * To invoke compactions we open a fake region on the compaction server,
+   * so postClose is implemented as a no-op.
+   */
+  public void postClose(final boolean abortRequested) {
+  }
+
+  /**
+   * Invoked before create StoreScanner for flush.
+   */
+  public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker)
+      throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preFlushScannerOpen is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Invoked before a memstore flush
+   * @return Scanner to use (cannot be null!)
+   * @throws IOException
+   */
+  public InternalScanner preFlush(HStore store, InternalScanner scanner,
+      FlushLifeCycleTracker tracker) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preFlush is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Invoked before a memstore flush
+   * @throws IOException
+   */
+  public void preFlush(FlushLifeCycleTracker tracker) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preFlush is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Invoked after a memstore flush
+   * @throws IOException
+   */
+  public void postFlush(FlushLifeCycleTracker tracker) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postFlush is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Invoked before in memory compaction.
+   */
+  public void preMemStoreCompaction(HStore store) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preMemStoreCompaction is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Invoked before create StoreScanner for in memory compaction.
+   */
+  public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
+    throw new UnsupportedOperationException("Method preMemStoreCompactionCompactScannerOpen "
+        + "is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Invoked before compacting memstore.
+   */
+  public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
+      throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preMemStoreCompactionCompact is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Invoked after in memory compaction.
+   */
+  public void postMemStoreCompaction(HStore store) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postMemStoreCompaction is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Invoked after a memstore flush
+   * @throws IOException
+   */
+  public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker)
+      throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postFlush is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Supports Coprocessor 'bypass'.
+   * @param get the Get request
+   * @param results What to return if return is true/'bypass'.
+   * @return true if default processing should be bypassed.
+   * @exception IOException Exception
+   */
+  public boolean preGet(final Get get, final List<Cell> results) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preGet is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @param get the Get request
+   * @param results the result set
+   * @exception IOException Exception
+   */
+  public void postGet(final Get get, final List<Cell> results) throws IOException {
+    throw new UnsupportedOperationException(
+        "Method postGet is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Supports Coprocessor 'bypass'.
+   * @param get the Get request
+   * @return true or false to return to client if bypassing normal operation, or null otherwise
+   * @exception IOException Exception
+   */
+  public Boolean preExists(final Get get) throws IOException {
+    throw new UnsupportedOperationException(
+        "Method preExists is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @param get the Get request
+   * @param result the result returned by the region server
+   * @return the result to return to the client
+   * @exception IOException Exception
+   */
+  public boolean postExists(final Get get, boolean result)
+      throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postExists is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Supports Coprocessor 'bypass'.
+   * @param put The Put object
+   * @param edit The WALEdit object.
+   * @return true if default processing should be bypassed
+   * @exception IOException Exception
+   */
+  public boolean prePut(final Put put, final WALEdit edit) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method prePut is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Supports Coprocessor 'bypass'.
+   * @param mutation - the current mutation
+   * @param kv - the current cell
+   * @param byteNow - current timestamp in bytes
+   * @param get - the get that could be used. Note that the get does not specify the family
+   * and qualifier that should be used
+   * @return true if default processing should be bypassed
+   * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single
+   * Coprocessor for its needs only. Will be removed.
+   */
+  @Deprecated
+  public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, final Cell kv,
+      final byte[] byteNow, final Get get) throws IOException {
+    throw new UnsupportedOperationException("Method prePrepareTimeStampForDeleteVersion is "
+        + "not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @param put The Put object
+   * @param edit The WALEdit object.
+   * @exception IOException Exception
+   */
+  public void postPut(final Put put, final WALEdit edit) throws IOException {
+    throw new UnsupportedOperationException(
+        "Method postPut is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Supports Coprocessor 'bypass'.
+   * @param delete The Delete object
+   * @param edit The WALEdit object.
+   * @return true if default processing should be bypassed
+   * @exception IOException Exception
+   */
+  public boolean preDelete(final Delete delete, final WALEdit edit) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preDelete is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @param delete The Delete object
+   * @param edit The WALEdit object.
+   * @exception IOException Exception
+   */
+  public void postDelete(final Delete delete, final WALEdit edit) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postDelete is not supported when loaded CP on compactionServer");
+  }
+
+  public void preBatchMutate(
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preBatchMutate is not supported when loaded CP on compactionServer");
+  }
+
+  public void postBatchMutate(
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postBatchMutate is not supported when loaded CP on compactionServer");
+  }
+
+  public void postBatchMutateIndispensably(
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
+      throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postBatchMutateIndispensably is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Supports Coprocessor 'bypass'.
+   * @param checkAndMutate the CheckAndMutate object
+   * @return true or false to return to client if default processing should be bypassed, or null
+   *   otherwise
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate)
+    throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preCheckAndMutate is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Supports Coprocessor 'bypass'.
+   * @param checkAndMutate the CheckAndMutate object
+   * @return true or false to return to client if default processing should be bypassed, or null
+   *   otherwise
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate)
+    throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preCheckAndMutateAfterRowLock is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @param checkAndMutate the CheckAndMutate object
+   * @param result the result returned by the checkAndMutate
+   * @return true or false to return to client if default processing should be bypassed, or null
+   *   otherwise
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate,
+    CheckAndMutateResult result) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postCheckAndMutate is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Supports Coprocessor 'bypass'.
+   * @param append append object
+   * @param edit The WALEdit object.
+   * @return result to return to client if default operation should be bypassed, null otherwise
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  public Result preAppend(final Append append, final WALEdit edit) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preAppend is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Supports Coprocessor 'bypass'.
+   * @param append append object
+   * @return result to return to client if default operation should be bypassed, null otherwise
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  public Result preAppendAfterRowLock(final Append append) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preAppendAfterRowLock is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Supports Coprocessor 'bypass'.
+   * @param increment increment object
+   * @param edit The WALEdit object.
+   * @return result to return to client if default operation should be bypassed, null otherwise
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  public Result preIncrement(final Increment increment, final WALEdit edit) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preIncrement is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Supports Coprocessor 'bypass'.
+   * @param increment increment object
+   * @return result to return to client if default operation should be bypassed, null otherwise
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preIncrementAfterRowLock is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @param append Append object
+   * @param result the result returned by the append
+   * @param edit The WALEdit object.
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  public Result postAppend(final Append append, final Result result, final WALEdit edit)
+    throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postAppend is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @param increment increment object
+   * @param result the result returned by postIncrement
+   * @param edit The WALEdit object.
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  public Result postIncrement(final Increment increment, Result result, final WALEdit edit)
+    throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postIncrement is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @param scan the Scan specification
+   * @exception IOException Exception
+   */
+  public void preScannerOpen(final Scan scan) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preScannerOpen is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @param scan the Scan specification
+   * @param s the scanner
+   * @return the scanner instance to use
+   * @exception IOException Exception
+   */
+  public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
+    throw new UnsupportedOperationException(
+        "Method postScannerOpen is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @param s the scanner
+   * @param results the result set returned by the region server
+   * @param limit the maximum number of results to return
+   * @return 'has next' indication to client if bypassing default behavior, or null otherwise
+   * @exception IOException Exception
+   */
+  public Boolean preScannerNext(final InternalScanner s,
+      final List<Result> results, final int limit) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preScannerNext is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @param s the scanner
+   * @param results the result set returned by the region server
+   * @param limit the maximum number of results to return
+   * @param hasMore
+   * @return 'has more' indication to give to client
+   * @exception IOException Exception
+   */
+  public boolean postScannerNext(final InternalScanner s,
+      final List<Result> results, final int limit, boolean hasMore)
+      throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postScannerNext is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * This will be called by the scan flow when the current scanned row is being filtered out by the
+   * filter.
+   * @param s the scanner
+   * @param curRowCell The cell in the current row which got filtered out
+   * @return whether more rows are available for the scanner or not
+   * @throws IOException
+   */
+  public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell)
+      throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postScannerFilterRow is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Supports Coprocessor 'bypass'.
+   * @param s the scanner
+   * @return true if default behavior should be bypassed, false otherwise
+   * @exception IOException Exception
+   */
+  public boolean preScannerClose(final InternalScanner s) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preScannerClose is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @exception IOException Exception
+   */
+  public void postScannerClose(final InternalScanner s) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postScannerClose is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Called before open store scanner for user scan.
+   */
+  public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postScannerClose is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @param info the RegionInfo for this region
+   * @param edits the file of recovered edits
+   */
+  public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preReplayWALs is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @param info the RegionInfo for this region
+   * @param edits the file of recovered edits
+   * @throws IOException Exception
+   */
+  public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postReplayWALs is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * Supports Coprocessor 'bypass'.
+   * @return true if default behavior should be bypassed, false otherwise
+   * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
+ * with something that doesn't expose InterfaceAudience.Private classes.
+   */
+  @Deprecated
+  public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
+      throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preWALRestore is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
+ * with something that doesn't expose InterfaceAudience.Private classes.
+   */
+  @Deprecated
+  public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
+      throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postWALRestore is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @param familyPaths pairs of { CF, file path } submitted for bulk load
+   */
+  public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preBulkLoadHFile is not supported when loaded CP on compactionServer");
+  }
+
+  public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
+      throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preCommitStoreFile is not supported when loaded CP on compactionServer");
+  }
+
+  public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath)
+      throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postCommitStoreFile is not supported when loaded CP on compactionServer");
+  }
+
+  /**
+   * @param familyPaths pairs of { CF, file path } submitted for bulk load
+   * @param map Map of CF to List of file paths for the final loaded files
+   * @throws IOException
+   */
+  public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
+      Map<byte[], List<Path>> map) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postBulkLoadHFile is not supported when loaded CP on compactionServer");
+  }
+
+  public void postStartRegionOperation(final Operation op) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postStartRegionOperation is not supported when loaded CP on compactionServer");
+  }
+
+  public void postCloseRegionOperation(final Operation op) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postCloseRegionOperation is not supported when loaded CP on compactionServer");
+  }
+
+  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
+      final List<Pair<Cell, Cell>> cellPairs) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postIncrementBeforeWAL is not supported when loaded CP on compactionServer");
+  }
+
+  public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
+      final List<Pair<Cell, Cell>> cellPairs) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postAppendBeforeWAL is not supported when loaded CP on compactionServer");
+  }
+
+  public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preWALAppend is not supported when loaded CP on compactionServer");
+  }
+
+  public Message preEndpointInvocation(final Service service, final String methodName,
+      Message request) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preEndpointInvocation is not supported when loaded CP on compactionServer");
+  }
+
+  public void postEndpointInvocation(final Service service, final String methodName,
+      final Message request, final Message.Builder responseBuilder) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method postEndpointInvocation is not supported when loaded CP on compactionServer");
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // BulkLoadObserver hooks
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  public void prePrepareBulkLoad(User user) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method prePrepareBulkLoad is not supported when loaded CP on compactionServer");
+  }
+
+  public void preCleanupBulkLoad(User user) throws IOException {
+    throw new UnsupportedOperationException(
+      "Method preCleanupBulkLoad is not supported when loaded CP on compactionServer");
+  }
+}
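
The host decides what to load by reflecting over the coprocessor's class hierarchy for any of
the compaction-related method names above; core coprocessors and classes declaring none of
those methods are skipped (createEnvironment returns null, which the CoprocessorHost changes
below now tolerate). A hedged illustration of the resulting behavior, with hypothetical class
names and the standard observer signatures assumed:

  import java.io.IOException;
  import java.util.List;
  import java.util.Optional;
  import org.apache.hadoop.hbase.client.Durability;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
  import org.apache.hadoop.hbase.regionserver.Store;
  import org.apache.hadoop.hbase.regionserver.StoreFile;
  import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
  import org.apache.hadoop.hbase.wal.WALEdit;

  // Skipped on the CompactionServer: declares only a non-compaction hook.
  class PutOnlyObserver implements RegionCoprocessor, RegionObserver {
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put,
        WALEdit edit, Durability durability) throws IOException {
    }
  }

  // Loaded on the CompactionServer: declares preCompactSelection.
  class SelectionObserver implements RegionCoprocessor, RegionObserver {
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c,
        Store store, List<? extends StoreFile> candidates,
        CompactionLifeCycleTracker tracker) throws IOException {
    }
  }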
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
index 4425076..414c6e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
@@ -255,7 +255,9 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
   public void load(Class<? extends C> implClass, int priority, Configuration conf)
       throws IOException {
     E env = checkAndLoadInstance(implClass, priority, conf);
-    coprocEnvironments.add(env);
+    if (env != null) {
+      coprocEnvironments.add(env);
+    }
   }
 
   /**
@@ -279,11 +281,13 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
     }
     // create the environment
     E env = createEnvironment(impl, priority, loadSequence.incrementAndGet(), conf);
-    assert env instanceof BaseEnvironment;
-    ((BaseEnvironment<C>) env).startup();
-    // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
-    // if server (master or regionserver) aborts.
-    coprocessorNames.add(implClass.getName());
+    if (env != null) {
+      assert env instanceof BaseEnvironment;
+      ((BaseEnvironment<C>) env).startup();
+      // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
+      // if server (master or regionserver) aborts.
+      coprocessorNames.add(implClass.getName());
+    }
     return env;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
index 84e6d25..bb72559 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.RawCellBuilder;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ServerType;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.metrics.MetricRegistry;
@@ -130,4 +131,15 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment<Reg
    * @return the RawCellBuilder
    */
   RawCellBuilder getCellBuilder();
+
+  /**
+   * Provides the server type so users know the current context: a region server, a compaction
+   * server, or a replication server. Users can then choose different implementations. For
+   * example, on a compaction server you cannot get the region instance, so to get something
+   * from a region you should use getConnection() and communicate with the actual region
+   * instance through RPC. On a region server you can simply use getRegion or getOnlineRegions
+   * to get the region instance directly.
+   */
+  ServerType getServerType();
+
 }
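
As the new getServerType() javadoc suggests, a hook can branch on the environment's server
type: on a RegionServer the region instance is at hand, while on a CompactionServer region
state must be fetched over RPC via getConnection(). A minimal sketch of that pattern inside
preCompact, assuming a hypothetical "audit" table and an illustrative class name:

  import java.io.IOException;
  import java.util.Optional;
  import org.apache.hadoop.hbase.ServerType;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Get;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.client.TableDescriptor;
  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
  import org.apache.hadoop.hbase.regionserver.InternalScanner;
  import org.apache.hadoop.hbase.regionserver.ScanType;
  import org.apache.hadoop.hbase.regionserver.Store;
  import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

  public class ServerTypeAwareObserver implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c,
        Store store, InternalScanner scanner, ScanType scanType,
        CompactionLifeCycleTracker tracker, CompactionRequest request) throws IOException {
      RegionCoprocessorEnvironment env = c.getEnvironment();
      if (env.getServerType() == ServerType.CompactionServer) {
        // No region instance here; read any needed state over RPC instead.
        try (Table audit = env.getConnection().getTable(TableName.valueOf("audit"))) {
          Result state = audit.get(new Get(env.getRegionInfo().getRegionName()));
          // ... consult 'state' to decide how to wrap the scanner ...
        }
      } else {
        // On a RegionServer the region instance is directly available.
        TableDescriptor td = env.getRegion().getTableDescriptor();
        // ... use 'td' ...
      }
      return scanner;
    }
  }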
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorService.java
new file mode 100644
index 0000000..1a1616b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorService.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.AbstractServer;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerType;
+import org.apache.hadoop.hbase.regionserver.OnlineRegions;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public interface RegionCoprocessorService extends Server, OnlineRegions {
+  /**
+   * Used by {@link RegionCoprocessorEnvironment#getServerType()}. HRegionServer and
+   * HCompactionServer inherit {@link AbstractServer#getServerType()} to implement this method.
+   */
+  ServerType getServerType();
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9da7f7c..5985160 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -115,6 +115,8 @@ import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.compactionserver.HCompactionServer;
+import org.apache.hadoop.hbase.compactionserver.RegionCompactionCoprocessorHost;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -418,7 +420,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private final CellComparator cellComparator;
 
   /**
-   * @return The smallest mvcc readPoint across all the scanners in this
+   * Get the smallest mvcc readPoint across all the scanners in this
    * region. Writes older than this readPoint, are included in every
    * read operation.
    */
@@ -731,10 +733,53 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   @Deprecated
   public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
-      final Configuration confParam, final RegionInfo regionInfo,
-      final TableDescriptor htd, final RegionServerServices rsServices) {
-    this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
-      wal, confParam, htd, rsServices);
+      final Configuration confParam, final RegionInfo regionInfo, final TableDescriptor htd,
+      final RegionServerServices rsServices) {
+    this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo), wal, confParam, htd,
+        rsServices);
+  }
+
+  /**
+   * HRegion constructor. This constructor should only be used for compaction offload; see
+   * {@link org.apache.hadoop.hbase.compactionserver.CompactionThreadManager#getStore}.
+   */
+  public HRegion(final HRegionFileSystem fs, final Configuration confParam,
+      final TableDescriptor htd, final HCompactionServer csServices) {
+    if (htd == null) {
+      throw new IllegalArgumentException("Need table descriptor");
+    }
+    if (confParam instanceof CompoundConfiguration) {
+      throw new IllegalArgumentException("Need original base configuration");
+    }
+    this.fs = fs;
+    this.baseConf = confParam;
+    this.conf = new CompoundConfiguration().add(confParam).addBytesMap(htd.getValues());
+    this.cellComparator = htd.isMetaTable()
+        || conf.getBoolean(USE_META_CELL_COMPARATOR, DEFAULT_USE_META_CELL_COMPARATOR)
+            ? MetaCellComparator.META_COMPARATOR
+            : CellComparatorImpl.COMPARATOR;
+    this.regionServicesForStores = new RegionServicesForStores(this, null);
+    this.htableDescriptor = htd;
+    this.scannerReadPoints = new ConcurrentHashMap<>();
+    this.mvcc = new MultiVersionConcurrencyControl(getRegionInfo().getShortNameToLog());
+    this.coprocessorHost = new RegionCompactionCoprocessorHost(this, csServices, conf);
+    // we must initialize these final variables below
+    this.isLoadingCfsOnDemandDefault = true;
+    this.wal = null;
+    this.timestampSlop = HConstants.LATEST_TIMESTAMP;
+    this.lock = null;
+    this.regionLockHolders = null;
+    this.metricsRegion = null;
+    this.metricsRegionWrapper = null;
+    this.regionDurability = null;
+    this.regionStatsEnabled = false;
+    this.storeHotnessProtector = null;
+    this.rowLockWaitDuration = DEFAULT_ROWLOCK_WAIT_DURATION;
+    this.busyWaitDuration = DEFAULT_BUSY_WAIT_DURATION;
+    this.maxCellSize = DEFAULT_MAX_CELL_SIZE;
+    this.miniBatchSize = DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE;
+    this.maxBusyWaitMultiplier = 2;
+    this.maxBusyWaitDuration = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
   }
 
   /**
@@ -5149,9 +5194,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   /**
    * Check the collection of families for valid timestamps
-   * @param familyMap
    * @param now current timestamp
-   * @throws FailedSanityCheckException
    */
   public void checkTimestamps(final Map<byte[], List<Cell>> familyMap, long now)
       throws FailedSanityCheckException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index cacea25..ad28dc1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1541,8 +1541,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     }
   }
 
-  protected boolean completeCompaction(CompactionRequestImpl cr, List<String> filesToCompact,
-    User user, List<String> newFiles) throws IOException {
+  protected boolean completeCompaction(List<String> filesToCompact, User user,
+      List<String> newFiles) throws IOException {
     Collection<HStoreFile> selectedStoreFiles = new ArrayList<>();
     for (String selectedFile : filesToCompact) {
       HStoreFile storeFile = getStoreFileBasedOnFileName(selectedFile);
@@ -1559,6 +1559,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     for (String newFile : newFiles) {
       newFilePaths.add(new Path(storeTmpDir, newFile));
     }
+    CompactionRequestImpl cr = new CompactionRequestImpl(selectedStoreFiles);
+    cr.setIsMajor(getForceMajor(), getForceMajor());
     completeCompaction(cr, selectedStoreFiles, user, newFilePaths);
     return true;
   }
@@ -1574,17 +1576,19 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     return null;
   }
 
-  private HStoreFile getStoreFile(Path path) {
-    for (HStoreFile storefile : getStorefiles()) {
-      if (storefile.getPath().equals(path)) {
-        return storefile;
+  public List<HStoreFile> getStoreFilesBaseOnFileNames(Collection<String> fileNames) {
+    List<HStoreFile> storeFiles = new ArrayList<>();
+    for (HStoreFile storeFile : getStorefiles()) {
+      String name = storeFile.getPath().getName();
+      if (fileNames.contains(name)) {
+        storeFiles.add(storeFile);
       }
     }
-    return null;
+    return storeFiles;
   }
 
   private synchronized List<HStoreFile> completeCompaction(CompactionRequestImpl cr,
-    Collection<HStoreFile> filesToCompact, User user, List<Path> newFiles) throws IOException {
+      Collection<HStoreFile> filesToCompact, User user, List<Path> newFiles) throws IOException {
     // TODO check store contains files to compact
     // Do the steps necessary to complete the compaction.
     setStoragePolicyFromFileName(newFiles);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 3910ab4..53658ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -3787,8 +3787,8 @@ public class RSRpcServices extends AbstractRpcServices implements
         List<String> newFiles = request.getNewFilesList().stream().collect(Collectors.toList());
         try {
           // TODO: If we could write HFile directly into the data directory, here the completion
-          //  will be easier
-          success = store.completeCompaction(null, selectedFiles, null, newFiles);
+          // will be easier
+          success = store.completeCompaction(selectedFiles, null, newFiles);
           LOG.debug("Complete compaction result: {} for region: {}, store {}", success,
             regionInfo.getRegionNameAsString(),
             store.getColumnFamilyDescriptor().getNameAsString());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index a916d0d..658819c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RawCellBuilder;
 import org.apache.hadoop.hbase.RawCellBuilderFactory;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ServerType;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.CheckAndMutate;
 import org.apache.hadoop.hbase.client.CheckAndMutateResult;
@@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.Reference;
@@ -111,12 +113,12 @@ public class RegionCoprocessorHost
    *
    * Encapsulation of the environment of each coprocessor
    */
-  private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
+  public static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
       implements RegionCoprocessorEnvironment {
-    private Region region;
+    protected Region region;
     ConcurrentMap<String, Object> sharedData;
-    private final MetricRegistry metricRegistry;
-    private final RegionServerServices services;
+    protected final MetricRegistry metricRegistry;
+    protected final RegionCoprocessorService services;
 
     /**
      * Constructor
@@ -125,7 +127,7 @@ public class RegionCoprocessorHost
      */
     public RegionEnvironment(final RegionCoprocessor impl, final int priority,
         final int seq, final Configuration conf, final Region region,
-        final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
+        final RegionCoprocessorService services, final ConcurrentMap<String, Object> sharedData) {
       super(impl, priority, seq, conf);
       this.region = region;
       this.sharedData = sharedData;
@@ -187,6 +189,11 @@ public class RegionCoprocessorHost
       // We always do a DEEP_COPY only
       return RawCellBuilderFactory.create();
     }
+
+    @Override
+    public ServerType getServerType() {
+      return services.getServerType();
+    }
   }
 
   /**
@@ -246,9 +253,9 @@ public class RegionCoprocessorHost
   }
 
   /** The region server services */
-  RegionServerServices rsServices;
+  protected RegionCoprocessorService rsServices;
   /** The region */
-  HRegion region;
+  protected HRegion region;
 
   /**
    * Constructor
@@ -257,7 +264,7 @@ public class RegionCoprocessorHost
    * @param conf the configuration
    */
   public RegionCoprocessorHost(final HRegion region,
-      final RegionServerServices rsServices, final Configuration conf) {
+      final RegionCoprocessorService rsServices, final Configuration conf) {
     super(rsServices);
     this.conf = conf;
     this.rsServices = rsServices;
@@ -425,7 +432,7 @@ public class RegionCoprocessorHost
     // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
     return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)?
         new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region,
-            rsServices, classData):
+          (RegionServerServices) rsServices, classData):
         new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData);
   }
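
With getServerType() now exposed on RegionCoprocessorEnvironment, an observer can tell
whether a hook is firing during an offloaded compaction and adapt its behavior. A minimal
sketch, assuming only what this patch adds (the class itself is illustrative and not part
of the commit; the preCompact signature matches the one exercised by the tests below):

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.ServerType;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

public class ServerTypeAwareObserver implements RegionCoprocessor, RegionObserver {
  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

  @Override
  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
    // On a compaction server there is no local Region to write to, so any side effects
    // should go through the cluster connection instead of getOnlineRegions().
    if (c.getEnvironment().getServerType() == ServerType.CompactionServer) {
      // e.g. c.getEnvironment().getConnection().getTable(...), as the tests below do
    }
    return scanner;
  }
}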
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 1f51fe8..a4ceab0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.locking.EntityLock;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorService;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@@ -57,8 +58,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
  * the code base.
  */
 @InterfaceAudience.Private
-public interface RegionServerServices
-    extends Server, MutableOnlineRegions, FavoredNodesForRegion, ThroughputControllerService {
+public interface RegionServerServices extends Server, MutableOnlineRegions, FavoredNodesForRegion,
+    ThroughputControllerService, RegionCoprocessorService {
 
   /**
    * @return the WAL for a particular region. Pass null for getting the default (common) WAL
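
The narrowed dependency is the heart of this change: RegionCoprocessorHost now needs only a
RegionCoprocessorService, which both HRegionServer and HCompactionServer can supply, while the
full RegionServerServices remains reserved for core coprocessors. The new interface itself is
not shown in this mail, so the following is only a plausible minimal sketch inferred from its
call sites (services.getServerType() in RegionEnvironment, and the super(rsServices) call in
the host constructor), not the committed file:

package org.apache.hadoop.hbase.coprocessor;

import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerType;
import org.apache.yetus.audience.InterfaceAudience;

/** Sketch only: minimal view of a server able to host region coprocessors. */
@InterfaceAudience.Private
public interface RegionCoprocessorService extends Server {
  /** @return the type of the hosting server, e.g. RegionServer or CompactionServer */
  ServerType getServerType();
}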
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index c7e172a..b692ec6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -397,4 +397,9 @@ public class MockRegionServerServices implements RegionServerServices {
     boolean major, int priority) {
     return false;
   }
+
+  @Override
+  public ServerType getServerType() {
+    return ServerType.RegionServer;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
index 18b07d9..b246d21 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
@@ -17,134 +17,36 @@
  */
 package org.apache.hadoop.hbase.compactionserver;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
-import java.io.IOException;
 import java.util.ArrayList;
-
 import org.apache.commons.lang3.RandomUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.StartMiniClusterOption;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.compaction.CompactionOffloadManager;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.testclassification.CompactionServerTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
-
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
+
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 
 @Category({CompactionServerTests.class, MediumTests.class})
-public class TestCompactionServer {
+public class TestCompactionServer extends TestCompactionServerBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestCompactionServer.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestCompactionServer.class);
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private static Configuration CONF = TEST_UTIL.getConfiguration();
-  private static HMaster MASTER;
-  private static HCompactionServer COMPACTION_SERVER;
-  private static ServerName COMPACTION_SERVER_NAME;
-  private static TableName TABLENAME = TableName.valueOf("t");
-  private static String FAMILY = "C";
-  private static String COL ="c0";
-
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-    TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numCompactionServers(1).build());
-    TEST_UTIL.getAdmin().switchCompactionOffload(true);
-    MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster();
-    TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
-    COMPACTION_SERVER = TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0)
-      .getCompactionServer();
-    COMPACTION_SERVER_NAME = COMPACTION_SERVER.getServerName();
-  }
-
-  @AfterClass
-  public static void afterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  @Before
-  public void before() throws Exception {
-    TableDescriptor tableDescriptor =
-        TableDescriptorBuilder.newBuilder(TABLENAME).setCompactionOffloadEnabled(true).build();
-    TEST_UTIL.createTable(tableDescriptor, Bytes.toByteArrays(FAMILY),
-      TEST_UTIL.getConfiguration());
-    TEST_UTIL.waitTableAvailable(TABLENAME);
-    COMPACTION_SERVER.requestCount.reset();
-  }
-
-  @After
-  public void after() throws IOException {
-    TEST_UTIL.deleteTableIfAny(TABLENAME);
-  }
-
-  private void doPutRecord(int start, int end, boolean flush) throws Exception {
-    Table h = TEST_UTIL.getConnection().getTable(TABLENAME);
-    for (int i = start; i <= end; i++) {
-      Put p = new Put(Bytes.toBytes(i));
-      p.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COL), Bytes.toBytes(i));
-      h.put(p);
-      if (i % 100 == 0 && flush) {
-        TEST_UTIL.flush(TABLENAME);
-      }
-    }
-    h.close();
-  }
-
-  private void doFillRecord(int start, int end, byte[] value) throws Exception {
-    Table h = TEST_UTIL.getConnection().getTable(TABLENAME);
-    for (int i = start; i <= end; i++) {
-      Put p = new Put(Bytes.toBytes(i));
-      p.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COL), value);
-      h.put(p);
-    }
-    h.close();
-  }
-
-  private void verifyRecord(int start, int end, boolean exist) throws Exception {
-    Table h = TEST_UTIL.getConnection().getTable(TABLENAME);
-    for (int i = start; i <= end; i++) {
-      Get get = new Get(Bytes.toBytes(i));
-      Result r = h.get(get);
-      if (exist) {
-        assertArrayEquals(Bytes.toBytes(i), r.getValue(Bytes.toBytes(FAMILY), Bytes.toBytes(COL)));
-      } else {
-        assertNull(r.getValue(Bytes.toBytes(FAMILY), Bytes.toBytes(COL)));
-      }
-    }
-    h.close();
-  }
-
   @Test
   public void testCompaction() throws Exception {
     TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServerBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServerBase.java
new file mode 100644
index 0000000..7a112e1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServerBase.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+public class TestCompactionServerBase {
+  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected static Configuration CONF = TEST_UTIL.getConfiguration();
+  protected static HMaster MASTER;
+  protected static HCompactionServer COMPACTION_SERVER;
+  protected static ServerName COMPACTION_SERVER_NAME;
+  protected static TableName TABLENAME = TableName.valueOf("t");
+  protected static String FAMILY = "C";
+  protected static String COL = "c0";
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numCompactionServers(1).build());
+    TEST_UTIL.getAdmin().switchCompactionOffload(true);
+    MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster();
+    TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
+    COMPACTION_SERVER = TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0)
+      .getCompactionServer();
+    COMPACTION_SERVER_NAME = COMPACTION_SERVER.getServerName();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void before() throws Exception {
+    TableDescriptor tableDescriptor =
+      TableDescriptorBuilder.newBuilder(TABLENAME).setCompactionOffloadEnabled(true).build();
+    TEST_UTIL.createTable(tableDescriptor, Bytes.toByteArrays(FAMILY),
+      TEST_UTIL.getConfiguration());
+    TEST_UTIL.waitTableAvailable(TABLENAME);
+    COMPACTION_SERVER.requestCount.reset();
+  }
+
+  @After
+  public void after() throws IOException {
+    TEST_UTIL.deleteTableIfAny(TABLENAME);
+  }
+
+  void doPutRecord(int start, int end, boolean flush) throws Exception {
+    Table h = TEST_UTIL.getConnection().getTable(TABLENAME);
+    for (int i = start; i <= end; i++) {
+      Put p = new Put(Bytes.toBytes(i));
+      p.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COL), Bytes.toBytes(i));
+      h.put(p);
+      if (i % 100 == 0 && flush) {
+        TEST_UTIL.flush(TABLENAME);
+      }
+    }
+    h.close();
+  }
+
+  void doFillRecord(int start, int end, byte[] value) throws Exception {
+    Table h = TEST_UTIL.getConnection().getTable(TABLENAME);
+    for (int i = start; i <= end; i++) {
+      Put p = new Put(Bytes.toBytes(i));
+      p.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(COL), value);
+      h.put(p);
+    }
+    h.close();
+  }
+
+  void verifyRecord(int start, int end, boolean exist) throws Exception {
+    Table h = TEST_UTIL.getConnection().getTable(TABLENAME);
+    for (int i = start; i <= end; i++) {
+      Get get = new Get(Bytes.toBytes(i));
+      Result r = h.get(get);
+      if (exist) {
+        assertArrayEquals(Bytes.toBytes(i), r.getValue(Bytes.toBytes(FAMILY), Bytes.toBytes(COL)));
+      } else {
+        assertNull(r.getValue(Bytes.toBytes(FAMILY), Bytes.toBytes(COL)));
+      }
+    }
+    h.close();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestRegionCoprocessorOnCompactionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestRegionCoprocessorOnCompactionServer.java
new file mode 100644
index 0000000..f553602
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestRegionCoprocessorOnCompactionServer.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerType;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileReader;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
+import org.apache.hadoop.hbase.testclassification.CompactionServerTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+
+/**
+ * Uses a verify table to test {@link RegionCoprocessorHost.RegionEnvironment#getConnection()} and
+ * to record invocations of the coprocessor methods {@link RegionObserver#preCompactSelection},
+ * {@link RegionObserver#postCompactSelection}, {@link RegionObserver#preCompact},
+ * {@link RegionObserver#preCompactScannerOpen}, {@link RegionObserver#preStoreFileReaderOpen},
+ * {@link RegionObserver#postStoreFileReaderOpen} and
+ * {@link RegionObserver#postInstantiateDeleteTracker}.
+ */
+@Category({ CompactionServerTests.class, MediumTests.class })
+public class TestRegionCoprocessorOnCompactionServer extends TestCompactionServerBase {
+  private static final TableName VERIFY_TABLE_NAME = TableName.valueOf("verifyTable");
+  protected static String RS = "RS";
+  protected static String CS = "CS";
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRegionCoprocessorOnCompactionServer.class);
+  private static final Set<String> compactionCoprocessor = ImmutableSet.of("preCompactSelection",
+    "postCompactSelection", "preCompactScannerOpen", "preCompact", "preStoreFileReaderOpen",
+    "postStoreFileReaderOpen", "postInstantiateDeleteTracker");
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRegionCoprocessorOnCompactionServer.class);
+
+  private static void recordCoprocessorCall(ObserverContext<RegionCoprocessorEnvironment> c,
+      String methodName) throws IOException {
+    byte[] col = c.getEnvironment().getServerName().equals(COMPACTION_SERVER_NAME) ? CS.getBytes()
+        : RS.getBytes();
+    Put put = new Put(methodName.getBytes());
+    put.addColumn(FAMILY.getBytes(), col, Bytes.toBytes(1));
+    // Here the user can choose different implementations for different contexts
+    if (c.getEnvironment().getServerType() == ServerType.CompactionServer) {
+      c.getEnvironment().getConnection().getTable(VERIFY_TABLE_NAME).put(put);
+    } else if (c.getEnvironment().getServerType() == ServerType.RegionServer) {
+      c.getEnvironment().getOnlineRegions().getRegions(VERIFY_TABLE_NAME).get(0).put(put);
+    }
+  }
+
+  private void verifyRecord(byte[] row, byte[] col, boolean exist) throws Exception {
+    Table h = TEST_UTIL.getConnection().getTable(VERIFY_TABLE_NAME);
+    Get get = new Get(row);
+    Result r = h.get(get);
+    if (exist) {
+      assertEquals(1, Bytes.toInt(r.getValue(Bytes.toBytes(FAMILY), col)));
+    } else {
+      assertNull(r.getValue(Bytes.toBytes(FAMILY), col));
+    }
+    h.close();
+  }
+
+  public static class TestCompactionCoprocessor implements RegionObserver, RegionCoprocessor {
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+        List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker)
+        throws IOException {
+      recordCoprocessorCall(c, "preCompactSelection");
+    }
+
+    @Override
+    public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+        List<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
+        CompactionRequest request) {
+      try {
+        recordCoprocessorCall(c, "postCompactSelection");
+      } catch (IOException e) {
+        LOG.error("postCompactSelection catch IOException:", e);
+      }
+    }
+
+    @Override
+    public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+        ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
+        CompactionRequest request) throws IOException {
+      recordCoprocessorCall(c, "preCompactScannerOpen");
+    }
+
+    @Override
+    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
+        CompactionRequest request) throws IOException {
+      recordCoprocessorCall(c, "preCompact");
+      return scanner;
+    }
+
+    @Override
+    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+        StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request)
+        throws IOException {
+      recordCoprocessorCall(c, "postCompact");
+    }
+
+    @Override
+    public StoreFileReader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
+        FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
+        Reference r, StoreFileReader reader) throws IOException {
+      recordCoprocessorCall(ctx, "preStoreFileReaderOpen");
+      return reader;
+    }
+
+    @Override
+    public StoreFileReader postStoreFileReaderOpen(
+        ObserverContext<RegionCoprocessorEnvironment> ctx, FileSystem fs, Path p,
+        FSDataInputStreamWrapper in, long size, CacheConfig cacheConf, Reference r,
+        StoreFileReader reader) throws IOException {
+      recordCoprocessorCall(ctx, "postStoreFileReaderOpen");
+      return reader;
+    }
+
+    @Override
+    public DeleteTracker postInstantiateDeleteTracker(
+        ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker)
+        throws IOException {
+      recordCoprocessorCall(ctx, "postInstantiateDeleteTracker");
+      return delTracker;
+    }
+
+  }
+
+  @CoreCoprocessor
+  public static class TestCompactionCoreCoprocessor extends TestCompactionCoprocessor {
+  }
+
+  public static class TestCoprocessorNotCompactionRelated
+      implements RegionObserver, RegionCoprocessor {
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
+      recordCoprocessorCall(c, "preOpen");
+    }
+  }
+
+  @Before
+  public void before() throws Exception {
+    super.before();
+    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(VERIFY_TABLE_NAME).build();
+    TEST_UTIL.createTable(tableDescriptor, Bytes.toByteArrays(FAMILY),
+      TEST_UTIL.getConfiguration());
+    TEST_UTIL.waitTableAvailable(VERIFY_TABLE_NAME);
+  }
+
+  @After
+  public void after() throws IOException {
+    super.after();
+    TEST_UTIL.deleteTableIfAny(VERIFY_TABLE_NAME);
+  }
+
+  /**
+   * Test that compaction-related coprocessor hooks are loaded and executed on the compaction
+   * server, while postCompact is executed on the region server.
+   */
+  @Test
+  public void testLoadCoprocessor() throws Exception {
+    TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
+    ColumnFamilyDescriptor cfd =
+        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).build();
+    TableDescriptor modifiedTableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
+        .setColumnFamily(cfd).setCompactionOffloadEnabled(true)
+        .setCoprocessor(TestCompactionCoprocessor.class.getName()).build();
+    TEST_UTIL.getAdmin().modifyTable(modifiedTableDescriptor);
+    TEST_UTIL.waitTableAvailable(TABLENAME);
+    doPutRecord(1, 1000, true);
+    TEST_UTIL.getAdmin().compactionSwitch(true, new ArrayList<>());
+    TEST_UTIL.compact(TABLENAME, true);
+    Thread.sleep(5000);
+    TEST_UTIL.waitFor(60000,
+      () -> COMPACTION_SERVER.requestCount.sum() > 0 && COMPACTION_SERVER.compactionThreadManager
+          .getRunningCompactionTasks().values().size() == 0);
+    for (String methodName : compactionCoprocessor) {
+      verifyRecord(methodName.getBytes(), CS.getBytes(), true);
+    }
+    verifyRecord("postCompact".getBytes(), CS.getBytes(), false);
+    verifyRecord("postCompact".getBytes(), RS.getBytes(), true);
+  }
+
+  /**
+   * Test that a core coprocessor will not be loaded on the compaction server.
+   */
+  @Test
+  public void testNotLoadCoreCoprocessor() throws Exception {
+    TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
+    ColumnFamilyDescriptor cfd =
+        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).build();
+    TableDescriptor modifiedTableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
+        .setColumnFamily(cfd).setCompactionOffloadEnabled(true)
+        .setCoprocessor(TestCompactionCoreCoprocessor.class.getName()).build();
+    TEST_UTIL.getAdmin().modifyTable(modifiedTableDescriptor);
+    TEST_UTIL.waitTableAvailable(TABLENAME);
+    doPutRecord(1, 1000, true);
+    TEST_UTIL.getAdmin().compactionSwitch(true, new ArrayList<>());
+    TEST_UTIL.compact(TABLENAME, true);
+    Thread.sleep(5000);
+    TEST_UTIL.waitFor(60000,
+      () -> COMPACTION_SERVER.requestCount.sum() > 0 && COMPACTION_SERVER.compactionThreadManager
+          .getRunningCompactionTasks().values().size() == 0);
+    for (String methodName : compactionCoprocessor) {
+      verifyRecord(methodName.getBytes(), CS.getBytes(), false);
+    }
+    verifyRecord("postCompact".getBytes(), CS.getBytes(), false);
+    verifyRecord("postCompact".getBytes(), RS.getBytes(), true);
+  }
+
+  /**
+   * Test that a coprocessor unrelated to compaction will not be loaded on the compaction server.
+   */
+  @Test
+  public void testNotLoadCompactionNotRelatedCoprocessor() throws Exception {
+    TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
+    ColumnFamilyDescriptor cfd =
+        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).build();
+    TableDescriptor modifiedTableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
+        .setColumnFamily(cfd).setCompactionOffloadEnabled(true)
+        .setCoprocessor(TestCoprocessorNotCompactionRelated.class.getName()).build();
+    TEST_UTIL.getAdmin().modifyTable(modifiedTableDescriptor);
+    TEST_UTIL.waitTableAvailable(TABLENAME);
+    doPutRecord(1, 1000, true);
+    TEST_UTIL.getAdmin().compactionSwitch(true, new ArrayList<>());
+    TEST_UTIL.compact(TABLENAME, true);
+    Thread.sleep(5000);
+    TEST_UTIL.waitFor(60000,
+      () -> COMPACTION_SERVER.requestCount.sum() > 0 && COMPACTION_SERVER.compactionThreadManager
+        .getRunningCompactionTasks().values().size() == 0);
+    verifyRecord("preOpen".getBytes(), CS.getBytes(), false);
+    verifyRecord("preOpen".getBytes(), RS.getBytes(), true);
+  }
+}
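
Taken together, the tests above reduce to a short enabling recipe: mark the table for
compaction offload, name the coprocessor in the table descriptor, and turn the cluster-wide
switch on. A condensed sketch using the same calls as the tests (the Admin handle and the
table, family and coprocessor names are placeholders):

// Assumes an existing Admin handle named admin; names below are illustrative.
TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("t"))
    .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("C")).build())
    .setCompactionOffloadEnabled(true)
    .setCoprocessor(TestCompactionCoprocessor.class.getName())
    .build();
admin.modifyTable(td);
admin.switchCompactionOffload(true); // the cluster switch the tests toggle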
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index db4a5eb..43a7cd6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ServerType;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
@@ -777,4 +778,10 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
       boolean major, int priority) {
     return false;
   }
+
+  @Override
+  public ServerType getServerType() {
+    return ServerType.RegionServer;
+  }
+
 }