You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ga...@apache.org on 2010/12/21 21:39:37 UTC

svn commit: r1051639 [2/3] - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/coprocessor/ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/resources/ src/test/java/org/apache/hadoop/hb...

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java?rev=1051639&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java Tue Dec 21 20:39:26 2010
@@ -0,0 +1,553 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.coprocessor.*;
+
+import java.io.IOException;
+
+/**
+ * Provides the coprocessor framework and environment for master oriented
+ * operations.  {@link HMaster} interacts with the loaded coprocessors
+ * through this class.
+ */
+public class MasterCoprocessorHost
+    extends CoprocessorHost<MasterCoprocessorHost.MasterEnvironment> {
+
+  /**
+   * Coprocessor environment extension providing access to master related
+   * services.
+   */
+  static class MasterEnvironment extends CoprocessorHost.Environment
+      implements MasterCoprocessorEnvironment {
+    private MasterServices masterServices; // assigned only in the constructor
+
+    public MasterEnvironment(Class<?> implClass, Coprocessor impl, // implClass is not used here
+        Coprocessor.Priority priority, MasterServices services) {
+      super(impl, priority);
+      this.masterServices = services;
+    }
+
+    public MasterServices getMasterServices() { // the services given at construction
+      return masterServices;
+    }
+  }
+
+  private MasterServices masterServices;
+
+  MasterCoprocessorHost(final MasterServices services, final Configuration conf) {
+    this.masterServices = services;
+    // NOTE(review): services is stored first; createEnvironment reads it, presumably during the load below
+    loadSystemCoprocessors(conf, MASTER_COPROCESSOR_CONF_KEY);
+  }
+
+  @Override
+  public MasterEnvironment createEnvironment(
+      Class<?> implClass, Coprocessor instance, Coprocessor.Priority priority) {
+    return new MasterEnvironment(implClass, instance, priority, this.masterServices);
+  }
+
+  /* Implementation of hooks for invoking MasterObservers */
+  void preCreateTable(HTableDescriptor desc, byte[][] splitKeys)
+      throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward preCreateTable to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).preCreateTable(env, desc, splitKeys);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void postCreateTable(HRegionInfo[] regions, boolean sync) throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward postCreateTable to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).postCreateTable(env, regions, sync);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void preDeleteTable(byte[] tableName) throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward preDeleteTable to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).preDeleteTable(env, tableName);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void postDeleteTable(byte[] tableName) throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward postDeleteTable to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).postDeleteTable(env, tableName);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void preModifyTable(final byte[] tableName, HTableDescriptor htd)
+      throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward preModifyTable to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).preModifyTable(env, tableName, htd);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void postModifyTable(final byte[] tableName, HTableDescriptor htd)
+      throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward postModifyTable to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).postModifyTable(env, tableName, htd);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void preAddColumn(byte [] tableName, HColumnDescriptor column)
+      throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward preAddColumn to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).preAddColumn(env, tableName, column);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void postAddColumn(byte [] tableName, HColumnDescriptor column)
+      throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward postAddColumn to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).postAddColumn(env, tableName, column);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void preModifyColumn(byte [] tableName, HColumnDescriptor descriptor)
+      throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward preModifyColumn to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).preModifyColumn(
+              env, tableName, descriptor);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void postModifyColumn(byte [] tableName, HColumnDescriptor descriptor)
+      throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward postModifyColumn to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).postModifyColumn(
+              env, tableName, descriptor);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void preDeleteColumn(final byte [] tableName, final byte [] c)
+      throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward preDeleteColumn to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).preDeleteColumn(env, tableName, c);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void postDeleteColumn(final byte [] tableName, final byte [] c)
+      throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward postDeleteColumn to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).postDeleteColumn(env, tableName, c);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void preEnableTable(final byte [] tableName) throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward preEnableTable to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).preEnableTable(env, tableName);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void postEnableTable(final byte [] tableName) throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward postEnableTable to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).postEnableTable(env, tableName);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void preDisableTable(final byte [] tableName) throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward preDisableTable to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).preDisableTable(env, tableName);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void postDisableTable(final byte [] tableName) throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward postDisableTable to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).postDisableTable(env, tableName);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void preMove(final HRegionInfo region, final HServerInfo srcServer, final HServerInfo destServer)
+      throws UnknownRegionException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward preMove to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).preMove(
+              env, region, srcServer, destServer);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void postMove(final HRegionInfo region, final HServerInfo srcServer, final HServerInfo destServer)
+      throws UnknownRegionException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward postMove to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).postMove(
+              env, region, srcServer, destServer);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  boolean preAssign(final byte [] regionName, final boolean force)
+      throws IOException {
+    boolean bypass = false; // returned: true once any observer's environment reports shouldBypass()
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward preAssign to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).preAssign(env, regionName, force);
+          bypass |= env.shouldBypass(); // a single bypass request sticks
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+    return bypass;
+  }
+
+  void postAssign(final HRegionInfo regionInfo) throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward postAssign to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).postAssign(env, regionInfo);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  boolean preUnassign(final byte [] regionName, final boolean force)
+      throws IOException {
+    boolean bypass = false; // returned: true once any observer's environment reports shouldBypass()
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward preUnassign to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).preUnassign(
+              env, regionName, force);
+          bypass |= env.shouldBypass(); // a single bypass request sticks
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+    return bypass;
+  }
+
+  void postUnassign(final HRegionInfo regionInfo, final boolean force)
+      throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward postUnassign to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).postUnassign(
+              env, regionInfo, force);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  boolean preBalance() throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      boolean bypass = false; // returned: true once any observer's environment reports shouldBypass()
+      for (MasterEnvironment env: coprocessors) { // forward preBalance to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).preBalance(env);
+          bypass |= env.shouldBypass(); // a single bypass request sticks
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+      return bypass;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void postBalance() throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward postBalance to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).postBalance(env);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  boolean preBalanceSwitch(final boolean b) throws IOException {
+    boolean balance = b; // starts as the requested value; the final value is returned
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof MasterObserver) {
+          balance = ((MasterObserver)env.getInstance()).preBalanceSwitch(
+              env, balance); // each observer receives the previous observer's result
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+    return balance;
+  }
+
+  void postBalanceSwitch(final boolean oldValue, final boolean newValue)
+      throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward postBalanceSwitch to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).postBalanceSwitch(
+              env, oldValue, newValue);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void preShutdown() throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward preShutdown to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).preShutdown(env);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  void preStopMaster() throws IOException {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (MasterEnvironment env: coprocessors) { // forward preStopMaster to each MasterObserver
+        if (env.getInstance() instanceof MasterObserver) {
+          ((MasterObserver)env.getInstance()).preStopMaster(env);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java?rev=1051639&r1=1051638&r2=1051639&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java Tue Dec 21 20:39:26 2010
@@ -23,7 +23,9 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
 /**
  * Services Master supplies
@@ -56,4 +58,15 @@ public interface MasterServices {
    * @throws TableNotFoundException 
    */
   public void checkTableModifiable(final byte [] tableName) throws IOException;
+
+  /**
+   * @return Implementation of {@link org.apache.hadoop.hbase.catalog.CatalogTracker} or null.
+   */
+  public CatalogTracker getCatalogTracker();
+
+  /**
+   * @return Implementation of {@link ZooKeeperWatcher} or null.
+   */
+  public ZooKeeperWatcher getZooKeeperWatcher();
+
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1051639&r1=1051638&r2=1051639&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Dec 21 20:39:26 2010
@@ -248,7 +248,7 @@ public class HRegion implements HeapSize
       new ReadWriteConsistencyControl();
 
   // Coprocessor host
-  private CoprocessorHost coprocessorHost;
+  private RegionCoprocessorHost coprocessorHost;
 
   /**
    * Name of the region info file that resides just under the region directory.
@@ -319,7 +319,7 @@ public class HRegion implements HeapSize
     // don't initialize coprocessors if not running within a regionserver
     // TODO: revisit if coprocessors should load in other cases
     if (rsServices != null) {
-      this.coprocessorHost = new CoprocessorHost(this, rsServices, conf);
+      this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
     }
     if (LOG.isDebugEnabled()) {
       // Write out region name as string and its encoded name.
@@ -3557,12 +3557,12 @@ public class HRegion implements HeapSize
   }
 
   /** @return the coprocessor host */
-  public CoprocessorHost getCoprocessorHost() {
+  public RegionCoprocessorHost getCoprocessorHost() {
     return coprocessorHost;
   }
 
   /** @param coprocessorHost the new coprocessor host */
-  public void setCoprocessorHost(final CoprocessorHost coprocessorHost) {
+  public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
     this.coprocessorHost = coprocessorHost;
   }
 

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1051639&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Tue Dec 21 20:39:26 2010
@@ -0,0 +1,1007 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.coprocessor.*;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Implements the coprocessor environment and runtime support for coprocessors
+ * loaded within a {@link HRegion}.
+ */
+public class RegionCoprocessorHost
+    extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
+
+  private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
+
+  /**
+   * Encapsulation of the environment of each coprocessor
+   */
+  static class RegionEnvironment extends CoprocessorHost.Environment
+      implements RegionCoprocessorEnvironment {
+
+    private HRegion region; // assigned only in the constructor
+    private RegionServerServices rsServices; // assigned only in the constructor
+
+    /**
+     * Binds a coprocessor instance to a region and the region server services.
+     * @param impl the coprocessor instance
+     * @param priority chaining priority
+     */
+    public RegionEnvironment(final Coprocessor impl,
+        Coprocessor.Priority priority, final HRegion region, // region: returned by getRegion()
+        final RegionServerServices services) { // services: returned by getRegionServerServices()
+      super(impl, priority);
+      this.region = region;
+      this.rsServices = services;
+    }
+
+    /** @return the region */
+    @Override
+    public HRegion getRegion() {
+      return region;
+    }
+
+    /** @return reference to the region server services */
+    @Override
+    public RegionServerServices getRegionServerServices() {
+      return rsServices;
+    }
+
+    public void shutdown() { // public method that only delegates to the superclass shutdown()
+      super.shutdown();
+    }
+  }
+
+  static final Pattern attrSpecMatch = Pattern.compile("(.+):(.+):(.+)"); // 1=path 2=class 3=priority
+
+  /** The region server services */
+  RegionServerServices rsServices;
+  /** The region */
+  HRegion region;
+
+  /**
+   * Constructor; loads the system coprocessors, then the table coprocessors.
+   * @param region the region
+   * @param rsServices interface to available region server functionality
+   * @param conf the configuration
+   */
+  public RegionCoprocessorHost(final HRegion region,
+      final RegionServerServices rsServices, final Configuration conf) {
+    this.rsServices = rsServices;
+    this.region = region;
+    this.pathPrefix = this.region.getRegionNameAsString().replace(',', '_'); // commas become '_'
+
+    // load the coprocessors configured under REGION_COPROCESSOR_CONF_KEY
+    loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
+
+    // load the coprocessors named in the table descriptor's COPROCESSOR* attributes
+    loadTableCoprocessors();
+  }
+
+  void loadTableCoprocessors () {
+    // Scan the table descriptor values for attributes whose key starts with
+    // "COPROCESSOR"; each holds a "path:class:priority" spec that is loaded.
+    for (Map.Entry<ImmutableBytesWritable,ImmutableBytesWritable> e:
+        region.getTableDesc().getValues().entrySet()) {
+      String key = Bytes.toString(e.getKey().get());
+      if (key.startsWith("COPROCESSOR")) {
+        try {
+          String spec = Bytes.toString(e.getValue().get());
+          Matcher matcher = attrSpecMatch.matcher(spec);
+          if (matcher.matches()) {
+            Path path = new Path(matcher.group(1));
+            String className = matcher.group(2);
+            Coprocessor.Priority priority =
+              Coprocessor.Priority.valueOf(matcher.group(3));
+            load(path, className, priority);
+            LOG.info("Load coprocessor " + className + " from HTD of " +
+                Bytes.toString(region.getTableDesc().getName()) + " successfully.");
+          } else {
+            LOG.warn("attribute '" + key + "' has invalid coprocessor spec");
+          }
+        } catch (IOException ex) {
+            LOG.warn(StringUtils.stringifyException(ex));
+        } catch (IllegalArgumentException ex) { // e.g. a priority name valueOf() rejects
+          // log and continue, like any other invalid spec, instead of failing the caller
+          LOG.warn("attribute '" + key + "' has invalid coprocessor spec", ex);
+        }
+      }
+    }
+  }
+
+  @Override
+  public RegionEnvironment createEnvironment(
+      Class<?> implClass, Coprocessor instance, Coprocessor.Priority priority) {
+    // An Endpoint is detected through the interfaces implClass implements
+    // directly: the first one assignable to CoprocessorProtocol is
+    // registered with the region.  Due to the current dynamic protocol
+    // design, Endpoints are registered and executed differently from
+    // observers (a visitor pattern invokes the registered Endpoint method).
+    for (Class iface : implClass.getInterfaces()) {
+      if (!CoprocessorProtocol.class.isAssignableFrom(iface)) {
+        continue;
+      }
+      region.registerProtocol(iface, (CoprocessorProtocol)instance);
+      break;
+    }
+    return new RegionEnvironment(instance, priority, region, rsServices);
+  }
+
+  /**
+   * Invoked before a region open: runs loadTableCoprocessors(), then each RegionObserver's preOpen.
+   */
+  public void preOpen() {
+    loadTableCoprocessors(); // NOTE(review): the constructor also calls this; confirm the repeat
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).preOpen(env);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Invoked after a region open; forwards postOpen to each RegionObserver.
+   */
+  public void postOpen() {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).postOpen(env);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Invoked before a region is closed; every RegionObserver is called, under the write lock.
+   * @param abortRequested true if the server is aborting
+   */
+  public void preClose(boolean abortRequested) {
+    coprocessorLock.writeLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (RegionEnvironment env: coprocessors) { // no shouldComplete() check: no early exit
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).preClose(env, abortRequested);
+        }
+      }
+    } finally {
+      coprocessorLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Invoked after a region is closed; calls every observer, then shuts each environment down.
+   * @param abortRequested true if the server is aborting
+   */
+  public void postClose(boolean abortRequested) {
+    coprocessorLock.writeLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (RegionEnvironment env: coprocessors) { // no shouldComplete() check: no early exit
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).postClose(env, abortRequested);
+        }
+        shutdown(env); // runs for every environment, observer or not
+      }
+    } finally {
+      coprocessorLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Invoked before a region is compacted; forwards preCompact to each RegionObserver.
+   * @param willSplit true if the compaction is about to trigger a split
+   */
+  public void preCompact(boolean willSplit) {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).preCompact(env, willSplit);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Invoked after a region is compacted; forwards postCompact to each RegionObserver.
+   * @param willSplit true if the compaction is about to trigger a split
+   */
+  public void postCompact(boolean willSplit) {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).postCompact(env, willSplit);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Invoked before a memstore flush; forwards preFlush to each RegionObserver.
+   */
+  public void preFlush() {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).preFlush(env);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Invoked after a memstore flush; forwards postFlush to each RegionObserver.
+   */
+  public void postFlush() {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).postFlush(env);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Invoked just before a split; forwards preSplit to each RegionObserver.
+   */
+  public void preSplit() {
+    coprocessorLock.readLock().lock(); // taken before try: finally must only unlock a held lock
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).preSplit(env);
+          if (env.shouldComplete()) { // remaining coprocessors are skipped
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Invoked just after a split. Calls {@code postSplit} on every loaded
+   * {@link RegionObserver} while holding the coprocessor read lock, and
+   * stops as soon as an environment reports {@code shouldComplete()}.
+   * @param l the new left-hand daughter region
+   * @param r the new right-hand daughter region
+   */
+  public void postSplit(HRegion l, HRegion r) {
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).postSplit(env, l, r);
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  // RegionObserver support
+
+  /**
+   * Calls {@code preGetClosestRowBefore} on every loaded
+   * {@link RegionObserver} while holding the coprocessor read lock. The
+   * bypass flag of each invoked environment is OR-ed into the return value;
+   * iteration stops once an environment reports {@code shouldComplete()}.
+   * @param row the row key
+   * @param family the family
+   * @param result the result set from the region
+   * @return true if default processing should be bypassed
+   * @exception IOException Exception
+   */
+  public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
+      final Result result) throws IOException {
+    boolean bypass = false;
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).preGetClosestRowBefore(env, row, family,
+            result);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      return bypass;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code postGetClosestRowBefore} on every loaded
+   * {@link RegionObserver} while holding the coprocessor read lock, and
+   * stops as soon as an environment reports {@code shouldComplete()}.
+   * @param row the row key
+   * @param family the family
+   * @param result the result set from the region
+   * @exception IOException Exception
+   */
+  public void postGetClosestRowBefore(final byte[] row, final byte[] family,
+      final Result result) throws IOException {
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).postGetClosestRowBefore(env, row, family,
+            result);
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code preGet} on every loaded {@link RegionObserver} while
+   * holding the coprocessor read lock. The bypass flag of each invoked
+   * environment is OR-ed into the return value; iteration stops once an
+   * environment reports {@code shouldComplete()}.
+   * @param get the Get request
+   * @param results the list of KeyValues handed to each observer
+   * @return true if default processing should be bypassed
+   * @exception IOException Exception
+   */
+  public boolean preGet(final Get get, final List<KeyValue> results)
+      throws IOException {
+    boolean bypass = false;
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).preGet(env, get, results);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      return bypass;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code postGet} on every loaded {@link RegionObserver} while
+   * holding the coprocessor read lock, and stops as soon as an environment
+   * reports {@code shouldComplete()}. Nothing is returned; observers are
+   * handed the same {@code results} list that the caller passed in.
+   * @param get the Get request
+   * @param results the result set
+   * @exception IOException Exception
+   */
+  public void postGet(final Get get, final List<KeyValue> results)
+      throws IOException {
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).postGet(env, get, results);
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code preExists} on every loaded {@link RegionObserver} while
+   * holding the coprocessor read lock. Each observer receives the value
+   * returned by the previous one (initially false). Iteration stops once an
+   * environment reports {@code shouldComplete()}.
+   * @param get the Get request
+   * @return true or false to return to client if bypassing normal operation,
+   * or null otherwise
+   * @exception IOException Exception
+   */
+  public Boolean preExists(final Get get) throws IOException {
+    boolean bypass = false;
+    boolean exists = false;
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          exists = ((RegionObserver)env.getInstance()).preExists(env, get, exists);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      // The chained value is only meaningful if some observer asked to bypass.
+      return bypass ? exists : null;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code postExists} on every loaded {@link RegionObserver} while
+   * holding the coprocessor read lock. Each observer receives the value
+   * returned by the previous one, starting with {@code exists}. Iteration
+   * stops once an environment reports {@code shouldComplete()}.
+   * @param get the Get request
+   * @param exists the result returned by the region server
+   * @return the result to return to the client
+   * @exception IOException Exception
+   */
+  public boolean postExists(final Get get, boolean exists)
+      throws IOException {
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          exists = ((RegionObserver)env.getInstance()).postExists(env, get, exists);
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      return exists;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code prePut} on every loaded {@link RegionObserver} while
+   * holding the coprocessor read lock. The bypass flag of each invoked
+   * environment is OR-ed into the return value; iteration stops once an
+   * environment reports {@code shouldComplete()}.
+   * @param familyMap map of family to edits for the given family.
+   * @param writeToWAL true if the change should be written to the WAL
+   * @return true if default processing should be bypassed
+   * @exception IOException Exception
+   */
+  public boolean prePut(final Map<byte[], List<KeyValue>> familyMap,
+      final boolean writeToWAL) throws IOException {
+    boolean bypass = false;
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).prePut(env, familyMap, writeToWAL);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      return bypass;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code postPut} on every loaded {@link RegionObserver} while
+   * holding the coprocessor read lock, and stops as soon as an environment
+   * reports {@code shouldComplete()}.
+   * @param familyMap map of family to edits for the given family.
+   * @param writeToWAL true if the change should be written to the WAL
+   * @exception IOException Exception
+   */
+  public void postPut(final Map<byte[], List<KeyValue>> familyMap,
+      final boolean writeToWAL) throws IOException {
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).postPut(env, familyMap, writeToWAL);
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code preDelete} on every loaded {@link RegionObserver} while
+   * holding the coprocessor read lock. The bypass flag of each invoked
+   * environment is OR-ed into the return value; iteration stops once an
+   * environment reports {@code shouldComplete()}.
+   * @param familyMap map of family to edits for the given family.
+   * @param writeToWAL true if the change should be written to the WAL
+   * @return true if default processing should be bypassed
+   * @exception IOException Exception
+   */
+  public boolean preDelete(final Map<byte[], List<KeyValue>> familyMap,
+      final boolean writeToWAL) throws IOException {
+    boolean bypass = false;
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).preDelete(env, familyMap, writeToWAL);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      return bypass;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code postDelete} on every loaded {@link RegionObserver} while
+   * holding the coprocessor read lock, and stops as soon as an environment
+   * reports {@code shouldComplete()}.
+   * @param familyMap map of family to edits for the given family.
+   * @param writeToWAL true if the change should be written to the WAL
+   * @exception IOException Exception
+   */
+  public void postDelete(final Map<byte[], List<KeyValue>> familyMap,
+      final boolean writeToWAL) throws IOException {
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).postDelete(env, familyMap, writeToWAL);
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code preCheckAndPut} on every loaded {@link RegionObserver}
+   * while holding the coprocessor read lock. Each observer receives the
+   * value returned by the previous one (initially false). Iteration stops
+   * once an environment reports {@code shouldComplete()}.
+   * @param row row to check
+   * @param family column family
+   * @param qualifier column qualifier
+   * @param value the expected value
+   * @param put data to put if check succeeds
+   * @return true or false to return to client if default processing should
+   * be bypassed, or null otherwise
+   * @throws IOException e
+   */
+  public Boolean preCheckAndPut(final byte [] row, final byte [] family,
+      final byte [] qualifier, final byte [] value, Put put)
+    throws IOException
+  {
+    boolean bypass = false;
+    boolean result = false;
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          result = ((RegionObserver)env.getInstance()).preCheckAndPut(env, row, family,
+            qualifier, value, put, result);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      // The chained value is only meaningful if some observer asked to bypass.
+      return bypass ? result : null;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code postCheckAndPut} on every loaded {@link RegionObserver}
+   * while holding the coprocessor read lock. Each observer receives the
+   * value returned by the previous one, starting with {@code result}.
+   * Iteration stops once an environment reports {@code shouldComplete()}.
+   * @param row row to check
+   * @param family column family
+   * @param qualifier column qualifier
+   * @param value the expected value
+   * @param put data to put if check succeeds
+   * @param result the incoming result that is handed to the first observer
+   * @return the value produced by the last observer invoked, or
+   * {@code result} unchanged if no observer was invoked
+   * @throws IOException e
+   */
+  public boolean postCheckAndPut(final byte [] row, final byte [] family,
+      final byte [] qualifier, final byte [] value, final Put put,
+      boolean result)
+    throws IOException
+  {
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          result = ((RegionObserver)env.getInstance()).postCheckAndPut(env, row,
+            family, qualifier, value, put, result);
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      return result;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code preCheckAndDelete} on every loaded {@link RegionObserver}
+   * while holding the coprocessor read lock. Each observer receives the
+   * value returned by the previous one (initially false). Iteration stops
+   * once an environment reports {@code shouldComplete()}.
+   * @param row row to check
+   * @param family column family
+   * @param qualifier column qualifier
+   * @param value the expected value
+   * @param delete delete to commit if check succeeds
+   * @return true or false to return to client if default processing should
+   * be bypassed, or null otherwise
+   * @throws IOException e
+   */
+  public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
+      final byte [] qualifier, final byte [] value, Delete delete)
+    throws IOException
+  {
+    boolean bypass = false;
+    boolean result = false;
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          result = ((RegionObserver)env.getInstance()).preCheckAndDelete(env, row,
+            family, qualifier, value, delete, result);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      // The chained value is only meaningful if some observer asked to bypass.
+      return bypass ? result : null;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code postCheckAndDelete} on every loaded {@link RegionObserver}
+   * while holding the coprocessor read lock. Each observer receives the
+   * value returned by the previous one, starting with {@code result}.
+   * Iteration stops once an environment reports {@code shouldComplete()}.
+   * @param row row to check
+   * @param family column family
+   * @param qualifier column qualifier
+   * @param value the expected value
+   * @param delete delete to commit if check succeeds
+   * @param result the incoming result that is handed to the first observer
+   * @return the value produced by the last observer invoked, or
+   * {@code result} unchanged if no observer was invoked
+   * @throws IOException e
+   */
+  public boolean postCheckAndDelete(final byte [] row, final byte [] family,
+      final byte [] qualifier, final byte [] value, final Delete delete,
+      boolean result)
+    throws IOException
+  {
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          result = ((RegionObserver)env.getInstance()).postCheckAndDelete(env, row,
+            family, qualifier, value, delete, result);
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      return result;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code preIncrementColumnValue} on every loaded
+   * {@link RegionObserver} while holding the coprocessor read lock. Each
+   * observer's return value replaces {@code amount} for the next observer.
+   * Iteration stops once an environment reports {@code shouldComplete()}.
+   * @param row row to check
+   * @param family column family
+   * @param qualifier column qualifier
+   * @param amount long amount to increment
+   * @param writeToWAL true if the change should be written to the WAL
+   * @return return value for client if default operation should be bypassed,
+   * or null otherwise
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  public Long preIncrementColumnValue(final byte [] row, final byte [] family,
+      final byte [] qualifier, long amount, final boolean writeToWAL)
+      throws IOException {
+    boolean bypass = false;
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          amount = ((RegionObserver)env.getInstance()).preIncrementColumnValue(env,
+              row, family, qualifier, amount, writeToWAL);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      // The chained value is only meaningful if some observer asked to bypass.
+      return bypass ? amount : null;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code postIncrementColumnValue} on every loaded
+   * {@link RegionObserver} while holding the coprocessor read lock. Each
+   * observer receives the value returned by the previous one, starting with
+   * {@code result}. Iteration stops once an environment reports
+   * {@code shouldComplete()}.
+   * @param row row to check
+   * @param family column family
+   * @param qualifier column qualifier
+   * @param amount long amount to increment
+   * @param writeToWAL true if the change should be written to the WAL
+   * @param result the result returned by incrementColumnValue
+   * @return the result to return to the client
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  public long postIncrementColumnValue(final byte [] row, final byte [] family,
+      final byte [] qualifier, final long amount, final boolean writeToWAL,
+      long result) throws IOException {
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          result = ((RegionObserver)env.getInstance()).postIncrementColumnValue(env,
+              row, family, qualifier, amount, writeToWAL, result);
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      return result;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code preIncrement} on every loaded {@link RegionObserver} while
+   * holding the coprocessor read lock. A freshly created {@link Result} is
+   * handed to every observer. Iteration stops once an environment reports
+   * {@code shouldComplete()}.
+   * @param increment increment object
+   * @return result to return to client if default operation should be
+   * bypassed, null otherwise
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  public Result preIncrement(Increment increment)
+      throws IOException {
+    boolean bypass = false;
+    Result result = new Result();
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).preIncrement(env, increment, result);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      // The shared Result is only returned if some observer asked to bypass.
+      return bypass ? result : null;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code postIncrement} on every loaded {@link RegionObserver}
+   * while holding the coprocessor read lock, and stops as soon as an
+   * environment reports {@code shouldComplete()}.
+   * @param increment increment object
+   * @param result the result of the increment, handed to each observer
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  public void postIncrement(final Increment increment, Result result)
+      throws IOException {
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).postIncrement(env, increment, result);
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code preScannerOpen} on every loaded {@link RegionObserver}
+   * while holding the coprocessor read lock. Each observer receives the
+   * scanner returned by the previous one (initially null). Iteration stops
+   * once an environment reports {@code shouldComplete()}.
+   * @param scan the Scan specification
+   * @return scanner to return to client if default operation should be
+   * bypassed, null otherwise
+   * @exception IOException Exception
+   */
+  public InternalScanner preScannerOpen(Scan scan) throws IOException {
+    boolean bypass = false;
+    InternalScanner s = null;
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          s = ((RegionObserver)env.getInstance()).preScannerOpen(env, scan, s);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      // The chained scanner is only returned if some observer asked to bypass.
+      return bypass ? s : null;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code postScannerOpen} on every loaded {@link RegionObserver}
+   * while holding the coprocessor read lock. Each observer receives the
+   * scanner returned by the previous one, starting with {@code s}.
+   * Iteration stops once an environment reports {@code shouldComplete()}.
+   * @param scan the Scan specification
+   * @param s the scanner
+   * @return the scanner instance to use
+   * @exception IOException Exception
+   */
+  public InternalScanner postScannerOpen(final Scan scan, InternalScanner s)
+      throws IOException {
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          s = ((RegionObserver)env.getInstance()).postScannerOpen(env, scan, s);
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      return s;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code preScannerNext} on every loaded {@link RegionObserver}
+   * while holding the coprocessor read lock. Each observer receives the
+   * value returned by the previous one (initially false). Iteration stops
+   * once an environment reports {@code shouldComplete()}.
+   * @param s the scanner
+   * @param results the result set returned by the region server
+   * @param limit the maximum number of results to return
+   * @return 'has next' indication to client if bypassing default behavior, or
+   * null otherwise
+   * @exception IOException Exception
+   */
+  public Boolean preScannerNext(final InternalScanner s,
+      final List<KeyValue> results, int limit) throws IOException {
+    boolean bypass = false;
+    boolean hasNext = false;
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          hasNext = ((RegionObserver)env.getInstance()).preScannerNext(env, s, results,
+            limit, hasNext);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      // The chained value is only meaningful if some observer asked to bypass.
+      return bypass ? hasNext : null;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code postScannerNext} on every loaded {@link RegionObserver}
+   * while holding the coprocessor read lock. Each observer receives the
+   * value returned by the previous one, starting with {@code hasMore}.
+   * Iteration stops once an environment reports {@code shouldComplete()}.
+   * @param s the scanner
+   * @param results the result set returned by the region server
+   * @param limit the maximum number of results to return
+   * @param hasMore the incoming 'has more' indication that is handed to the
+   * first observer
+   * @return 'has more' indication to give to client
+   * @exception IOException Exception
+   */
+  public boolean postScannerNext(final InternalScanner s,
+      final List<KeyValue> results, final int limit, boolean hasMore)
+      throws IOException {
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          hasMore = ((RegionObserver)env.getInstance()).postScannerNext(env, s,
+            results, limit, hasMore);
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      return hasMore;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code preScannerClose} on every loaded {@link RegionObserver}
+   * while holding the coprocessor read lock. The bypass flag of each invoked
+   * environment is OR-ed into the return value; iteration stops once an
+   * environment reports {@code shouldComplete()}.
+   * @param s the scanner
+   * @return true if default behavior should be bypassed, false otherwise
+   * @exception IOException Exception
+   */
+  public boolean preScannerClose(final InternalScanner s)
+      throws IOException {
+    boolean bypass = false;
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).preScannerClose(env, s);
+          bypass |= env.shouldBypass();
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+      return bypass;
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Calls {@code postScannerClose} on every loaded {@link RegionObserver}
+   * while holding the coprocessor read lock, and stops as soon as an
+   * environment reports {@code shouldComplete()}.
+   * @param s the scanner
+   * @exception IOException Exception
+   */
+  public void postScannerClose(final InternalScanner s)
+      throws IOException {
+    // Acquire before the try block so a failed lock() is never followed by
+    // an unlock() of a lock that was not taken.
+    coprocessorLock.readLock().lock();
+    try {
+      for (RegionEnvironment env: coprocessors) {
+        if (env.getInstance() instanceof RegionObserver) {
+          ((RegionObserver)env.getInstance()).postScannerClose(env, s);
+          if (env.shouldComplete()) {
+            break;
+          }
+        }
+      }
+    } finally {
+      coprocessorLock.readLock().unlock();
+    }
+  }
+}

