You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/09/27 19:45:35 UTC

[4/7] hbase git commit: HBASE-17732 Coprocessor Design Improvements

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterSpaceQuotaObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterSpaceQuotaObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterSpaceQuotaObserver.java
index c59e5e7..d6dbcd4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterSpaceQuotaObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterSpaceQuotaObserver.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hbase.quotas;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -24,6 +25,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.MasterObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -35,7 +37,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
  * are deleted.
  */
 @InterfaceAudience.Private
-public class MasterSpaceQuotaObserver implements MasterObserver {
+public class MasterSpaceQuotaObserver implements MasterCoprocessor, MasterObserver {
   public static final String REMOVE_QUOTA_ON_TABLE_DELETE = "hbase.quota.remove.on.table.delete";
   public static final boolean REMOVE_QUOTA_ON_TABLE_DELETE_DEFAULT = true;
 
@@ -44,6 +46,11 @@ public class MasterSpaceQuotaObserver implements MasterObserver {
   private boolean quotasEnabled = false;
 
   @Override
+  public Optional<MasterObserver> getMasterObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
   public void start(CoprocessorEnvironment ctx) throws IOException {
     this.cpEnv = ctx;
     this.conf = cpEnv.getConfiguration();

http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 19491b4..84e9aa5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -33,7 +33,6 @@ import com.google.protobuf.Message;
 import com.google.protobuf.Service;
 import org.apache.commons.collections4.map.AbstractReferenceMap;
 import org.apache.commons.collections4.map.ReferenceMap;
-import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -56,11 +55,15 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
+import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
 import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
 import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
@@ -68,7 +71,6 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.metrics.MetricRegistry;
 import org.apache.hadoop.hbase.regionserver.Region.Operation;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
@@ -91,7 +93,7 @@ import org.apache.yetus.audience.InterfaceStability;
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
 @InterfaceStability.Evolving
 public class RegionCoprocessorHost
-    extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
+    extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> {
 
   private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
   // The shared data map
@@ -103,10 +105,10 @@ public class RegionCoprocessorHost
   private final boolean hasCustomPostScannerFilterRow;
 
   /**
-   * 
+   *
    * Encapsulation of the environment of each coprocessor
    */
-  static class RegionEnvironment extends CoprocessorHost.Environment
+  static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
       implements RegionCoprocessorEnvironment {
 
     private Region region;
@@ -119,7 +121,7 @@ public class RegionCoprocessorHost
      * @param impl the coprocessor instance
      * @param priority chaining priority
      */
-    public RegionEnvironment(final Coprocessor impl, final int priority,
+    public RegionEnvironment(final RegionCoprocessor impl, final int priority,
         final int seq, final Configuration conf, final Region region,
         final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
       super(impl, priority, seq, conf);
@@ -142,6 +144,7 @@ public class RegionCoprocessorHost
       return rsServices;
     }
 
+    @Override
     public void shutdown() {
       super.shutdown();
       MetricsCoprocessor.removeRegistry(this.metricRegistry);
@@ -226,7 +229,7 @@ public class RegionCoprocessorHost
 
     // now check whether any coprocessor implements postScannerFilterRow
     boolean hasCustomPostScannerFilterRow = false;
-    out: for (RegionEnvironment env: coprocessors) {
+    out: for (RegionCoprocessorEnvironment env: coprocEnvironments) {
       if (env.getInstance() instanceof RegionObserver) {
         Class<?> clazz = env.getInstance().getClass();
         for(;;) {
@@ -361,13 +364,16 @@ public class RegionCoprocessorHost
 
     // scan the table attributes for coprocessor load specifications
     // initialize the coprocessors
-    List<RegionEnvironment> configured = new ArrayList<>();
+    List<RegionCoprocessorEnvironment> configured = new ArrayList<>();
     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf,
         region.getTableDescriptor())) {
       // Load encompasses classloading and coprocessor initialization
       try {
-        RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(),
-          attr.getConf());
+        RegionCoprocessorEnvironment env = load(attr.getPath(), attr.getClassName(),
+            attr.getPriority(), attr.getConf());
+        if (env == null) {
+          continue;
+        }
         configured.add(env);
         LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
             region.getTableDescriptor().getTableName().getNameAsString() + " successfully.");
@@ -381,60 +387,101 @@ public class RegionCoprocessorHost
       }
     }
     // add together to coprocessor set for COW efficiency
-    coprocessors.addAll(configured);
+    coprocEnvironments.addAll(configured);
   }
 
   @Override
-  public RegionEnvironment createEnvironment(Class<?> implClass,
-      Coprocessor instance, int priority, int seq, Configuration conf) {
-    // Check if it's an Endpoint.
-    // Due to current dynamic protocol design, Endpoint
-    // uses a different way to be registered and executed.
-    // It uses a visitor pattern to invoke registered Endpoint
-    // method.
-    for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
-      Class<?> c = (Class<?>) itf;
-      if (CoprocessorService.class.isAssignableFrom(c)) {
-        region.registerService( ((CoprocessorService)instance).getService() );
-      }
-    }
+  public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq,
+      Configuration conf) {
+    // Due to current dynamic protocol design, Endpoint uses a different way to be registered and
+    // executed. It uses a visitor pattern to invoke registered Endpoint method.
+    instance.getService().ifPresent(region::registerService);
     ConcurrentMap<String, Object> classData;
     // make sure only one thread can add maps
     synchronized (SHARED_DATA_MAP) {
       // as long as at least one RegionEnvironment holds on to its classData it will
       // remain in this map
       classData =
-          SHARED_DATA_MAP.computeIfAbsent(implClass.getName(), k -> new ConcurrentHashMap<>());
+          SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(),
+              k -> new ConcurrentHashMap<>());
     }
     return new RegionEnvironment(instance, priority, seq, conf, region,
         rsServices, classData);
   }
 
+  @Override
+  public RegionCoprocessor checkAndGetInstance(Class<?> implClass)
+      throws InstantiationException, IllegalAccessException {
+    if (RegionCoprocessor.class.isAssignableFrom(implClass)) {
+      return (RegionCoprocessor)implClass.newInstance();
+    } else if (CoprocessorService.class.isAssignableFrom(implClass)) {
+      // For backward compatibility with old CoprocessorService impl which don't extend
+      // RegionCoprocessor.
+      return new CoprocessorServiceBackwardCompatiblity.RegionCoprocessorService(
+          (CoprocessorService)implClass.newInstance());
+    } else {
+      LOG.error(implClass.getName() + " is not of type RegionCoprocessor. Check the "
+          + "configuration " + CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
+      return null;
+    }
+  }
+
+  private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter =
+      RegionCoprocessor::getRegionObserver;
+
+  private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter =
+      RegionCoprocessor::getEndpointObserver;
+
+  abstract class RegionObserverOperation extends ObserverOperationWithoutResult<RegionObserver> {
+    public RegionObserverOperation() {
+      super(regionObserverGetter);
+    }
+
+    public RegionObserverOperation(User user) {
+      super(regionObserverGetter, user);
+    }
+  }
+
+  abstract class BulkLoadObserverOperation extends
+      ObserverOperationWithoutResult<BulkLoadObserver> {
+    public BulkLoadObserverOperation(User user) {
+      super(RegionCoprocessor::getBulkLoadObserver, user);
+    }
+  }
+
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+  // Observer operations
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+  // Observer operations
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+
   /**
    * Invoked before a region open.
    *
    * @throws IOException Signals that an I/O exception has occurred.
    */
   public void preOpen() throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.preOpen(ctx);
+      public void call(RegionObserver observer) throws IOException {
+        observer.preOpen(this);
       }
     });
   }
 
+
   /**
    * Invoked after a region open
    */
   public void postOpen() {
     try {
-      execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+      execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
         @Override
-        public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-            throws IOException {
-          oserver.postOpen(ctx);
+        public void call(RegionObserver observer) throws IOException {
+          observer.postOpen(this);
         }
       });
     } catch (IOException e) {
@@ -447,11 +494,10 @@ public class RegionCoprocessorHost
    */
   public void postLogReplay() {
     try {
-      execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+      execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
         @Override
-        public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-            throws IOException {
-          oserver.postLogReplay(ctx);
+        public void call(RegionObserver observer) throws IOException {
+          observer.postLogReplay(this);
         }
       });
     } catch (IOException e) {
@@ -464,11 +510,10 @@ public class RegionCoprocessorHost
    * @param abortRequested true if the server is aborting
    */
   public void preClose(final boolean abortRequested) throws IOException {
-    execOperation(false, new RegionOperation() {
+    execOperation(false, new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.preClose(ctx, abortRequested);
+      public void call(RegionObserver observer) throws IOException {
+        observer.preClose(this, abortRequested);
       }
     });
   }
@@ -479,14 +524,15 @@ public class RegionCoprocessorHost
    */
   public void postClose(final boolean abortRequested) {
     try {
-      execOperation(false, new RegionOperation() {
+      execOperation(false, new RegionObserverOperation() {
         @Override
-        public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-            throws IOException {
-          oserver.postClose(ctx, abortRequested);
+        public void call(RegionObserver observer) throws IOException {
+          observer.postClose(this, abortRequested);
         }
-        public void postEnvCall(RegionEnvironment env) {
-          shutdown(env);
+
+        @Override
+        public void postEnvCall() {
+          shutdown(this.getEnvironment());
         }
       });
     } catch (IOException e) {
@@ -499,18 +545,19 @@ public class RegionCoprocessorHost
    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
    *   InternalScanner, CompactionLifeCycleTracker, long)}
    */
-  public InternalScanner preCompactScannerOpen(HStore store, List<StoreFileScanner> scanners,
-      ScanType scanType, long earliestPutTs, CompactionLifeCycleTracker tracker, User user,
-      long readPoint) throws IOException {
-    return execOperationWithResult(null,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
-          earliestPutTs, getResult(), tracker, readPoint));
-      }
-    });
+  public InternalScanner preCompactScannerOpen(final HStore store,
+      final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
+      final CompactionLifeCycleTracker tracker, final User user, final long readPoint)
+      throws IOException {
+    return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, InternalScanner>(
+            regionObserverGetter, user) {
+          @Override
+          public InternalScanner call(RegionObserver observer) throws IOException {
+            return observer.preCompactScannerOpen(this, store, scanners, scanType,
+                earliestPutTs, getResult(), tracker, readPoint);
+          }
+        });
   }
 
   /**
@@ -522,13 +569,12 @@ public class RegionCoprocessorHost
    * @return If {@code true}, skip the normal selection process and use the current list
    * @throws IOException
    */
-  public boolean preCompactSelection(HStore store, List<HStoreFile> candidates,
-      CompactionLifeCycleTracker tracker, User user) throws IOException {
-    return execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
+  public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates,
+      final CompactionLifeCycleTracker tracker, final User user) throws IOException {
+    return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.preCompactSelection(ctx, store, candidates, tracker);
+      public void call(RegionObserver observer) throws IOException {
+        observer.preCompactSelection(this, store, candidates, tracker);
       }
     });
   }
@@ -540,13 +586,12 @@ public class RegionCoprocessorHost
    * @param selected The store files selected to compact
    * @param tracker used to track the life cycle of a compaction
    */
-  public void postCompactSelection(HStore store, ImmutableList<HStoreFile> selected,
-      CompactionLifeCycleTracker tracker, User user) throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
+  public void postCompactSelection(final HStore store, final ImmutableList<HStoreFile> selected,
+      final CompactionLifeCycleTracker tracker, final User user) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.postCompactSelection(ctx, store, selected, tracker);
+      public void call(RegionObserver observer) throws IOException {
+        observer.postCompactSelection(this, store, selected, tracker);
       }
     });
   }
@@ -559,16 +604,17 @@ public class RegionCoprocessorHost
    * @param tracker used to track the life cycle of a compaction
    * @throws IOException
    */
-  public InternalScanner preCompact(HStore store, InternalScanner scanner, ScanType scanType,
-      CompactionLifeCycleTracker tracker, User user) throws IOException {
-    return execOperationWithResult(false, scanner,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.preCompact(ctx, store, getResult(), scanType, tracker));
-      }
-    });
+  public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
+      final ScanType scanType, final CompactionLifeCycleTracker tracker, final User user)
+      throws IOException {
+    return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, InternalScanner>(
+            regionObserverGetter, user) {
+          @Override
+          public InternalScanner call(RegionObserver observer) throws IOException {
+            return observer.preCompact(this, store, getResult(), scanType, tracker);
+          }
+        });
   }
 
   /**
@@ -578,13 +624,12 @@ public class RegionCoprocessorHost
    * @param tracker used to track the life cycle of a compaction
    * @throws IOException
    */
