You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/09/27 19:45:34 UTC

[3/7] hbase git commit: HBASE-17732 Coprocessor Design Improvements

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
index 5cb87b5..3325ba3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
@@ -19,33 +19,29 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.util.Comparator;
-import java.util.List;
 
-import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
+import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
 import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
 import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
-import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.metrics.MetricRegistry;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
 @InterfaceStability.Evolving
 public class RegionServerCoprocessorHost extends
-    CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
+    CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> {
 
   private static final Log LOG = LogFactory.getLog(RegionServerCoprocessorHost.class);
 
@@ -70,242 +66,149 @@ public class RegionServerCoprocessorHost extends
   }
 
   @Override
-  public RegionServerEnvironment createEnvironment(Class<?> implClass,
-      Coprocessor instance, int priority, int sequence, Configuration conf) {
-    return new RegionServerEnvironment(implClass, instance, priority,
-      sequence, conf, this.rsServices);
+  public RegionServerEnvironment createEnvironment(
+      RegionServerCoprocessor instance, int priority, int sequence, Configuration conf) {
+    return new RegionServerEnvironment(instance, priority, sequence, conf, this.rsServices);
+  }
+
+  @Override
+  public RegionServerCoprocessor checkAndGetInstance(Class<?> implClass)
+      throws InstantiationException, IllegalAccessException {
+    if (RegionServerCoprocessor.class.isAssignableFrom(implClass)) {
+      return (RegionServerCoprocessor)implClass.newInstance();
+    } else if (SingletonCoprocessorService.class.isAssignableFrom(implClass)) {
+      // For backward compatibility with old CoprocessorService impls which don't extend
+      // RegionServerCoprocessor.
+      return new CoprocessorServiceBackwardCompatiblity.RegionServerCoprocessorService(
+          (SingletonCoprocessorService)implClass.newInstance());
+    } else {
+      LOG.error(implClass.getName() + " is not of type RegionServerCoprocessor. Check the "
+          + "configuration " + CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY);
+      return null;
+    }
+  }
+
+  private ObserverGetter<RegionServerCoprocessor, RegionServerObserver> rsObserverGetter =
+      RegionServerCoprocessor::getRegionServerObserver;
+
+  abstract class RegionServerObserverOperation extends
+      ObserverOperationWithoutResult<RegionServerObserver> {
+    public RegionServerObserverOperation() {
+      super(rsObserverGetter);
+    }
+
+    public RegionServerObserverOperation(User user) {
+      super(rsObserverGetter, user);
+    }
   }
 
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+  // RegionServerObserver operations
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+
   public void preStop(String message, User user) throws IOException {
     // While stopping the region server all coprocessors method should be executed first then the
     // coprocessor should be cleaned up.
-    execShutdown(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+    execShutdown(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation(user) {
       @Override
-      public void call(RegionServerObserver oserver,
-          ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
-        oserver.preStopRegionServer(ctx);
+      public void call(RegionServerObserver observer) throws IOException {
+        observer.preStopRegionServer(this);
       }
+
       @Override
-      public void postEnvCall(RegionServerEnvironment env) {
+      public void postEnvCall() {
         // invoke coprocessor stop method
-        shutdown(env);
+        shutdown(this.getEnvironment());
       }
     });
   }
 
   public void preRollWALWriterRequest() throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
       @Override
-      public void call(RegionServerObserver oserver,
-          ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
-        oserver.preRollWALWriterRequest(ctx);
+      public void call(RegionServerObserver observer) throws IOException {
+        observer.preRollWALWriterRequest(this);
       }
     });
   }
 
   public void postRollWALWriterRequest() throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
       @Override
-      public void call(RegionServerObserver oserver,
-          ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
-        oserver.postRollWALWriterRequest(ctx);
+      public void call(RegionServerObserver observer) throws IOException {
+        observer.postRollWALWriterRequest(this);
       }
     });
   }
 
   public void preReplicateLogEntries()
       throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
       @Override
-      public void call(RegionServerObserver oserver,
-          ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
-        oserver.preReplicateLogEntries(ctx);
+      public void call(RegionServerObserver observer) throws IOException {
+        observer.preReplicateLogEntries(this);
       }
     });
   }
 
   public void postReplicateLogEntries()
       throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
       @Override
-      public void call(RegionServerObserver oserver,
-          ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
-        oserver.postReplicateLogEntries(ctx);
+      public void call(RegionServerObserver observer) throws IOException {
+        observer.postReplicateLogEntries(this);
       }
     });
   }
 
   public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
       throws IOException {
-    return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
-        : new CoprocessOperationWithResult<ReplicationEndpoint>() {
-          @Override
-          public void call(RegionServerObserver oserver,
-              ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
-            setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
-          }
-        });
+    return execOperationWithResult(endpoint, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(
+            rsObserverGetter) {
+      @Override
+      public ReplicationEndpoint call(RegionServerObserver observer) throws IOException {
+        return observer.postCreateReplicationEndPoint(this, getResult());
+      }
+    });
   }
 
   public void preClearCompactionQueues() throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
       @Override
-      public void call(RegionServerObserver oserver,
-                       ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
-        oserver.preClearCompactionQueues(ctx);
+      public void call(RegionServerObserver observer) throws IOException {
+        observer.preClearCompactionQueues(this);
       }
     });
   }
 
   public void postClearCompactionQueues() throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
       @Override
-      public void call(RegionServerObserver oserver,
-                       ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
-        oserver.postClearCompactionQueues(ctx);
+      public void call(RegionServerObserver observer) throws IOException {
+        observer.postClearCompactionQueues(this);
       }
     });
   }
 