Modified: hbase/trunk/src/main/resources/hbase-default.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/resources/hbase-default.xml?rev=1051639&r1=1051638&r2=1051639&view=diff
==============================================================================
--- hbase/trunk/src/main/resources/hbase-default.xml (original)
+++ hbase/trunk/src/main/resources/hbase-default.xml Tue Dec 21 20:39:26 2010
@@ -480,17 +480,28 @@
   </property>
 
   <property>
-    <name>hbase.coprocessor.default.classes</name>
+    <name>hbase.coprocessor.region.classes</name>
     <value></value>
     <description>A comma-separated list of Coprocessors that are loaded by
-    default. For any override coprocessor method, these classes will be called
-    in order. After implement your own
-    Coprocessor, just put it in HBase's classpath and add the fully
-    qualified class name here.
+    default on all tables. For any overridden coprocessor method, these classes
+    will be called in order. After implementing your own Coprocessor, just put
+    it in HBase's classpath and add the fully qualified class name here.
     A coprocessor can also be loaded on demand by setting HTableDescriptor.
     </description>
   </property>
 
+  <property>
+    <name>hbase.coprocessor.master.classes</name>
+    <value></value>
+    <description>A comma-separated list of
+    org.apache.hadoop.hbase.coprocessor.MasterObserver coprocessors that are
+    loaded by default on the active HMaster process. For any implemented
+    coprocessor methods, the listed classes will be called in order. After
+    implementing your own MasterObserver, just put it in HBase's classpath
+    and add the fully qualified class name here.
+    </description>
+  </property>
+
   <!--
   The following three properties are used together to create the list of
   host:peer_port:leader_port quorum servers for ZooKeeper.

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java?rev=1051639&r1=1051638&r2=1051639&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java Tue Dec 21 20:39:26 2010
@@ -43,7 +43,8 @@ implements ColumnAggregationProtocol {
     scan.addColumn(family, qualifier);
     int sumResult = 0;
 
-    InternalScanner scanner = getEnvironment().getRegion().getScanner(scan);
+    InternalScanner scanner = ((RegionCoprocessorEnvironment)getEnvironment())
+        .getRegion().getScanner(scan);
     try {
       List<KeyValue> curVals = new ArrayList<KeyValue>();
       boolean done = false;

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1051639&r1=1051638&r2=1051639&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java Tue Dec 21 20:39:26 2010
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -68,12 +69,12 @@ public class SimpleRegionObserver extend
   boolean hadPostIncrement = false;
 
   @Override
-  public void preOpen(CoprocessorEnvironment e) {
+  public void preOpen(RegionCoprocessorEnvironment e) {
     hadPreOpen = true;
   }
 
   @Override
-  public void postOpen(CoprocessorEnvironment e) {
+  public void postOpen(RegionCoprocessorEnvironment e) {
     hadPostOpen = true;
   }
 
@@ -82,12 +83,12 @@ public class SimpleRegionObserver extend
   }
 
   @Override
-  public void preClose(CoprocessorEnvironment e, boolean abortRequested) {
+  public void preClose(RegionCoprocessorEnvironment e, boolean abortRequested) {
     hadPreClose = true;
   }
 
   @Override
-  public void postClose(CoprocessorEnvironment e, boolean abortRequested) {
+  public void postClose(RegionCoprocessorEnvironment e, boolean abortRequested) {
     hadPostClose = true;
   }
 
@@ -96,12 +97,12 @@ public class SimpleRegionObserver extend
   }
 
   @Override
-  public void preFlush(CoprocessorEnvironment e) {
+  public void preFlush(RegionCoprocessorEnvironment e) {
     hadPreFlush = true;
   }
 
   @Override
-  public void postFlush(CoprocessorEnvironment e) {
+  public void postFlush(RegionCoprocessorEnvironment e) {
     hadPostFlush = true;
   }
 
@@ -110,12 +111,12 @@ public class SimpleRegionObserver extend
   }
 
   @Override
-  public void preSplit(CoprocessorEnvironment e) {
+  public void preSplit(RegionCoprocessorEnvironment e) {
     hadPreSplit = true;
   }
 
   @Override
-  public void postSplit(CoprocessorEnvironment e, HRegion l, HRegion r) {
+  public void postSplit(RegionCoprocessorEnvironment e, HRegion l, HRegion r) {
     hadPostSplit = true;
   }
 
@@ -124,12 +125,12 @@ public class SimpleRegionObserver extend
   }
 
   @Override
-  public void preCompact(CoprocessorEnvironment e, boolean willSplit) {
+  public void preCompact(RegionCoprocessorEnvironment e, boolean willSplit) {
     hadPreCompact = true;
   }
 
   @Override
-  public void postCompact(CoprocessorEnvironment e, boolean willSplit) {
+  public void postCompact(RegionCoprocessorEnvironment e, boolean willSplit) {
     hadPostCompact = true;
   }
 
@@ -138,7 +139,7 @@ public class SimpleRegionObserver extend
   }
 
   @Override
-  public void preGet(final CoprocessorEnvironment e, final Get get,
+  public void preGet(final RegionCoprocessorEnvironment e, final Get get,
       final List<KeyValue> results) throws IOException {
     assertNotNull(e);
     assertNotNull(e.getRegion());
@@ -151,7 +152,7 @@ public class SimpleRegionObserver extend
   }
 
   @Override
-  public void postGet(final CoprocessorEnvironment e, final Get get,
+  public void postGet(final RegionCoprocessorEnvironment e, final Get get,
       final List<KeyValue> results) {
     assertNotNull(e);
     assertNotNull(e.getRegion());
@@ -181,7 +182,7 @@ public class SimpleRegionObserver extend
   }
 
   @Override
-  public void prePut(final CoprocessorEnvironment e, final Map<byte[],
+  public void prePut(final RegionCoprocessorEnvironment e, final Map<byte[],
       List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
     assertNotNull(e);
     assertNotNull(e.getRegion());
@@ -208,7 +209,7 @@ public class SimpleRegionObserver extend
   }
 
   @Override
-  public void postPut(final CoprocessorEnvironment e, final Map<byte[],
+  public void postPut(final RegionCoprocessorEnvironment e, final Map<byte[],
       List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
     assertNotNull(e);
     assertNotNull(e.getRegion());
@@ -235,7 +236,7 @@ public class SimpleRegionObserver extend
   }
 
   @Override
-  public void preDelete(final CoprocessorEnvironment e, final Map<byte[],
+  public void preDelete(final RegionCoprocessorEnvironment e, final Map<byte[],
       List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
     assertNotNull(e);
     assertNotNull(e.getRegion());
@@ -247,7 +248,7 @@ public class SimpleRegionObserver extend
   }
 
   @Override
-  public void postDelete(final CoprocessorEnvironment e, final Map<byte[],
+  public void postDelete(final RegionCoprocessorEnvironment e, final Map<byte[],
       List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
     assertNotNull(e);
     assertNotNull(e.getRegion());
@@ -260,7 +261,7 @@ public class SimpleRegionObserver extend
   }
 
   @Override
-  public void preGetClosestRowBefore(final CoprocessorEnvironment e,
+  public void preGetClosestRowBefore(final RegionCoprocessorEnvironment e,
       final byte[] row, final byte[] family, final Result result)
       throws IOException {
     assertNotNull(e);
@@ -274,7 +275,7 @@ public class SimpleRegionObserver extend
   }
 
   @Override
-  public void postGetClosestRowBefore(final CoprocessorEnvironment e,
+  public void postGetClosestRowBefore(final RegionCoprocessorEnvironment e,
       final byte[] row, final byte[] family, final Result result)
       throws IOException {
     assertNotNull(e);
@@ -288,7 +289,7 @@ public class SimpleRegionObserver extend
   }
 
   @Override
-  public void preIncrement(final CoprocessorEnvironment e,
+  public void preIncrement(final RegionCoprocessorEnvironment e,
       final Increment increment, final Result result) throws IOException {
     if (Arrays.equals(e.getRegion().getTableDesc().getName(),
         TestRegionObserverInterface.TEST_TABLE_2)) {
@@ -297,7 +298,7 @@ public class SimpleRegionObserver extend
   }
 
   @Override
-  public void postIncrement(final CoprocessorEnvironment e,
+  public void postIncrement(final RegionCoprocessorEnvironment e,
       final Increment increment, final Result result) throws IOException {
     if (Arrays.equals(e.getRegion().getTableDesc().getName(),
         TestRegionObserverInterface.TEST_TABLE_2)) {

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java?rev=1051639&r1=1051638&r2=1051639&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java Tue Dec 21 20:39:26 2010
@@ -53,7 +53,7 @@ public class TestCoprocessorEndpoint {
   public static void setupBeforeClass() throws Exception {
     // set configure to indicate which cp should be loaded
     Configuration conf = util.getConfiguration();
-    conf.set("hbase.coprocessor.default.classes",
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
         "org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint");
 
     util.startMiniCluster(2);

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java?rev=1051639&r1=1051638&r2=1051639&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java Tue Dec 21 20:39:26 2010
@@ -30,10 +30,8 @@ import org.apache.hadoop.hbase.HBaseTest
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.coprocessor.Coprocessor;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
-import org.apache.hadoop.hbase.regionserver.CoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.SplitTransaction;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -75,43 +73,43 @@ public class TestCoprocessorInterface ex
     }
 
     @Override
-    public void preOpen(CoprocessorEnvironment e) {
+    public void preOpen(RegionCoprocessorEnvironment e) {
       preOpenCalled = true;
     }
     @Override
-    public void postOpen(CoprocessorEnvironment e) {
+    public void postOpen(RegionCoprocessorEnvironment e) {
       postOpenCalled = true;
     }
     @Override
-    public void preClose(CoprocessorEnvironment e, boolean abortRequested) {
+    public void preClose(RegionCoprocessorEnvironment e, boolean abortRequested) {
       preCloseCalled = true;
     }
     @Override
-    public void postClose(CoprocessorEnvironment e, boolean abortRequested) {
+    public void postClose(RegionCoprocessorEnvironment e, boolean abortRequested) {
       postCloseCalled = true;
     }
     @Override
-    public void preCompact(CoprocessorEnvironment e, boolean willSplit) {
+    public void preCompact(RegionCoprocessorEnvironment e, boolean willSplit) {
       preCompactCalled = true;
     }
     @Override
-    public void postCompact(CoprocessorEnvironment e, boolean willSplit) {
+    public void postCompact(RegionCoprocessorEnvironment e, boolean willSplit) {
       postCompactCalled = true;
     }
     @Override
-    public void preFlush(CoprocessorEnvironment e) {
+    public void preFlush(RegionCoprocessorEnvironment e) {
       preFlushCalled = true;
     }
     @Override
-    public void postFlush(CoprocessorEnvironment e) {
+    public void postFlush(RegionCoprocessorEnvironment e) {
       postFlushCalled = true;
     }
     @Override
-    public void preSplit(CoprocessorEnvironment e) {
+    public void preSplit(RegionCoprocessorEnvironment e) {
       preSplitCalled = true;
     }
     @Override
-    public void postSplit(CoprocessorEnvironment e, HRegion l, HRegion r) {
+    public void postSplit(RegionCoprocessorEnvironment e, HRegion l, HRegion r) {
       postSplitCalled = true;
     }
 
@@ -191,7 +189,7 @@ public class TestCoprocessorInterface ex
     // is secretly loaded at OpenRegionHandler. we don't really
     // start a region server here, so just manually create cphost
     // and set it to region.
-    CoprocessorHost host = new CoprocessorHost(r, null, conf);
+    RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
     r.setCoprocessorHost(host);
 
     host.load(implClass, Priority.USER);
@@ -218,7 +216,7 @@ public class TestCoprocessorInterface ex
     HRegion r = HRegion.createHRegion(info, path, conf);
 
     // this following piece is a hack.
-    CoprocessorHost host = new CoprocessorHost(r, null, conf);
+    RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
     r.setCoprocessorHost(host);
 
     host.load(implClass, Priority.USER);

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java?rev=1051639&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java Tue Dec 21 20:39:26 2010
@@ -0,0 +1,499 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests invocation of the {@link org.apache.hadoop.hbase.coprocessor.MasterObserver}
+ * interface hooks at all appropriate times during normal HMaster operations.
+ */
+public class TestMasterObserver {
+
+  public static class CPMasterObserver implements MasterObserver {
+    // One flag per hook; was*Called() is true only after both pre and post ran.
+    private boolean preCreateTableCalled;
+    private boolean postCreateTableCalled;
+    private boolean preDeleteTableCalled;
+    private boolean postDeleteTableCalled;
+    private boolean preModifyTableCalled;
+    private boolean postModifyTableCalled;
+    private boolean preAddColumnCalled;
+    private boolean postAddColumnCalled;
+    private boolean preModifyColumnCalled;
+    private boolean postModifyColumnCalled;
+    private boolean preDeleteColumnCalled;
+    private boolean postDeleteColumnCalled;
+    private boolean preEnableTableCalled;
+    private boolean postEnableTableCalled;
+    private boolean preDisableTableCalled;
+    private boolean postDisableTableCalled;
+    private boolean preMoveCalled;
+    private boolean postMoveCalled;
+    private boolean preAssignCalled;
+    private boolean postAssignCalled;
+    private boolean preUnassignCalled;
+    private boolean postUnassignCalled;
+    private boolean preBalanceCalled;
+    private boolean postBalanceCalled;
+    private boolean preBalanceSwitchCalled;
+    private boolean postBalanceSwitchCalled;
+    private boolean preShutdownCalled;
+    private boolean preStopMasterCalled;
+    private boolean startCalled;
+    private boolean stopCalled;
+
+    @Override
+    public void preCreateTable(MasterCoprocessorEnvironment env,
+        HTableDescriptor desc, byte[][] splitKeys) throws IOException {
+      preCreateTableCalled = true;
+    }
+
+    @Override
+    public void postCreateTable(MasterCoprocessorEnvironment env,
+        HRegionInfo[] regions, boolean sync) throws IOException {
+      postCreateTableCalled = true;
+    }
+
+    public boolean wasCreateTableCalled() {
+      return preCreateTableCalled && postCreateTableCalled;
+    }
+
+    @Override
+    public void preDeleteTable(MasterCoprocessorEnvironment env,
+        byte[] tableName) throws IOException {
+      preDeleteTableCalled = true;
+    }
+
+    @Override
+    public void postDeleteTable(MasterCoprocessorEnvironment env,
+        byte[] tableName) throws IOException {
+      postDeleteTableCalled = true;
+    }
+
+    public boolean wasDeleteTableCalled() {
+      return preDeleteTableCalled && postDeleteTableCalled;
+    }
+
+    @Override
+    public void preModifyTable(MasterCoprocessorEnvironment env,
+        byte[] tableName, HTableDescriptor htd) throws IOException {
+      preModifyTableCalled = true;
+    }
+
+    @Override
+    public void postModifyTable(MasterCoprocessorEnvironment env,
+        byte[] tableName, HTableDescriptor htd) throws IOException {
+      postModifyTableCalled = true;
+    }
+
+    public boolean wasModifyTableCalled() {
+      return preModifyTableCalled && postModifyTableCalled;
+    }
+
+    @Override
+    public void preAddColumn(MasterCoprocessorEnvironment env,
+        byte[] tableName, HColumnDescriptor column) throws IOException {
+      preAddColumnCalled = true;
+    }
+
+    @Override
+    public void postAddColumn(MasterCoprocessorEnvironment env,
+        byte[] tableName, HColumnDescriptor column) throws IOException {
+      postAddColumnCalled = true;
+    }
+
+    public boolean wasAddColumnCalled() {
+      return preAddColumnCalled && postAddColumnCalled;
+    }
+
+    @Override
+    public void preModifyColumn(MasterCoprocessorEnvironment env,
+        byte[] tableName, HColumnDescriptor descriptor) throws IOException {
+      preModifyColumnCalled = true;
+    }
+
+    @Override
+    public void postModifyColumn(MasterCoprocessorEnvironment env,
+        byte[] tableName, HColumnDescriptor descriptor) throws IOException {
+      postModifyColumnCalled = true;
+    }
+
+    public boolean wasModifyColumnCalled() {
+      return preModifyColumnCalled && postModifyColumnCalled;
+    }
+
+    @Override
+    public void preDeleteColumn(MasterCoprocessorEnvironment env,
+        byte[] tableName, byte[] c) throws IOException {
+      preDeleteColumnCalled = true;
+    }
+
+    @Override
+    public void postDeleteColumn(MasterCoprocessorEnvironment env,
+        byte[] tableName, byte[] c) throws IOException {
+      postDeleteColumnCalled = true;
+    }
+
+    public boolean wasDeleteColumnCalled() {
+      return preDeleteColumnCalled && postDeleteColumnCalled;
+    }
+
+    @Override
+    public void preEnableTable(MasterCoprocessorEnvironment env,
+        byte[] tableName) throws IOException {
+      preEnableTableCalled = true;
+    }
+
+    @Override
+    public void postEnableTable(MasterCoprocessorEnvironment env,
+        byte[] tableName) throws IOException {
+      postEnableTableCalled = true;
+    }
+
+    public boolean wasEnableTableCalled() {
+      return preEnableTableCalled && postEnableTableCalled;
+    }
+
+    @Override
+    public void preDisableTable(MasterCoprocessorEnvironment env,
+        byte[] tableName) throws IOException {
+      preDisableTableCalled = true;
+    }
+
+    @Override
+    public void postDisableTable(MasterCoprocessorEnvironment env,
+        byte[] tableName) throws IOException {
+      postDisableTableCalled = true;
+    }
+
+    public boolean wasDisableTableCalled() {
+      return preDisableTableCalled && postDisableTableCalled;
+    }
+
+    @Override
+    public void preMove(MasterCoprocessorEnvironment env,
+        HRegionInfo region, HServerInfo srcServer, HServerInfo destServer)
+    throws UnknownRegionException {
+      preMoveCalled = true;
+    }
+
+    @Override
+    public void postMove(MasterCoprocessorEnvironment env, HRegionInfo region,
+        HServerInfo srcServer, HServerInfo destServer)
+    throws UnknownRegionException {
+      postMoveCalled = true;
+    }
+
+    public boolean wasMoveCalled() {
+      return preMoveCalled && postMoveCalled;
+    }
+
+    @Override
+    public void preAssign(MasterCoprocessorEnvironment env,
+        final byte [] regionName, final boolean force) throws IOException {
+      preAssignCalled = true;
+    }
+
+    @Override
+    public void postAssign(MasterCoprocessorEnvironment env,
+        final HRegionInfo regionInfo) throws IOException {
+      postAssignCalled = true;
+    }
+
+    public boolean wasAssignCalled() {
+      return preAssignCalled && postAssignCalled;
+    }
+
+    @Override
+    public void preUnassign(MasterCoprocessorEnvironment env,
+        final byte [] regionName, final boolean force) throws IOException {
+      preUnassignCalled = true;
+    }
+
+    @Override
+    public void postUnassign(MasterCoprocessorEnvironment env,
+        final HRegionInfo regionInfo, final boolean force) throws IOException {
+      postUnassignCalled = true;
+    }
+
+    public boolean wasUnassignCalled() {
+      return preUnassignCalled && postUnassignCalled;
+    }
+
+    @Override
+    public void preBalance(MasterCoprocessorEnvironment env)
+        throws IOException {
+      preBalanceCalled = true;
+    }
+
+    @Override
+    public void postBalance(MasterCoprocessorEnvironment env)
+        throws IOException {
+      postBalanceCalled = true;
+    }
+
+    public boolean wasBalanceCalled() {
+      return preBalanceCalled && postBalanceCalled;
+    }
+
+    @Override
+    public boolean preBalanceSwitch(MasterCoprocessorEnvironment env, boolean b)
+        throws IOException {
+      preBalanceSwitchCalled = true;
+      return b; // hand the requested value back unchanged
+    }
+
+    @Override
+    public void postBalanceSwitch(MasterCoprocessorEnvironment env,
+        boolean oldValue, boolean newValue) throws IOException {
+      postBalanceSwitchCalled = true;
+    }
+
+    public boolean wasBalanceSwitchCalled() {
+      return preBalanceSwitchCalled && postBalanceSwitchCalled;
+    }
+
+    @Override
+    public void preShutdown(MasterCoprocessorEnvironment env)
+        throws IOException {
+      preShutdownCalled = true; // NOTE(review): no accessor reads this flag
+    }
+
+    @Override
+    public void preStopMaster(MasterCoprocessorEnvironment env)
+        throws IOException {
+      preStopMasterCalled = true; // NOTE(review): no accessor reads this flag
+    }
+
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+      startCalled = true;
+    }
+
+    @Override
+    public void stop(CoprocessorEnvironment env) throws IOException {
+      stopCalled = true;
+    }
+
+    public boolean wasStarted() { return startCalled; }
+
+    public boolean wasStopped() { return stopCalled; }
+  }
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final byte[] TEST_TABLE = Bytes.toBytes("observed_table");
+  private static final byte[] TEST_FAMILY = Bytes.toBytes("fam1");
+  private static final byte[] TEST_FAMILY2 = Bytes.toBytes("fam2");
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    // name CPMasterObserver as a master coprocessor before the cluster starts
+    Configuration conf = UTIL.getConfiguration();
+    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+        CPMasterObserver.class.getName());
+    UTIL.startMiniCluster(2);
+  }
+
+  @AfterClass
+  public static void teardownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testStarted() throws Exception {
+    HMaster activeMaster = UTIL.getHBaseCluster().getMaster();
+    assertTrue("Master should be active", activeMaster.isActiveMaster());
+    // the configured observer must be loaded in the master's coprocessor host
+    MasterCoprocessorHost cpHost = activeMaster.getCoprocessorHost();
+    assertNotNull("CoprocessorHost should not be null", cpHost);
+    CPMasterObserver observer = (CPMasterObserver)cpHost.findCoprocessor(
+        CPMasterObserver.class.getName());
+    assertNotNull("CPMasterObserver coprocessor not found or not installed!",
+        observer);
+
+    // basic lifecycle: start() must already have been called
+    assertTrue("MasterObserver should have been started", observer.wasStarted());
+  }
+
+  @Test
+  public void testTableOperations() throws Exception {
+    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+    // fetch the observer instance loaded into the master's coprocessor host
+    HMaster master = cluster.getMaster();
+    MasterCoprocessorHost host = master.getCoprocessorHost();
+    CPMasterObserver cp = (CPMasterObserver)host.findCoprocessor(
+        CPMasterObserver.class.getName());
+    assertFalse("No table created yet", cp.wasCreateTableCalled());
+    // NOTE(review): assumes no earlier test created a table via this master
+    // create a table with a single column family
+    HTableDescriptor htd = new HTableDescriptor(TEST_TABLE);
+    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
+    HBaseAdmin admin = UTIL.getHBaseAdmin();
+    admin.createTable(htd);
+    assertTrue("Test table should be created", cp.wasCreateTableCalled());
+
+    // disable the table; both the pre and post disable hooks must have fired
+    assertFalse(cp.wasDisableTableCalled());
+    admin.disableTable(TEST_TABLE);
+    assertTrue(admin.isTableDisabled(TEST_TABLE));
+    assertTrue("Coprocessor should have been called on table disable",
+        cp.wasDisableTableCalled());
+
+    // modify table: set the max file size to 512 MB
+    htd.setMaxFileSize(512 * 1024 * 1024);
+    admin.modifyTable(TEST_TABLE, htd);
+    assertTrue("Test table should have been modified",
+        cp.wasModifyTableCalled());
+
+    // add a second column family
+    admin.addColumn(TEST_TABLE, new HColumnDescriptor(TEST_FAMILY2));
+    assertTrue("New column family should have been added to test table",
+        cp.wasAddColumnCalled());
+
+    // modify the second column family: set max versions to 25
+    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY2);
+    hcd.setMaxVersions(25);
+    admin.modifyColumn(TEST_TABLE, hcd);
+    assertTrue("Second column family should be modified",
+        cp.wasModifyColumnCalled());
+
+    // enable the table; both the pre and post enable hooks must have fired
+    assertFalse(cp.wasEnableTableCalled());
+    admin.enableTable(TEST_TABLE);
+    assertTrue(admin.isTableEnabled(TEST_TABLE));
+    assertTrue("Coprocessor should have been called on table enable",
+        cp.wasEnableTableCalled());
+
+    // disable again before deleting the column family and the table
+    admin.disableTable(TEST_TABLE);
+    assertTrue(admin.isTableDisabled(TEST_TABLE));
+
+    // delete the second column family and confirm the descriptor lost it
+    assertFalse("No column family deleted yet", cp.wasDeleteColumnCalled());
+    admin.deleteColumn(TEST_TABLE, TEST_FAMILY2);
+    HTableDescriptor tableDesc = admin.getTableDescriptor(TEST_TABLE);
+    assertNull("'"+Bytes.toString(TEST_FAMILY2)+"' should have been removed",
+        tableDesc.getFamily(TEST_FAMILY2));
+    assertTrue("Coprocessor should have been called on column delete",
+        cp.wasDeleteColumnCalled());
+
+    // delete the table and confirm it no longer exists
+    assertFalse("No table deleted yet", cp.wasDeleteTableCalled());
+    admin.deleteTable(TEST_TABLE);
+    assertFalse("Test table should have been deleted",
+        admin.tableExists(TEST_TABLE));
+    assertTrue("Coprocessor should have been called on table delete",
+        cp.wasDeleteTableCalled());
+  }
+
+  @Test
+  public void testRegionTransitionOperations() throws Exception {
+    // Checks the move, balanceSwitch and balance hooks on a multi-region table
+    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+    HMaster master = cluster.getMaster();
+    MasterCoprocessorHost host = master.getCoprocessorHost();
+    CPMasterObserver cp = (CPMasterObserver)host.findCoprocessor(
+        CPMasterObserver.class.getName());
+
+    HTable table = UTIL.createTable(TEST_TABLE, TEST_FAMILY);
+    UTIL.createMultiRegions(table, TEST_FAMILY);
+
+    Map<HRegionInfo,HServerAddress> regions = table.getRegionsInfo();
+    assertFalse(regions.isEmpty());
+    Map.Entry<HRegionInfo,HServerAddress> firstRegion =
+        regions.entrySet().iterator().next();
+
+    // try to force a move to any server not already hosting the first region
+    Collection<HServerInfo> servers = master.getClusterStatus().getServerInfo();
+    String destName = null;
+    for (HServerInfo info : servers) {
+      if (!info.getServerAddress().equals(firstRegion.getValue())) {
+        destName = info.getServerName();
+        break;
+      }
+    }
+    assertNotNull("No other region server to move to", destName);
+    master.move(firstRegion.getKey().getEncodedNameAsBytes(),
+        Bytes.toBytes(destName));
+    assertTrue("Coprocessor should have been called on region move",
+        cp.wasMoveCalled());
+
+    // make sure balancer is on
+    master.balanceSwitch(true);
+    assertTrue("Coprocessor should have been called on balance switch",
+        cp.wasBalanceSwitchCalled());
+
+    // force region rebalancing: switch the balancer off, then skew the load
+    master.balanceSwitch(false);
+    // move half the open regions from RS 0 to RS 1
+    HRegionServer rs = cluster.getRegionServer(0);
+    byte[] destRS = Bytes.toBytes(cluster.getRegionServer(1).getServerName());
+    List<HRegionInfo> openRegions = rs.getOnlineRegions();
+    int moveCnt = openRegions.size()/2;
+    for (int i=0; i<moveCnt; i++) {
+      HRegionInfo info = openRegions.get(i);
+      if (!(info.isMetaRegion() || info.isRootRegion())) {
+        master.move(info.getEncodedNameAsBytes(), destRS);
+      }
+    }
+
+    // wait for assignments to finish
+    AssignmentManager mgr = master.getAssignmentManager();
+    Collection<AssignmentManager.RegionState> transRegions =
+        mgr.getRegionsInTransition().values();
+    for (AssignmentManager.RegionState state : transRegions) {
+      mgr.waitOnRegionToClearRegionsInTransition(state.getRegion());
+    }
+
+    // now trigger a balance
+    master.balanceSwitch(true);
+    assertTrue("Balance request should have run", master.balance());
+    assertTrue("Coprocessor should be called on region rebalancing",
+        cp.wasBalanceCalled());
+  }
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java?rev=1051639&r1=1051638&r2=1051639&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java Tue Dec 21 20:39:26 2010
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.client.HT
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
-import org.apache.hadoop.hbase.regionserver.CoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -47,8 +47,6 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import org.apache.hadoop.conf.Configuration;
-
 import static org.junit.Assert.*;
 
 public class TestRegionObserverInterface {
@@ -77,7 +75,7 @@ public class TestRegionObserverInterface
   public static void setupBeforeClass() throws Exception {
     // set configure to indicate which cp should be loaded
     Configuration conf = util.getConfiguration();
-    conf.set("hbase.coprocessor.default.classes",
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
         "org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver");
 
     util.startMiniCluster(2);
@@ -114,7 +112,7 @@ public class TestRegionObserverInterface
     // start a region server here, so just manually create cphost
     // and set it to region.
     HRegion r = HRegion.createHRegion(info, path, conf);
-    CoprocessorHost host = new CoprocessorHost(r, null, conf);
+    RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
     r.setCoprocessorHost(host);
     host.load(implClass, Priority.USER);
     return r;
@@ -145,7 +143,7 @@ public class TestRegionObserverInterface
         if (!Arrays.equals(r.getTableDesc().getName(), TEST_TABLE)) {
           continue;
         }
-        CoprocessorHost cph = t.getRegionServer().getOnlineRegion(r.getRegionName()).
+        RegionCoprocessorHost cph = t.getRegionServer().getOnlineRegion(r.getRegionName()).
           getCoprocessorHost();
         Coprocessor c = cph.findCoprocessor(SimpleRegionObserver.class.getName());
         assertNotNull(c);
@@ -175,7 +173,7 @@ public class TestRegionObserverInterface
         if (!Arrays.equals(r.getTableDesc().getName(), TEST_TABLE_2)) {
           continue;
         }
-        CoprocessorHost cph = t.getRegionServer().getOnlineRegion(r.getRegionName()).
+        RegionCoprocessorHost cph = t.getRegionServer().getOnlineRegion(r.getRegionName()).
           getCoprocessorHost();
         Coprocessor c = cph.findCoprocessor(SimpleRegionObserver.class.getName());
         assertTrue(((SimpleRegionObserver)c).hadPreIncrement());