-  public void postCompact(HStore store, HStoreFile resultFile, CompactionLifeCycleTracker tracker,
-      User user) throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
+  public void postCompact(final HStore store, final HStoreFile resultFile,
+      final CompactionLifeCycleTracker tracker, final User user) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.postCompact(ctx, store, resultFile, tracker);
+      public void call(RegionObserver observer) throws IOException {
+        observer.postCompact(this, store, resultFile, tracker);
       }
     });
   }
@@ -595,14 +640,13 @@ public class RegionCoprocessorHost
    */
   public InternalScanner preFlush(HStore store, final InternalScanner scanner)
       throws IOException {
-    return execOperationWithResult(false, scanner,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.preFlush(ctx, store, getResult()));
-      }
-    });
+    return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter) {
+          @Override
+          public InternalScanner call(RegionObserver observer) throws IOException {
+            return observer.preFlush(this, store, getResult());
+          }
+        });
   }
 
   /**
@@ -610,11 +654,10 @@ public class RegionCoprocessorHost
    * @throws IOException
    */
   public void preFlush() throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.preFlush(ctx);
+      public void call(RegionObserver observer) throws IOException {
+        observer.preFlush(this);
       }
     });
   }
@@ -623,16 +666,15 @@ public class RegionCoprocessorHost
    * See
    * {@link RegionObserver#preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)}
    */
-  public InternalScanner preFlushScannerOpen(HStore store, List<KeyValueScanner> scanners,
-      long readPoint) throws IOException {
-    return execOperationWithResult(null,
-      coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
-        @Override
-        public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-            throws IOException {
-          setResult(oserver.preFlushScannerOpen(ctx, store, scanners, getResult(), readPoint));
-        }
-      });
+  public InternalScanner preFlushScannerOpen(final HStore store,
+      final List<KeyValueScanner> scanners, final long readPoint) throws IOException {
+    return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter) {
+          @Override
+          public InternalScanner call(RegionObserver observer) throws IOException {
+            return observer.preFlushScannerOpen(this, store, scanners, getResult(), readPoint);
+          }
+        });
   }
 
   /**
@@ -640,11 +682,10 @@ public class RegionCoprocessorHost
    * @throws IOException
    */
   public void postFlush() throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.postFlush(ctx);