-  private <T> T execOperationWithResult(final T defaultValue,
-      final CoprocessOperationWithResult<T> ctx) throws IOException {
-    if (ctx == null)
-      return defaultValue;
-    ctx.setResult(defaultValue);
-    execOperation(ctx);
-    return ctx.getResult();
-  }
-
-  private static abstract class CoprocessorOperation
-      extends ObserverContext<RegionServerCoprocessorEnvironment> {
-    public CoprocessorOperation() {
-      this(RpcServer.getRequestUser());
-    }
-
-    public CoprocessorOperation(User user) {
-      super(user);
-    }
-
-    public abstract void call(RegionServerObserver oserver,
-        ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException;
-
-    public void postEnvCall(RegionServerEnvironment env) {
-    }
-  }
-
-  private static abstract class CoprocessOperationWithResult<T> extends CoprocessorOperation {
-    private T result = null;
-
-    public void setResult(final T result) {
-      this.result = result;
-    }
-
-    public T getResult() {
-      return this.result;
-    }
-  }
-
-  private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
-    if (ctx == null) return false;
-    boolean bypass = false;
-    List<RegionServerEnvironment> envs = coprocessors.get();
-    for (int i = 0; i < envs.size(); i++) {
-      RegionServerEnvironment env = envs.get(i);
-      if (env.getInstance() instanceof RegionServerObserver) {
-        ctx.prepare(env);
-        Thread currentThread = Thread.currentThread();
-        ClassLoader cl = currentThread.getContextClassLoader();
-        try {
-          currentThread.setContextClassLoader(env.getClassLoader());
-          ctx.call((RegionServerObserver)env.getInstance(), ctx);
-        } catch (Throwable e) {
-          handleCoprocessorThrowable(env, e);
-        } finally {
-          currentThread.setContextClassLoader(cl);
-        }
-        bypass |= ctx.shouldBypass();
-        if (ctx.shouldComplete()) {
-          break;
-        }
-      }
-      ctx.postEnvCall(env);
-    }
-    return bypass;
-  }
-
-  /**
-   * RegionServer coprocessor classes can be configured in any order, based on that priority is set
-   * and chained in a sorted order. For preStop(), coprocessor methods are invoked in call() and
-   * environment is shutdown in postEnvCall(). <br>
-   * Need to execute all coprocessor methods first then postEnvCall(), otherwise some coprocessors
-   * may remain shutdown if any exception occurs during next coprocessor execution which prevent
-   * RegionServer stop. (Refer:
-   * <a href="https://issues.apache.org/jira/browse/HBASE-16663">HBASE-16663</a>
-   * @param ctx CoprocessorOperation
-   * @return true if bypaas coprocessor execution, false if not.
-   * @throws IOException
-   */
-  private boolean execShutdown(final CoprocessorOperation ctx) throws IOException {
-    if (ctx == null) return false;
-    boolean bypass = false;
-    List<RegionServerEnvironment> envs = coprocessors.get();
-    int envsSize = envs.size();
-    // Iterate the coprocessors and execute CoprocessorOperation's call()
-    for (int i = 0; i < envsSize; i++) {
-      RegionServerEnvironment env = envs.get(i);
-      if (env.getInstance() instanceof RegionServerObserver) {
-        ctx.prepare(env);
-        Thread currentThread = Thread.currentThread();
-        ClassLoader cl = currentThread.getContextClassLoader();
-        try {
-          currentThread.setContextClassLoader(env.getClassLoader());
-          ctx.call((RegionServerObserver) env.getInstance(), ctx);
-        } catch (Throwable e) {
-          handleCoprocessorThrowable(env, e);
-        } finally {
-          currentThread.setContextClassLoader(cl);
-        }
-        bypass |= ctx.shouldBypass();
-        if (ctx.shouldComplete()) {
-          break;
-        }
-      }
-    }
-
-    // Iterate the coprocessors and execute CoprocessorOperation's postEnvCall()
-    for (int i = 0; i < envsSize; i++) {
-      RegionServerEnvironment env = envs.get(i);
-      ctx.postEnvCall(env);
-    }
-    return bypass;
-  }
-
   /**
    * Coprocessor environment extension providing access to region server
    * related services.
    */
-  static class RegionServerEnvironment extends CoprocessorHost.Environment
+  private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor>
       implements RegionServerCoprocessorEnvironment {
     private final RegionServerServices regionServerServices;
     private final MetricRegistry metricRegistry;
 
     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BC_UNCONFIRMED_CAST",
         justification="Intentional; FB has trouble detecting isAssignableFrom")
-    public RegionServerEnvironment(final Class<?> implClass,
-        final Coprocessor impl, final int priority, final int seq,
-        final Configuration conf, final RegionServerServices services) {
+    public RegionServerEnvironment(final RegionServerCoprocessor impl, final int priority,
+        final int seq, final Configuration conf, final RegionServerServices services) {
       super(impl, priority, seq, conf);
       this.regionServerServices = services;
-      for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
-        Class<?> c = (Class<?>) itf;
-        if (SingletonCoprocessorService.class.isAssignableFrom(c)) {// FindBugs: BC_UNCONFIRMED_CAST
-          this.regionServerServices.registerService(
-            ((SingletonCoprocessorService) impl).getService());
-          break;
-        }
-      }
+      impl.getService().ifPresent(regionServerServices::registerService);
       this.metricRegistry =
-          MetricsCoprocessor.createRegistryForRSCoprocessor(implClass.getName());
+          MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName());
     }
 
     @Override
@@ -319,32 +222,9 @@ public class RegionServerCoprocessorHost extends
     }
 
     @Override
-    protected void shutdown() {
+    public void shutdown() {
       super.shutdown();
       MetricsCoprocessor.removeRegistry(metricRegistry);
     }
   }