+      public void call(RegionObserver observer) throws IOException {
+        observer.postFlush(this);
       }
     });
   }
@@ -653,12 +694,11 @@ public class RegionCoprocessorHost
    * Invoked after a memstore flush
    * @throws IOException
    */
-  public void postFlush(HStore store, HStoreFile storeFile) throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+  public void postFlush(final HStore store, final HStoreFile storeFile) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.postFlush(ctx, store, storeFile);
+      public void call(RegionObserver observer) throws IOException {
+        observer.postFlush(this, store, storeFile);
       }
     });
   }
@@ -671,11 +711,10 @@ public class RegionCoprocessorHost
    */
   public boolean preGet(final Get get, final List<Cell> results)
       throws IOException {
-    return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.preGetOp(ctx, get, results);
+      public void call(RegionObserver observer) throws IOException {
+        observer.preGetOp(this, get, results);
       }
     });
   }
@@ -687,11 +726,10 @@ public class RegionCoprocessorHost
    */
   public void postGet(final Get get, final List<Cell> results)
       throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.postGetOp(ctx, get, results);
+      public void call(RegionObserver observer) throws IOException {
+        observer.postGetOp(this, get, results);
       }
     });
   }
@@ -703,14 +741,13 @@ public class RegionCoprocessorHost
    * @exception IOException Exception
    */
   public Boolean preExists(final Get get) throws IOException {
-    return execOperationWithResult(true, false,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.preExists(ctx, get, getResult()));
-      }
-    });
+    return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+          @Override
+          public Boolean call(RegionObserver observer) throws IOException {
+            return observer.preExists(this, get, getResult());
+          }
+        });
   }
 
   /**
@@ -721,14 +758,13 @@ public class RegionCoprocessorHost
    */
   public boolean postExists(final Get get, boolean exists)
       throws IOException {
-    return execOperationWithResult(exists,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.postExists(ctx, get, getResult()));
-      }
-    });
+    return execOperationWithResult(exists, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+          @Override
+          public Boolean call(RegionObserver observer) throws IOException {
+            return observer.postExists(this, get, getResult());
+          }
+        });
   }
 
   /**
@@ -740,11 +776,10 @@ public class RegionCoprocessorHost
    */
   public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
       throws IOException {
-    return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.prePut(ctx, put, edit, durability);
+      public void call(RegionObserver observer) throws IOException {
+        observer.prePut(this, put, edit, durability);
       }
     });
   }
@@ -761,11 +796,10 @@ public class RegionCoprocessorHost
    */
   public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
       final Cell kv, final byte[] byteNow, final Get get) throws IOException {
-    return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
+      public void call(RegionObserver observer) throws IOException {
+        observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get);
       }
     });
   }
@@ -778,11 +812,10 @@ public class RegionCoprocessorHost
    */
   public void postPut(final Put put, final WALEdit edit, final Durability durability)
       throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.postPut(ctx, put, edit, durability);
+      public void call(RegionObserver observer) throws IOException {
+        observer.postPut(this, put, edit, durability);
       }
     });
   }
@@ -796,11 +829,10 @@ public class RegionCoprocessorHost
    */
   public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
       throws IOException {
-    return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.preDelete(ctx, delete, edit, durability);
+      public void call(RegionObserver observer) throws IOException {
+        observer.preDelete(this, delete, edit, durability);
       }
     });
   }
@@ -813,11 +845,10 @@ public class RegionCoprocessorHost
    */
   public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
       throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.postDelete(ctx, delete, edit, durability);
+      public void call(RegionObserver observer) throws IOException {
+        observer.postDelete(this, delete, edit, durability);
       }
     });
   }
@@ -829,11 +860,10 @@ public class RegionCoprocessorHost
    */
   public boolean preBatchMutate(
       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-    return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.preBatchMutate(ctx, miniBatchOp);
+      public void call(RegionObserver observer) throws IOException {
+        observer.preBatchMutate(this, miniBatchOp);
       }
     });
   }
@@ -844,11 +874,10 @@ public class RegionCoprocessorHost
    */
   public void postBatchMutate(
       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.postBatchMutate(ctx, miniBatchOp);
+      public void call(RegionObserver observer) throws IOException {
+        observer.postBatchMutate(this, miniBatchOp);
       }
     });
   }
@@ -856,11 +885,10 @@ public class RegionCoprocessorHost
   public void postBatchMutateIndispensably(
       final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
       throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
+      public void call(RegionObserver observer) throws IOException {
+        observer.postBatchMutateIndispensably(this, miniBatchOp, success);
       }
     });
   }
@@ -880,15 +908,14 @@ public class RegionCoprocessorHost
       final byte [] qualifier, final CompareOperator op,
       final ByteArrayComparable comparator, final Put put)
       throws IOException {
-    return execOperationWithResult(true, false,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
-          op, comparator, put, getResult()));
-      }
-    });
+    return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+          @Override
+          public Boolean call(RegionObserver observer) throws IOException {
+            return observer.preCheckAndPut(this, row, family, qualifier,
+                op, comparator, put, getResult());
+          }
+        });
   }
 
   /**
@@ -902,18 +929,17 @@ public class RegionCoprocessorHost
    * be bypassed, or null otherwise
    * @throws IOException e
    */