-
-  /**
-   * Environment priority comparator. Coprocessors are chained in sorted
-   * order.
-   */
-  static class EnvironmentPriorityComparator implements
-      Comparator<CoprocessorEnvironment> {
-    @Override
-    public int compare(final CoprocessorEnvironment env1,
-        final CoprocessorEnvironment env2) {
-      if (env1.getPriority() < env2.getPriority()) {
-        return -1;
-      } else if (env1.getPriority() > env2.getPriority()) {
-        return 1;
-      }
-      if (env1.getLoadSequence() < env2.getLoadSequence()) {
-        return -1;
-      } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
-        return 1;
-      }
-      return 0;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index d2b8567..c7d0ead 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -138,38 +140,17 @@ public class SecureBulkLoadManager {
 
   public String prepareBulkLoad(final Region region, final PrepareBulkLoadRequest request)
       throws IOException {
-    List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers(region);
+    region.getCoprocessorHost().prePrepareBulkLoad(getActiveUser());
 
-    if (bulkLoadObservers != null && bulkLoadObservers.size() != 0) {
-      ObserverContext<RegionCoprocessorEnvironment> ctx = new ObserverContext<>(getActiveUser());
-      ctx.prepare((RegionCoprocessorEnvironment) region.getCoprocessorHost()
-          .findCoprocessorEnvironment(BulkLoadObserver.class).get(0));
-
-      for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
-        bulkLoadObserver.prePrepareBulkLoad(ctx);
-      }
-    }
-
-    String bulkToken =
-        createStagingDir(baseStagingDir, getActiveUser(), region.getTableDescriptor().getTableName())
-            .toString();
+    String bulkToken = createStagingDir(baseStagingDir, getActiveUser(),
+        region.getTableDescriptor().getTableName()).toString();
 
     return bulkToken;
   }
 
   public void cleanupBulkLoad(final Region region, final CleanupBulkLoadRequest request)
       throws IOException {
-    List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers(region);
-
-    if (bulkLoadObservers != null && bulkLoadObservers.size() != 0) {
-      ObserverContext<RegionCoprocessorEnvironment> ctx = new ObserverContext<>(getActiveUser());
-      ctx.prepare((RegionCoprocessorEnvironment) region.getCoprocessorHost()
-        .findCoprocessorEnvironment(BulkLoadObserver.class).get(0));
-
-      for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
-        bulkLoadObserver.preCleanupBulkLoad(ctx);
-      }
-    }
+    region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser());
 
     Path path = new Path(request.getBulkToken());
     if (!fs.delete(path, true)) {
@@ -275,13 +256,6 @@ public class SecureBulkLoadManager {
     return map;
   }
 
-  private List<BulkLoadObserver> getBulkLoadObservers(Region region) {
-    List<BulkLoadObserver> coprocessorList =
-        region.getCoprocessorHost().findCoprocessors(BulkLoadObserver.class);
-
-    return coprocessorList;
-  }
-
   private Path createStagingDir(Path baseDir,
                                 User user,
                                 TableName tableName) throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
index b6d23bf..73ba776 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
@@ -21,22 +21,23 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
-import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.WALObserver;
 import org.apache.hadoop.hbase.metrics.MetricRegistry;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Implements the coprocessor environment and runtime support for coprocessors
@@ -44,12 +45,13 @@ import org.apache.hadoop.hbase.wal.WALKey;
  */
 @InterfaceAudience.Private
 public class WALCoprocessorHost
-    extends CoprocessorHost<WALCoprocessorHost.WALEnvironment> {
+    extends CoprocessorHost<WALCoprocessor, WALCoprocessorEnvironment> {
+  private static final Log LOG = LogFactory.getLog(WALCoprocessorHost.class);
 
   /**
    * Encapsulation of the environment of each coprocessor
    */
-  static class WALEnvironment extends CoprocessorHost.Environment
+  static class WALEnvironment extends BaseEnvironment<WALCoprocessor>
     implements WALCoprocessorEnvironment {
 
     private final WAL wal;
@@ -63,19 +65,18 @@ public class WALCoprocessorHost
 
     /**
      * Constructor
-     * @param implClass - not used
      * @param impl the coprocessor instance
      * @param priority chaining priority
      * @param seq load sequence
      * @param conf configuration
      * @param wal WAL
      */
-    public WALEnvironment(Class<?> implClass, final Coprocessor impl,
-        final int priority, final int seq, final Configuration conf,
-        final WAL wal) {
+    private WALEnvironment(final WALCoprocessor impl, final int priority, final int seq,
+        final Configuration conf, final WAL wal) {
       super(impl, priority, seq, conf);
       this.wal = wal;
-      this.metricRegistry = MetricsCoprocessor.createRegistryForWALCoprocessor(implClass.getName());
+      this.metricRegistry = MetricsCoprocessor.createRegistryForWALCoprocessor(
+          impl.getClass().getName());
     }
 
     @Override
@@ -84,7 +85,7 @@ public class WALCoprocessorHost
     }
 
     @Override
-    protected void shutdown() {
+    public void shutdown() {
       super.shutdown();
       MetricsCoprocessor.removeRegistry(this.metricRegistry);
     }
@@ -111,13 +112,34 @@ public class WALCoprocessorHost
   }
 
   @Override
-  public WALEnvironment createEnvironment(final Class<?> implClass,
-      final Coprocessor instance, final int priority, final int seq,
-      final Configuration conf) {
-    return new WALEnvironment(implClass, instance, priority, seq, conf,
-        this.wal);
+  public WALEnvironment createEnvironment(final WALCoprocessor instance, final int priority,
+      final int seq, final Configuration conf) {
+    return new WALEnvironment(instance, priority, seq, conf, this.wal);
+  }
+
+  @Override
+  public WALCoprocessor checkAndGetInstance(Class<?> implClass)
+      throws InstantiationException, IllegalAccessException {
+    if (WALCoprocessor.class.isAssignableFrom(implClass)) {
+      return (WALCoprocessor)implClass.newInstance();
+    } else {
+      LOG.error(implClass.getName() + " is not of type WALCoprocessor. Check the "
+          + "configuration " + CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
+      return null;
+    }
   }
 
+  private ObserverGetter<WALCoprocessor, WALObserver> walObserverGetter =
+      WALCoprocessor::getWALObserver;
+
+  abstract class WALObserverOperation extends
+      ObserverOperationWithoutResult<WALObserver> {
+    public WALObserverOperation() {
+      super(walObserverGetter);
+    }
+  }
+
+
   /**
    * @param info
    * @param logKey
@@ -127,32 +149,13 @@ public class WALCoprocessorHost
    */
   public boolean preWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
       throws IOException {
-    boolean bypass = false;
-    if (this.coprocessors == null || this.coprocessors.isEmpty()) return bypass;
-    ObserverContext<WALCoprocessorEnvironment> ctx = null;
-    List<WALEnvironment> envs = coprocessors.get();
-    for (int i = 0; i < envs.size(); i++) {
-      WALEnvironment env = envs.get(i);
-      if (env.getInstance() instanceof WALObserver) {
-        final WALObserver observer = (WALObserver)env.getInstance();
-        ctx = ObserverContext.createAndPrepare(env, ctx);
-        Thread currentThread = Thread.currentThread();
-        ClassLoader cl = currentThread.getContextClassLoader();
-        try {
-          currentThread.setContextClassLoader(env.getClassLoader());
-          observer.preWALWrite(ctx, info, logKey, logEdit);
-        } catch (Throwable e) {
-          handleCoprocessorThrowable(env, e);
-        } finally {
-          currentThread.setContextClassLoader(cl);
-        }
-        bypass |= ctx.shouldBypass();
-        if (ctx.shouldComplete()) {
-          break;
-        }
+    return execOperationWithResult(false, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<WALObserver, Boolean>(walObserverGetter) {
+      @Override
+      public Boolean call(WALObserver oserver) throws IOException {
+        return oserver.preWALWrite(this, info, logKey, logEdit);
       }
-    }
-    return bypass;
+    });
   }
 
   /**
@@ -163,29 +166,12 @@ public class WALCoprocessorHost
    */
   public void postWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
       throws IOException {
-    if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
-    ObserverContext<WALCoprocessorEnvironment> ctx = null;
-    List<WALEnvironment> envs = coprocessors.get();
-    for (int i = 0; i < envs.size(); i++) {
-      WALEnvironment env = envs.get(i);
-      if (env.getInstance() instanceof WALObserver) {
-        final WALObserver observer = (WALObserver)env.getInstance();
-        ctx = ObserverContext.createAndPrepare(env, ctx);
-        Thread currentThread = Thread.currentThread();
-        ClassLoader cl = currentThread.getContextClassLoader();
-        try {
-          currentThread.setContextClassLoader(env.getClassLoader());
-          observer.postWALWrite(ctx, info, logKey, logEdit);
-        } catch (Throwable e) {
-          handleCoprocessorThrowable(env, e);
-        } finally {
-          currentThread.setContextClassLoader(cl);
-        }
-        if (ctx.shouldComplete()) {
-          break;
-        }
+    execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
+      @Override
+      protected void call(WALObserver observer) throws IOException {
+        observer.postWALWrite(this, info, logKey, logEdit);
       }
-    }
+    });
   }
 
   /**
@@ -194,29 +180,12 @@ public class WALCoprocessorHost
    * @param newPath the path of the wal we are going to create
    */
   public void preWALRoll(Path oldPath, Path newPath) throws IOException {
-    if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
-    ObserverContext<WALCoprocessorEnvironment> ctx = null;
-    List<WALEnvironment> envs = coprocessors.get();
-    for (int i = 0; i < envs.size(); i++) {
-      WALEnvironment env = envs.get(i);
-      if (env.getInstance() instanceof WALObserver) {
-        final WALObserver observer = (WALObserver)env.getInstance();
-        ctx = ObserverContext.createAndPrepare(env, ctx);
-        Thread currentThread = Thread.currentThread();
-        ClassLoader cl = currentThread.getContextClassLoader();
-        try {
-          currentThread.setContextClassLoader(env.getClassLoader());
-          observer.preWALRoll(ctx, oldPath, newPath);
-        } catch (Throwable e) {
-          handleCoprocessorThrowable(env, e);
-        } finally {
-          currentThread.setContextClassLoader(cl);
-        }
-        if (ctx.shouldComplete()) {
-          break;
-        }
+    execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
+      @Override
+      protected void call(WALObserver observer) throws IOException {
+        observer.preWALRoll(this, oldPath, newPath);
       }
-    }
+    });
   }
 
   /**
@@ -225,28 +194,11 @@ public class WALCoprocessorHost
    * @param newPath the path of the wal we have created and now is the current
    */
   public void postWALRoll(Path oldPath, Path newPath) throws IOException {
-    if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
-    ObserverContext<WALCoprocessorEnvironment> ctx = null;
-    List<WALEnvironment> envs = coprocessors.get();
-    for (int i = 0; i < envs.size(); i++) {
-      WALEnvironment env = envs.get(i);
-      if (env.getInstance() instanceof WALObserver) {
-        final WALObserver observer = (WALObserver)env.getInstance();
-        ctx = ObserverContext.createAndPrepare(env, ctx);
-        Thread currentThread = Thread.currentThread();
-        ClassLoader cl = currentThread.getContextClassLoader();
-        try {
-          currentThread.setContextClassLoader(env.getClassLoader());
-          observer.postWALRoll(ctx, oldPath, newPath);
-        } catch (Throwable e) {
-          handleCoprocessorThrowable(env, e);
-        } finally {
-          currentThread.setContextClassLoader(cl);
-        }
-        if (ctx.shouldComplete()) {
-          break;
-        }
+    execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
+      @Override
+      protected void call(WALObserver observer) throws IOException {
+        observer.postWALRoll(this, oldPath, newPath);
       }
-    }
+    });
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
index fdb951b..32ec617 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
@@ -21,11 +21,13 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
@@ -40,10 +42,15 @@ import org.apache.hadoop.hbase.util.Pair;
  */
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class ReplicationObserver implements RegionObserver {
+public class ReplicationObserver implements RegionCoprocessor, RegionObserver {
   private static final Log LOG = LogFactory.getLog(ReplicationObserver.class);
 
   @Override
+  public Optional<RegionObserver> getRegionObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
   public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
       final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
     RegionCoprocessorEnvironment env = ctx.getEnvironment();

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 7081ea1..d66b754 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -74,13 +75,15 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.MasterObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
@@ -169,8 +172,10 @@ import org.apache.yetus.audience.InterfaceAudience;
  * </p>
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class AccessController implements MasterObserver, RegionObserver, RegionServerObserver,
-      AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
+public class AccessController implements MasterCoprocessor, RegionCoprocessor,
+    RegionServerCoprocessor, AccessControlService.Interface,
+    MasterObserver, RegionObserver, RegionServerObserver, EndpointObserver, BulkLoadObserver {
+  // TODO: encapsulate observer functions into separate class/sub-class.
 
   private static final Log LOG = LogFactory.getLog(AccessController.class);
 
@@ -987,6 +992,39 @@ public class AccessController implements MasterObserver, RegionObserver, RegionS
     }
   }
 
+  /*********************************** Observer/Service Getters ***********************************/
+  @Override
+  public Optional<RegionObserver> getRegionObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
+  public Optional<MasterObserver> getMasterObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
+  public Optional<EndpointObserver> getEndpointObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
+  public Optional<BulkLoadObserver> getBulkLoadObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
+  public Optional<RegionServerObserver> getRegionServerObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
+  public Optional<Service> getService() {
+    return Optional.of(AccessControlProtos.AccessControlService.newReflectiveService(this));
+  }
+
+  /*********************************** Observer implementations ***********************************/
+
   @Override
   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
       TableDescriptor desc, RegionInfo[] regions) throws IOException {
@@ -2448,11 +2486,6 @@ public class AccessController implements MasterObserver, RegionObserver, RegionS
     done.run(response);
   }
 
-  @Override
-  public Service getService() {
-    return AccessControlProtos.AccessControlService.newReflectiveService(this);
-  }
-
   private Region getRegion(RegionCoprocessorEnvironment e) {
     return e.getRegion();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java
index 0b765d7..5b4acbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.security.access;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.regex.Matcher;
 
 import org.apache.commons.io.FilenameUtils;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.MasterObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -43,7 +45,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * Master observer for restricting coprocessor assignments.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class CoprocessorWhitelistMasterObserver implements MasterObserver {
+public class CoprocessorWhitelistMasterObserver implements MasterCoprocessor, MasterObserver {
 
   public static final String CP_COPROCESSOR_WHITELIST_PATHS_KEY =
       "hbase.coprocessor.region.whitelist.paths";
@@ -52,6 +54,11 @@ public class CoprocessorWhitelistMasterObserver implements MasterObserver {
       .getLog(CoprocessorWhitelistMasterObserver.class);
 
   @Override
+  public Optional<MasterObserver> getMasterObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
       TableName tableName, TableDescriptor htd) throws IOException {
     verifyCoprocessors(ctx, htd);

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java
index 50b8765..4b1f28e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java
@@ -19,13 +19,12 @@
 package org.apache.hadoop.hbase.security.token;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.RpcServer;
@@ -42,6 +41,7 @@ import org.apache.hadoop.security.token.Token;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Provides a service for obtaining authentication tokens via the
@@ -49,7 +49,7 @@ import com.google.protobuf.Service;
  */
 @InterfaceAudience.Private
 public class TokenProvider implements AuthenticationProtos.AuthenticationService.Interface,
-    Coprocessor, CoprocessorService {
+    RegionCoprocessor {
 
   private static final Log LOG = LogFactory.getLog(TokenProvider.class);
 
@@ -96,8 +96,8 @@ public class TokenProvider implements AuthenticationProtos.AuthenticationService
   // AuthenticationService implementation
 
   @Override
-  public Service getService() {
-    return AuthenticationProtos.AuthenticationService.newReflectiveService(this);
+  public Optional<Service> getService() {
+    return Optional.of(AuthenticationProtos.AuthenticationService.newReflectiveService(this));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
index b3b1bc4..671e989 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -64,14 +65,13 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.constraint.ConstraintException;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.MasterObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.filter.Filter;
@@ -101,7 +101,6 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
@@ -122,8 +121,9 @@ import com.google.protobuf.Service;
  * visibility labels
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class VisibilityController implements MasterObserver, RegionObserver,
-    VisibilityLabelsService.Interface, CoprocessorService {
+// TODO: break out Observer functions into separate class/sub-class.
+public class VisibilityController implements MasterCoprocessor, RegionCoprocessor,
+    VisibilityLabelsService.Interface, MasterObserver, RegionObserver {
 
   private static final Log LOG = LogFactory.getLog(VisibilityController.class);
   private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger."
@@ -176,10 +176,6 @@ public class VisibilityController implements MasterObserver, RegionObserver,
         + " accordingly.");
     }
 
-    if (env instanceof RegionServerCoprocessorEnvironment) {
-      throw new RuntimeException("Visibility controller should not be configured as "
-          + "'hbase.coprocessor.regionserver.classes'.");
-    }
     // Do not create for master CPs
     if (!(env instanceof MasterCoprocessorEnvironment)) {
       visibilityLabelService = VisibilityLabelServiceManager.getInstance()
@@ -192,6 +188,22 @@ public class VisibilityController implements MasterObserver, RegionObserver,
 
   }
 
+  /**************************** Observer/Service Getters ************************************/
+  @Override
+  public Optional<RegionObserver> getRegionObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
+  public Optional<MasterObserver> getMasterObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
+  public Optional<Service> getService() {
+    return Optional.of(VisibilityLabelsProtos.VisibilityLabelsService.newReflectiveService(this));
+  }
+
   /********************************* Master related hooks **********************************/
 
   @Override
@@ -761,11 +773,6 @@ public class VisibilityController implements MasterObserver, RegionObserver,
   }
 
   @Override
-  public Service getService() {
-    return VisibilityLabelsProtos.VisibilityLabelsService.newReflectiveService(this);
-  }
-
-  @Override
   public boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> e,
       final InternalScanner s, final Cell curRowCell, final boolean hasMore) throws IOException {
     // 'default' in RegionObserver might do unnecessary copy for Off heap backed Cells.
@@ -1087,35 +1094,6 @@ public class VisibilityController implements MasterObserver, RegionObserver,
   }
 
   /**
-   * A RegionServerObserver impl that provides the custom
-   * VisibilityReplicationEndpoint. This class should be configured as the
-   * 'hbase.coprocessor.regionserver.classes' for the visibility tags to be
-   * replicated as string.  The value for the configuration should be
-   * 'org.apache.hadoop.hbase.security.visibility.VisibilityController$VisibilityReplication'.
-   */
-  public static class VisibilityReplication implements RegionServerObserver {
-    private Configuration conf;
-    private VisibilityLabelService visibilityLabelService;
-
-    @Override
-    public void start(CoprocessorEnvironment env) throws IOException {
-      this.conf = env.getConfiguration();
-      visibilityLabelService = VisibilityLabelServiceManager.getInstance()
-          .getVisibilityLabelService(this.conf);
-    }
-
-    @Override
-    public void stop(CoprocessorEnvironment env) throws IOException {
-    }
-
-    @Override
-    public ReplicationEndpoint postCreateReplicationEndPoint(
-        ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
-      return new VisibilityReplicationEndpoint(endpoint, visibilityLabelService);
-    }
-  }
-
-  /**
    * @param t
    * @return NameValuePair of the exception name to stringified version os exception.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplication.java
new file mode 100644
index 0000000..6887c31
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplication.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.visibility;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * A RegionServerObserver impl that provides the custom
+ * VisibilityReplicationEndpoint. This class should be configured as the
+ * 'hbase.coprocessor.regionserver.classes' for the visibility tags to be
+ * replicated as string.  The value for the configuration should be
+ * 'org.apache.hadoop.hbase.security.visibility.VisibilityReplication'.
+ */
+public class VisibilityReplication implements RegionServerCoprocessor, RegionServerObserver {
+  private Configuration conf;
+  private VisibilityLabelService visibilityLabelService;
+
+  @Override
+  public void start(CoprocessorEnvironment env) throws IOException {
+    this.conf = env.getConfiguration();
+    visibilityLabelService = VisibilityLabelServiceManager.getInstance()
+        .getVisibilityLabelService(this.conf);
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+  }
+
+  @Override public Optional<RegionServerObserver> getRegionServerObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
+  public ReplicationEndpoint postCreateReplicationEndPoint(
+      ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
+    return new VisibilityReplicationEndpoint(endpoint, visibilityLabelService);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/WriteSinkCoprocessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/WriteSinkCoprocessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/WriteSinkCoprocessor.java
index 5ec61d4..60fd22d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/WriteSinkCoprocessor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/WriteSinkCoprocessor.java
@@ -23,12 +23,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.OperationStatus;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -58,9 +60,15 @@ import java.util.concurrent.atomic.AtomicLong;
  * 0 row(s) in 0.0050 seconds
  * </p>
  */
-public class WriteSinkCoprocessor implements RegionObserver {
+public class WriteSinkCoprocessor implements RegionCoprocessor, RegionObserver {
   private static final Log LOG = LogFactory.getLog(WriteSinkCoprocessor.class);
   private final AtomicLong ops = new AtomicLong();
+
+  @Override
+  public Optional<RegionObserver> getRegionObserver() {
+    return Optional.of(this);
+  }
+
   private String regionName;
 
   @Override
@@ -68,7 +76,6 @@ public class WriteSinkCoprocessor implements RegionObserver {
     regionName = e.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString();
   }
 
-
   @Override
   public void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
                              final MiniBatchOperationInProgress<Mutation> miniBatchOp)

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index 682709e..cfe4d1f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -159,13 +161,18 @@ public class HConnectionTestingUtility {
   /**
    * This coproceesor sleep 2s at first increment/append rpc call.
    */
-  public static class SleepAtFirstRpcCall implements RegionObserver {
+  public static class SleepAtFirstRpcCall implements RegionCoprocessor, RegionObserver {
     static final AtomicLong ct = new AtomicLong(0);
     static final String SLEEP_TIME_CONF_KEY =
         "hbase.coprocessor.SleepAtFirstRpcCall.sleepTime";
     static final long DEFAULT_SLEEP_TIME = 2000;
     static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME);
 
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
     public SleepAtFirstRpcCall() {
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java
index 350bf6e..05324aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +36,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.MasterObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -170,10 +172,15 @@ public class TestAsyncAdminBuilder {
     }
   }
 
-  public static class TestRpcTimeoutCoprocessor implements MasterObserver {
+  public static class TestRpcTimeoutCoprocessor implements MasterCoprocessor, MasterObserver {
     public TestRpcTimeoutCoprocessor() {
     }
 
+
+    @Override
+    public Optional<MasterObserver> getMasterObserver() {
+      return Optional.of(this);
+    }
     @Override
     public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
         String namespace) throws IOException {
@@ -181,13 +188,18 @@ public class TestAsyncAdminBuilder {
     }
   }
 
-  public static class TestOperationTimeoutCoprocessor implements MasterObserver {
+  public static class TestOperationTimeoutCoprocessor implements MasterCoprocessor, MasterObserver {
     AtomicLong sleepTime = new AtomicLong(0);
 
     public TestOperationTimeoutCoprocessor() {
     }
 
     @Override
+    public Optional<MasterObserver> getMasterObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
     public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
         String namespace) throws IOException {
       Threads.sleep(DEFAULT_RPC_TIMEOUT / 2);
@@ -197,13 +209,18 @@ public class TestAsyncAdminBuilder {
     }
   }
 
-  public static class TestMaxRetriesCoprocessor implements MasterObserver {
+  public static class TestMaxRetriesCoprocessor implements MasterCoprocessor, MasterObserver {
     AtomicLong retryNum = new AtomicLong(0);
 
     public TestMaxRetriesCoprocessor() {
     }
 
     @Override
+    public Optional<MasterObserver> getMasterObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
     public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
         String namespace) throws IOException {
       if (retryNum.getAndIncrement() < DEFAULT_RETRIES_NUMBER) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
index 70df318..efa2c1e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -73,7 +75,12 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
 
   private static AtomicInteger MAX_CONCURRENCY = new AtomicInteger(0);
 
-  public static final class CountingRegionObserver implements RegionObserver {
+  public static final class CountingRegionObserver implements RegionCoprocessor, RegionObserver {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
 
     @Override
     public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
index e4c343a..8a341b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
@@ -65,7 +67,11 @@ public class TestAsyncRegionLocatorTimeout {
 
   private static volatile long SLEEP_MS = 0L;
 
-  public static class SleepRegionObserver implements RegionObserver {
+  public static class SleepRegionObserver implements RegionCoprocessor, RegionObserver {
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
 
     @Override
     public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
index 6c9dd86..fce9041 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
@@ -28,6 +28,7 @@ import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -202,7 +204,12 @@ public class TestAsyncTableBatch {
     assertEquals(4, Bytes.toInt(appendValue, 8));
   }
 
-  public static final class ErrorInjectObserver implements RegionObserver {
+  public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
 
     @Override
     public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
index b389d9e..30fe731 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -253,7 +255,12 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
     }
   }
 
-  public static class CompactorRegionObserver implements RegionObserver {
+  public static class CompactorRegionObserver implements RegionCoprocessor, RegionObserver {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
 
     @Override
     public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
index e1b31e7..d558307 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -1549,7 +1551,7 @@ public class TestBlockEvictionFromClient {
     }
   }
 
-  public static class CustomInnerRegionObserver implements RegionObserver {
+  public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
     static final AtomicLong sleepTime = new AtomicLong(0);
     static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
     static final AtomicInteger countOfNext = new AtomicInteger(0);
@@ -1560,6 +1562,11 @@ public class TestBlockEvictionFromClient {
         new CountDownLatch(0));
 
     @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
     public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
         InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
       slowdownCode(e, false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java
index 62ceca3..e92ba23 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -45,6 +46,7 @@ import java.io.InterruptedIOException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
 @Category({MediumTests.class, ClientTests.class})
@@ -58,7 +60,12 @@ public class TestClientOperationInterrupt {
   private static final byte[] test = Bytes.toBytes("test");
   private static Configuration conf;
 
-  public static class TestCoprocessor implements RegionObserver {
+  public static class TestCoprocessor implements RegionCoprocessor, RegionObserver {
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
     @Override
     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
                          final Get get, final List<Cell> results) throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
index 379ab31..6b03594 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.MasterObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -189,11 +191,16 @@ public class TestEnableTable {
     }
   }
 
-  public  static class MasterSyncObserver implements MasterObserver {
+  public  static class MasterSyncObserver implements MasterCoprocessor, MasterObserver {
     volatile CountDownLatch tableCreationLatch = null;
     volatile CountDownLatch tableDeletionLatch = null;
 
     @Override
+    public Optional<MasterObserver> getMasterObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
     public void postCompletedCreateTableAction(
         final ObserverContext<MasterCoprocessorEnvironment> ctx,
         final TableDescriptor desc,
@@ -222,8 +229,8 @@ public class TestEnableTable {
   throws Exception {
     // NOTE: We need a latch because admin is not sync,
     // so the postOp coprocessor method may be called after the admin operation returned.
-    MasterSyncObserver observer = (MasterSyncObserver)testUtil.getHBaseCluster().getMaster()
-      .getMasterCoprocessorHost().findCoprocessor(MasterSyncObserver.class.getName());
+    MasterSyncObserver observer = testUtil.getHBaseCluster().getMaster()
+      .getMasterCoprocessorHost().findCoprocessor(MasterSyncObserver.class);
     observer.tableCreationLatch = new CountDownLatch(1);
     Admin admin = testUtil.getAdmin();
     if (splitKeys != null) {
@@ -240,8 +247,8 @@ public class TestEnableTable {
   throws Exception {
     // NOTE: We need a latch because admin is not sync,
     // so the postOp coprocessor method may be called after the admin operation returned.
-    MasterSyncObserver observer = (MasterSyncObserver)testUtil.getHBaseCluster().getMaster()
-      .getMasterCoprocessorHost().findCoprocessor(MasterSyncObserver.class.getName());
+    MasterSyncObserver observer = testUtil.getHBaseCluster().getMaster()
+      .getMasterCoprocessorHost().findCoprocessor(MasterSyncObserver.class);
     observer.tableDeletionLatch = new CountDownLatch(1);
     Admin admin = testUtil.getAdmin();
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 10169ab..a938db6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -74,6 +75,7 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
@@ -543,7 +545,7 @@ public class TestFromClientSide {
    * This is a coprocessor to inject a test failure so that a store scanner.reseek() call will
    * fail with an IOException() on the first call.
    */
-  public static class ExceptionInReseekRegionObserver implements RegionObserver {
+  public static class ExceptionInReseekRegionObserver implements RegionCoprocessor, RegionObserver {
     static AtomicLong reqCount = new AtomicLong(0);
     static AtomicBoolean isDoNotRetry = new AtomicBoolean(false); // whether to throw DNRIOE
     static AtomicBoolean throwOnce = new AtomicBoolean(true); // whether to only throw once
@@ -554,6 +556,11 @@ public class TestFromClientSide {
       throwOnce.set(true);
     }
 
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
     class MyStoreScanner extends StoreScanner {
       public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
           long readPt) throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index 89ea5b7..ca0a5ea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -688,7 +690,7 @@ public class TestFromClientSide3 {
 
   private void testPreBatchMutate(TableName tableName, Runnable rn)throws Exception {
     HTableDescriptor desc = new HTableDescriptor(tableName);
-    desc.addCoprocessor(WatiingForScanObserver.class.getName());
+    desc.addCoprocessor(WaitingForScanObserver.class.getName());
     desc.addFamily(new HColumnDescriptor(FAMILY));
     TEST_UTIL.getAdmin().createTable(desc);
     ExecutorService service = Executors.newFixedThreadPool(2);
@@ -720,7 +722,7 @@ public class TestFromClientSide3 {
   public void testLockLeakWithDelta() throws Exception, Throwable {
     final TableName tableName = TableName.valueOf(name.getMethodName());
     HTableDescriptor desc = new HTableDescriptor(tableName);
-    desc.addCoprocessor(WatiingForMultiMutationsObserver.class.getName());
+    desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
     desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
     desc.addFamily(new HColumnDescriptor(FAMILY));
     TEST_UTIL.getAdmin().createTable(desc);
@@ -735,7 +737,7 @@ public class TestFromClientSide3 {
         try (Table table = con.getTable(tableName)) {
           Put put = new Put(ROW);
           put.addColumn(FAMILY, QUALIFIER, VALUE);
-          // the put will be blocked by WatiingForMultiMutationsObserver.
+          // the put will be blocked by WaitingForMultiMutationsObserver.
           table.put(put);
         } catch (IOException ex) {
           throw new RuntimeException(ex);
@@ -753,7 +755,7 @@ public class TestFromClientSide3 {
       });
       appendService.shutdown();
       appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
-      WatiingForMultiMutationsObserver observer = find(tableName, WatiingForMultiMutationsObserver.class);
+      WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class);
       observer.latch.countDown();
       putService.shutdown();
       putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
@@ -774,7 +776,7 @@ public class TestFromClientSide3 {
     final TableName tableName = TableName.valueOf(name.getMethodName());
     HTableDescriptor desc = new HTableDescriptor(tableName);
     desc.addCoprocessor(MultiRowMutationEndpoint.class.getName());
-    desc.addCoprocessor(WatiingForMultiMutationsObserver.class.getName());
+    desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
     desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
     desc.addFamily(new HColumnDescriptor(FAMILY));
     TEST_UTIL.getAdmin().createTable(desc);
@@ -793,7 +795,7 @@ public class TestFromClientSide3 {
         try (Table table = con.getTable(tableName)) {
           Put put0 = new Put(rowLocked);
           put0.addColumn(FAMILY, QUALIFIER, value0);
-          // the put will be blocked by WatiingForMultiMutationsObserver.
+          // the put will be blocked by WaitingForMultiMutationsObserver.
           table.put(put0);
         } catch (IOException ex) {
           throw new RuntimeException(ex);
@@ -830,7 +832,7 @@ public class TestFromClientSide3 {
       });
       cpService.shutdown();
       cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
-      WatiingForMultiMutationsObserver observer = find(tableName, WatiingForMultiMutationsObserver.class);
+      WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class);
       observer.latch.countDown();
       putService.shutdown();
       putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
@@ -975,8 +977,15 @@ public class TestFromClientSide3 {
     return clz.cast(cp);
   }
 
-  public static class WatiingForMultiMutationsObserver implements RegionObserver {
+  public static class WaitingForMultiMutationsObserver
+      implements RegionCoprocessor, RegionObserver {
     final CountDownLatch latch = new CountDownLatch(1);
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
     @Override
     public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
             final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
@@ -988,8 +997,14 @@ public class TestFromClientSide3 {
     }
   }
 
-  public static class WatiingForScanObserver implements RegionObserver {
+  public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver {
     private final CountDownLatch latch = new CountDownLatch(1);
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
     @Override
     public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
             final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 56c8c7c..1a67457 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -31,6 +31,7 @@ import java.lang.reflect.Modifier;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
@@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
@@ -118,7 +120,7 @@ public class TestHCM {
 /**
 * This copro sleeps 20 second. The first call it fails. The second time, it works.
 */
-  public static class SleepAndFailFirstTime implements RegionObserver {
+  public static class SleepAndFailFirstTime implements RegionCoprocessor, RegionObserver {
     static final AtomicLong ct = new AtomicLong(0);
     static final String SLEEP_TIME_CONF_KEY =
         "hbase.coprocessor.SleepAndFailFirstTime.sleepTime";
@@ -129,6 +131,11 @@ public class TestHCM {
     }
 
     @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
     public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
       RegionCoprocessorEnvironment env = c.getEnvironment();
       Configuration conf = env.getConfiguration();
@@ -175,8 +182,14 @@ public class TestHCM {
 
   }
 
-  public static class SleepCoprocessor implements RegionObserver {
+  public static class SleepCoprocessor implements RegionCoprocessor, RegionObserver {
     public static final int SLEEP_TIME = 5000;
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
     @Override
     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
         final Get get, final List<Cell> results) throws IOException {
@@ -204,9 +217,15 @@ public class TestHCM {
 
   }
 
-  public static class SleepLongerAtFirstCoprocessor implements RegionObserver {
+  public static class SleepLongerAtFirstCoprocessor implements RegionCoprocessor, RegionObserver {
     public static final int SLEEP_TIME = 2000;
     static final AtomicLong ct = new AtomicLong(0);
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
     @Override
     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
         final Get get, final List<Cell> results) throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
index b938f7e..1745c82 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.Optional;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
@@ -137,7 +139,13 @@ public class TestMobCloneSnapshotFromClient extends TestCloneSnapshotFromClient
   /**
    * This coprocessor is used to delay the flush.
    */
-  public static class DelayFlushCoprocessor implements RegionObserver {
+  public static class DelayFlushCoprocessor implements RegionCoprocessor, RegionObserver {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
     @Override
     public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
       if (delayFlush) {