-  public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
-                                            final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator,
-                                            final Put put) throws IOException {
-    return execOperationWithResult(true, false,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
-          op, comparator, put, getResult()));
-      }
-    });
+  public Boolean preCheckAndPutAfterRowLock(
+      final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op,
+      final ByteArrayComparable comparator, final Put put) throws IOException {
+    return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+          @Override
+          public Boolean call(RegionObserver observer) throws IOException {
+            return observer.preCheckAndPutAfterRowLock(this, row, family, qualifier,
+                op, comparator, put, getResult());
+          }
+        });
   }
 
   /**
@@ -929,15 +955,14 @@ public class RegionCoprocessorHost
       final byte [] qualifier, final CompareOperator op,
       final ByteArrayComparable comparator, final Put put,
       boolean result) throws IOException {
-    return execOperationWithResult(result,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
-          op, comparator, put, getResult()));
-      }
-    });
+    return execOperationWithResult(result, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+          @Override
+          public Boolean call(RegionObserver observer) throws IOException {
+            return observer.postCheckAndPut(this, row, family, qualifier,
+                op, comparator, put, getResult());
+          }
+        });
   }
 
   /**
@@ -955,15 +980,14 @@ public class RegionCoprocessorHost
       final byte [] qualifier, final CompareOperator op,
       final ByteArrayComparable comparator, final Delete delete)
       throws IOException {
-    return execOperationWithResult(true, false,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.preCheckAndDelete(ctx, row, family,
-            qualifier, op, comparator, delete, getResult()));
-      }
-    });
+    return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+          @Override
+          public Boolean call(RegionObserver observer) throws IOException {
+            return observer.preCheckAndDelete(this, row, family,
+                qualifier, op, comparator, delete, getResult());
+          }
+        });
   }
 
   /**
@@ -978,17 +1002,16 @@ public class RegionCoprocessorHost
    * @throws IOException e
    */
   public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
-                                               final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator,
-                                               final Delete delete) throws IOException {
-    return execOperationWithResult(true, false,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
-              family, qualifier, op, comparator, delete, getResult()));
-      }
-    });
+      final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator,
+      final Delete delete) throws IOException {
+    return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+          @Override
+          public Boolean call(RegionObserver observer) throws IOException {
+            return observer.preCheckAndDeleteAfterRowLock(this, row,
+                family, qualifier, op, comparator, delete, getResult());
+          }
+        });
   }
 
   /**
@@ -1004,15 +1027,14 @@ public class RegionCoprocessorHost
       final byte [] qualifier, final CompareOperator op,
       final ByteArrayComparable comparator, final Delete delete,
       boolean result) throws IOException {
-    return execOperationWithResult(result,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.postCheckAndDelete(ctx, row, family,
-            qualifier, op, comparator, delete, getResult()));
-      }
-    });
+    return execOperationWithResult(result, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+          @Override
+          public Boolean call(RegionObserver observer) throws IOException {
+            return observer.postCheckAndDelete(this, row, family,
+                qualifier, op, comparator, delete, getResult());
+          }
+        });
   }
 
   /**
@@ -1022,14 +1044,13 @@ public class RegionCoprocessorHost
    * @throws IOException if an error occurred on the coprocessor
    */
   public Result preAppend(final Append append) throws IOException {
-    return execOperationWithResult(true, null,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.preAppend(ctx, append));
-      }
-    });
+    return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter) {
+          @Override
+          public Result call(RegionObserver observer) throws IOException {
+            return observer.preAppend(this, append);
+          }
+        });
   }
 
   /**
@@ -1039,14 +1060,13 @@ public class RegionCoprocessorHost
    * @throws IOException if an error occurred on the coprocessor
    */
   public Result preAppendAfterRowLock(final Append append) throws IOException {
-    return execOperationWithResult(true, null,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.preAppendAfterRowLock(ctx, append));
-      }
-    });
+    return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter) {
+          @Override
+          public Result call(RegionObserver observer) throws IOException {
+            return observer.preAppendAfterRowLock(this, append);
+          }
+        });
   }
 
   /**
@@ -1056,14 +1076,13 @@ public class RegionCoprocessorHost
    * @throws IOException if an error occurred on the coprocessor
    */
   public Result preIncrement(final Increment increment) throws IOException {
-    return execOperationWithResult(true, null,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.preIncrement(ctx, increment));
-      }
-    });
+    return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter) {
+          @Override
+          public Result call(RegionObserver observer) throws IOException {
+            return observer.preIncrement(this, increment);
+          }
+        });
   }
 
   /**
@@ -1073,14 +1092,13 @@ public class RegionCoprocessorHost
    * @throws IOException if an error occurred on the coprocessor
    */
   public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
-    return execOperationWithResult(true, null,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.preIncrementAfterRowLock(ctx, increment));
-      }
-    });
+    return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter) {
+          @Override
+          public Result call(RegionObserver observer) throws IOException {
+            return observer.preIncrementAfterRowLock(this, increment);
+          }
+        });
   }
 
   /**
@@ -1089,14 +1107,13 @@ public class RegionCoprocessorHost
    * @throws IOException if an error occurred on the coprocessor
    */
   public Result postAppend(final Append append, final Result result) throws IOException {
-    return execOperationWithResult(result,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.postAppend(ctx, append, result));
-      }
-    });
+    return execOperationWithResult(result, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter) {
+          @Override
+          public Result call(RegionObserver observer) throws IOException {
+            return observer.postAppend(this, append, result);
+          }
+        });
   }
 
   /**
@@ -1105,14 +1122,13 @@ public class RegionCoprocessorHost
    * @throws IOException if an error occurred on the coprocessor
    */
   public Result postIncrement(final Increment increment, Result result) throws IOException {
-    return execOperationWithResult(result,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.postIncrement(ctx, increment, getResult()));
-      }
-    });
+    return execOperationWithResult(result, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter) {
+          @Override
+          public Result call(RegionObserver observer) throws IOException {
+            return observer.postIncrement(this, increment, getResult());
+          }
+        });
   }
 
   /**
@@ -1122,30 +1138,28 @@ public class RegionCoprocessorHost
    * @exception IOException Exception
    */
   public RegionScanner preScannerOpen(final Scan scan) throws IOException {
-    return execOperationWithResult(true, null,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.preScannerOpen(ctx, scan, getResult()));
-      }
-    });
+    return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter) {
+          @Override
+          public RegionScanner call(RegionObserver observer) throws IOException {
+            return observer.preScannerOpen(this, scan, getResult());
+          }
+        });
   }
 
   /**
    * See
    * {@link RegionObserver#preStoreScannerOpen(ObserverContext, Store, Scan, NavigableSet, KeyValueScanner, long)}
    */
-  public KeyValueScanner preStoreScannerOpen(HStore store, Scan scan,
-      NavigableSet<byte[]> targetCols, long readPt) throws IOException {
-    return execOperationWithResult(null,
-      coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
-        @Override
-        public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-            throws IOException {
-          setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult(), readPt));
-        }
-      });
+  public KeyValueScanner preStoreScannerOpen(final HStore store, final Scan scan,
+      final NavigableSet<byte[]> targetCols, final long readPt) throws IOException {
+    return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, KeyValueScanner>(regionObserverGetter) {
+          @Override
+          public KeyValueScanner call(RegionObserver observer) throws IOException {
+            return observer.preStoreScannerOpen(this, store, scan, targetCols, getResult(), readPt);
+          }
+        });
   }
 
   /**
@@ -1155,14 +1169,13 @@ public class RegionCoprocessorHost
    * @exception IOException Exception
    */
   public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
-    return execOperationWithResult(s,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.postScannerOpen(ctx, scan, getResult()));
-      }
-    });
+    return execOperationWithResult(s, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter) {
+          @Override
+          public RegionScanner call(RegionObserver observer) throws IOException {
+            return observer.postScannerOpen(this, scan, getResult());
+          }
+        });
   }
 
   /**
@@ -1175,14 +1188,13 @@ public class RegionCoprocessorHost
    */
   public Boolean preScannerNext(final InternalScanner s,
       final List<Result> results, final int limit) throws IOException {
-    return execOperationWithResult(true, false,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
-      }
-    });
+    return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+          @Override
+          public Boolean call(RegionObserver observer) throws IOException {
+            return observer.preScannerNext(this, s, results, limit, getResult());
+          }
+        });
   }
 
   /**
@@ -1196,14 +1208,13 @@ public class RegionCoprocessorHost
   public boolean postScannerNext(final InternalScanner s,
       final List<Result> results, final int limit, boolean hasMore)
       throws IOException {
-    return execOperationWithResult(hasMore,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
-      }
-    });
+    return execOperationWithResult(hasMore, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+          @Override
+          public Boolean call(RegionObserver observer) throws IOException {
+            return observer.postScannerNext(this, s, results, limit, getResult());
+          }
+        });
   }
 
   /**
@@ -1218,14 +1229,13 @@ public class RegionCoprocessorHost
       throws IOException {
     // short circuit for performance
     if (!hasCustomPostScannerFilterRow) return true;
-    return execOperationWithResult(true,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.postScannerFilterRow(ctx, s, curRowCell, getResult()));
-      }
-    });
+    return execOperationWithResult(true, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+          @Override
+          public Boolean call(RegionObserver observer) throws IOException {
+            return observer.postScannerFilterRow(this, s, curRowCell, getResult());
+          }
+        });
   }
 
   /**
@@ -1234,11 +1244,10 @@ public class RegionCoprocessorHost
    * @exception IOException Exception
    */
   public boolean preScannerClose(final InternalScanner s) throws IOException {
-    return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.preScannerClose(ctx, s);
+      public void call(RegionObserver observer) throws IOException {
+        observer.preScannerClose(this, s);
       }
     });
   }
@@ -1247,11 +1256,10 @@ public class RegionCoprocessorHost
    * @exception IOException Exception
    */
   public void postScannerClose(final InternalScanner s) throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.postScannerClose(ctx, s);
+      public void call(RegionObserver observer) throws IOException {
+        observer.postScannerClose(this, s);
       }
     });
   }
@@ -1262,11 +1270,10 @@ public class RegionCoprocessorHost
    * @throws IOException Exception
    */
   public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-        throws IOException {
-        oserver.preReplayWALs(ctx, info, edits);
+      public void call(RegionObserver observer) throws IOException {
+        observer.preReplayWALs(this, info, edits);
       }
     });
   }
@@ -1277,11 +1284,10 @@ public class RegionCoprocessorHost
    * @throws IOException Exception
    */
   public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-        throws IOException {
-        oserver.postReplayWALs(ctx, info, edits);
+      public void call(RegionObserver observer) throws IOException {
+        observer.postReplayWALs(this, info, edits);
       }
     });
   }
@@ -1295,11 +1301,10 @@ public class RegionCoprocessorHost
    */
   public boolean preWALRestore(final RegionInfo info, final WALKey logKey,
       final WALEdit logEdit) throws IOException {
-    return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.preWALRestore(ctx, info, logKey, logEdit);
+      public void call(RegionObserver observer) throws IOException {
+        observer.preWALRestore(this, info, logKey, logEdit);
       }
     });
   }
@@ -1312,11 +1317,10 @@ public class RegionCoprocessorHost
    */
   public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
       throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.postWALRestore(ctx, info, logKey, logEdit);
+      public void call(RegionObserver observer) throws IOException {
+        observer.postWALRestore(this, info, logKey, logEdit);
       }
     });
   }
@@ -1327,31 +1331,28 @@ public class RegionCoprocessorHost
    * @throws IOException
    */
   public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
-    return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.preBulkLoadHFile(ctx, familyPaths);
+      public void call(RegionObserver observer) throws IOException {
+        observer.preBulkLoadHFile(this, familyPaths);
       }
     });
   }
 
   public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
       throws IOException {
-    return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.preCommitStoreFile(ctx, family, pairs);
+      public void call(RegionObserver observer) throws IOException {
+        observer.preCommitStoreFile(this, family, pairs);
       }
     });
   }
   public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.postCommitStoreFile(ctx, family, srcPath, dstPath);
+      public void call(RegionObserver observer) throws IOException {
+        observer.postCommitStoreFile(this, family, srcPath, dstPath);
       }
     });
   }
@@ -1365,32 +1366,29 @@ public class RegionCoprocessorHost
    */
   public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
       Map<byte[], List<Path>> map, boolean hasLoaded) throws IOException {
-    return execOperationWithResult(hasLoaded,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.postBulkLoadHFile(ctx, familyPaths, map, getResult()));
-      }
-    });
+    return execOperationWithResult(hasLoaded, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+          @Override
+          public Boolean call(RegionObserver observer) throws IOException {
+            return observer.postBulkLoadHFile(this, familyPaths, map, getResult());
+          }
+        });
   }
 
   public void postStartRegionOperation(final Operation op) throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.postStartRegionOperation(ctx, op);
+      public void call(RegionObserver observer) throws IOException {
+        observer.postStartRegionOperation(this, op);
       }
     });
   }
 
   public void postCloseRegionOperation(final Operation op) throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
       @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.postCloseRegionOperation(ctx, op);
+      public void call(RegionObserver observer) throws IOException {
+        observer.postCloseRegionOperation(this, op);
       }
     });
   }
@@ -1409,14 +1407,14 @@ public class RegionCoprocessorHost
   public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
       final Reference r) throws IOException {
-    return execOperationWithResult(null,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFileReader>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
-      }
-    });
+    return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter) {
+          @Override
+          public StoreFileReader call(RegionObserver observer) throws IOException {
+            return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
+                getResult());
+          }
+        });
   }
 
   /**
@@ -1433,192 +1431,77 @@ public class RegionCoprocessorHost
   public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
       final Reference r, final StoreFileReader reader) throws IOException {
-    return execOperationWithResult(reader,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFileReader>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
-      }
-    });
+    return execOperationWithResult(reader, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter) {
+          @Override
+          public StoreFileReader call(RegionObserver observer) throws IOException {
+            return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
+                getResult());
+          }
+        });
   }
 
   public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
       final Cell oldCell, Cell newCell) throws IOException {
-    return execOperationWithResult(newCell,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
-      }
-    });
+    return execOperationWithResult(newCell, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, Cell>(regionObserverGetter) {
+          @Override
+          public Cell call(RegionObserver observer) throws IOException {
+            return observer.postMutationBeforeWAL(this, opType, mutation, oldCell, getResult());
+          }
+        });
   }
 
   public Message preEndpointInvocation(final Service service, final String methodName,
       Message request) throws IOException {
-    return execOperationWithResult(request,
-        coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
-      @Override
-      public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
-      }
-    });
+    return execOperationWithResult(request, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<EndpointObserver, Message>(endpointObserverGetter) {
+          @Override
+          public Message call(EndpointObserver observer) throws IOException {
+            return observer.preEndpointInvocation(this, service, methodName, getResult());
+          }
+        });
   }
 
   public void postEndpointInvocation(final Service service, final String methodName,
       final Message request, final Message.Builder responseBuilder) throws IOException {
-    execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
-      @Override
-      public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
-      }
-    });
+    execOperation(coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) {
+          @Override
+          public void call(EndpointObserver observer) throws IOException {
+            observer.postEndpointInvocation(this, service, methodName, request, responseBuilder);
+          }
+        });
   }
 
   public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
-    return execOperationWithResult(tracker,
-        coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
-      @Override
-      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
-          throws IOException {
-        setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
-      }
-    });
-  }
-
-  private static abstract class CoprocessorOperation
-      extends ObserverContext<RegionCoprocessorEnvironment> {
-    public CoprocessorOperation() {
-      this(RpcServer.getRequestUser());
-    }
-
-    public CoprocessorOperation(User user) {
-      super(user);
-    }
-
-    public abstract void call(Coprocessor observer,
-        ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
-    public abstract boolean hasCall(Coprocessor observer);
-    public void postEnvCall(RegionEnvironment env) { }
-  }
-
-  private static abstract class RegionOperation extends CoprocessorOperation {
-    public RegionOperation() {
-    }
-
-    public RegionOperation(User user) {
-      super(user);
-    }
-
-    public abstract void call(RegionObserver observer,
-        ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
-
-    public boolean hasCall(Coprocessor observer) {
-      return observer instanceof RegionObserver;
-    }
-
-    public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
-        throws IOException {
-      call((RegionObserver)observer, ctx);
-    }
-  }
-
-  private static abstract class RegionOperationWithResult<T> extends RegionOperation {
-    public RegionOperationWithResult() {
-    }
-
-    public RegionOperationWithResult(User user) {
-      super (user);
-    }
-
-    private T result = null;
-    public void setResult(final T result) { this.result = result; }
-    public T getResult() { return this.result; }
-  }
-
-  private static abstract class EndpointOperation extends CoprocessorOperation {
-    public abstract void call(EndpointObserver observer,
-        ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
-
-    public boolean hasCall(Coprocessor observer) {
-      return observer instanceof EndpointObserver;
-    }
-
-    public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
-        throws IOException {
-      call((EndpointObserver)observer, ctx);
-    }
-  }
-
-  private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
-    private T result = null;
-    public void setResult(final T result) { this.result = result; }
-    public T getResult() { return this.result; }
-  }
-
-  private boolean execOperation(final CoprocessorOperation ctx)
-      throws IOException {
-    return execOperation(true, ctx);
-  }
-
-  private <T> T execOperationWithResult(final T defaultValue,
-      final RegionOperationWithResult<T> ctx) throws IOException {
-    if (ctx == null) return defaultValue;
-    ctx.setResult(defaultValue);
-    execOperation(true, ctx);
-    return ctx.getResult();
-  }
-
-  private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
-      final RegionOperationWithResult<T> ctx) throws IOException {
-    boolean bypass = false;
-    T result = defaultValue;
-    if (ctx != null) {
-      ctx.setResult(defaultValue);
-      bypass = execOperation(true, ctx);
-      result = ctx.getResult();
-    }
-    return bypass == ifBypass ? result : null;
+    return execOperationWithResult(tracker, coprocEnvironments.isEmpty() ? null :
+        new ObserverOperationWithResult<RegionObserver, DeleteTracker>(regionObserverGetter) {
+          @Override
+          public DeleteTracker call(RegionObserver observer) throws IOException {
+            return observer.postInstantiateDeleteTracker(this, getResult());
+          }
+        });
   }
 
-  private <T> T execOperationWithResult(final T defaultValue,
-      final EndpointOperationWithResult<T> ctx) throws IOException {
-    if (ctx == null) return defaultValue;
-    ctx.setResult(defaultValue);
-    execOperation(true, ctx);
-    return ctx.getResult();
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // BulkLoadObserver hooks
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  public void prePrepareBulkLoad(User user) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null :
+        new BulkLoadObserverOperation(user) {
+          @Override protected void call(BulkLoadObserver observer) throws IOException {
+            observer.prePrepareBulkLoad(this);
+          }
+        });
   }
 
-  private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
-      throws IOException {
-    boolean bypass = false;
-    List<RegionEnvironment> envs = coprocessors.get();
-    for (int i = 0; i < envs.size(); i++) {
-      RegionEnvironment env = envs.get(i);
-      Coprocessor observer = env.getInstance();
-      if (ctx.hasCall(observer)) {
-        ctx.prepare(env);
-        Thread currentThread = Thread.currentThread();
-        ClassLoader cl = currentThread.getContextClassLoader();
-        try {
-          currentThread.setContextClassLoader(env.getClassLoader());
-          ctx.call(observer, ctx);
-        } catch (Throwable e) {
-          handleCoprocessorThrowable(env, e);
-        } finally {
-          currentThread.setContextClassLoader(cl);
-        }
-        bypass |= ctx.shouldBypass();
-        if (earlyExit && ctx.shouldComplete()) {
-          break;
-        }
-      }
-
-      ctx.postEnvCall(env);
-    }
-    return bypass;
+  public void preCleanupBulkLoad(User user) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null :
+        new BulkLoadObserverOperation(user) {
+          @Override protected void call(BulkLoadObserver observer) throws IOException {
+            observer.preCleanupBulkLoad(this);
+          }
+        });
   }
 }