You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/17 02:49:11 UTC
svn commit: r1588116 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/
main/java/org/apache/hadoop/hbase/coprocessor/
main/java/org/apache/hadoop/hbase/coprocessor/environments/
main/java/org/apache/hadoop/hbase/coprocessor/observers...
Author: liyin
Date: Thu Apr 17 00:49:10 2014
New Revision: 1588116
URL: http://svn.apache.org/r1588116
Log:
[HBASE-2000] Adding RegionObserver
Author: adela
Summary: https://our.intern.facebook.com/intern/wiki/index.php/HBase/Coprocessor/Observers
Test Plan: unit test
Reviewers: daviddeng, liyintang
Reviewed By: daviddeng
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D1269053
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/environments/
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/environments/RegionCoprocessorEnvironment.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/observers/
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/observers/BaseRegionObserver.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/observers/ObserverContext.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/observers/RegionObserver.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/Coprocessor.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/Coprocessor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/Coprocessor.java?rev=1588116&r1=1588115&r2=1588116&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/Coprocessor.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/Coprocessor.java Thu Apr 17 00:49:10 2014
@@ -16,15 +16,11 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
-import org.apache.hadoop.classification.InterfaceAudience;
-
/**
* Coprocessor interface.
*/
-@InterfaceAudience.Private
public interface Coprocessor {
int VERSION = 1;
-
/** Highest installation priority */
int PRIORITY_HIGHEST = 0;
/** High (system) installation priority */
@@ -41,8 +37,8 @@ public interface Coprocessor {
UNINSTALLED, INSTALLED, STARTING, ACTIVE, STOPPING, STOPPED
}
- void start(CoprocessorEnvironment env) throws IOException;
+ void start() throws IOException;
- void stop(CoprocessorEnvironment env) throws IOException;
+ void stop() throws IOException;
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java?rev=1588116&r1=1588115&r2=1588116&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/CoprocessorEnvironment.java Thu Apr 17 00:49:10 2014
@@ -47,19 +47,6 @@ public interface CoprocessorEnvironment
Configuration getConfiguration();
/**
- * @return an interface for accessing the given table
- * @throws IOException
- */
- HTableInterface getTable(String tableName) throws IOException;
-
- /**
- * @return an interface for accessing the given table using the passed executor to run batch
- * operations
- * @throws IOException
- */
- HTableInterface getTable(String tableName, ExecutorService service) throws IOException;
-
- /**
* @return the classloader for the loaded coprocessor instance
*/
ClassLoader getClassLoader();
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1588116&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Thu Apr 17 00:49:10 2014
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.ipc.ThriftClientInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.hbase.util.coprocessor.CoprocessorClassLoader;
+import org.apache.hadoop.hbase.util.coprocessor.SortedCopyOnWriteSet;
+
+/**
+ * Provides the common setup framework and runtime services for coprocessor
+ * invocation from HBase services.
+ * @param <E> the specific environment extension that a concrete implementation
+ * provides
+ */
+public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
+ public static final String REGION_COPROCESSOR_CONF_KEY =
+ "hbase.coprocessor.region.classes"; // read by RegionCoprocessorHost for every region
+ public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
+ "hbase.coprocessor.regionserver.classes";
+ public static final String USER_REGION_COPROCESSOR_CONF_KEY =
+ "hbase.coprocessor.user.region.classes"; // read by RegionCoprocessorHost only for non-meta, non-root regions
+ public static final String MASTER_COPROCESSOR_CONF_KEY =
+ "hbase.coprocessor.master.classes";
+ public static final String WAL_COPROCESSOR_CONF_KEY =
+ "hbase.coprocessor.wal.classes";
+ public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror"; // consulted by handleCoprocessorThrowable()
+ public static final boolean DEFAULT_ABORT_ON_ERROR = true;
+
+ private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
+ protected ThriftClientInterface tcInter; // NOTE(review): never assigned or read in this class - confirm whether subclasses need it
+ /**
+ * Ordered set of loaded coprocessor environments, sorted by
+ * EnvironmentPriorityComparator (priority first, then load sequence).
+ */
+ protected SortedSet<E> coprocessors =
+ new SortedCopyOnWriteSet<E>(new EnvironmentPriorityComparator());
+ protected Configuration conf; // not set by this class's constructor; RegionCoprocessorHost assigns it
+ // unique file prefix to use for local copies of jars when classloading (handed to CoprocessorClassLoader in load())
+ protected String pathPrefix;
+ protected AtomicInteger loadSequence = new AtomicInteger(); // source of the load sequence numbers handed out in loadInstance()
+
+ public CoprocessorHost() {
+ this.pathPrefix = UUID.randomUUID().toString(); // default prefix; RegionCoprocessorHost overwrites it
+ }
+
+ /**
+  * JVM-wide, append-only record of the class names of every coprocessor that
+  * loadInstance() has created, regardless of host or thread. It differs from
+  * the per-host {@code coprocessors} set in that entries are never removed:
+  * the history is kept so that it can be inspected when diagnosing a server
+  * crash (HBASE-4014).
+  */
+ private static Set<String> coprocessorNames =
+     Collections.synchronizedSet(new HashSet<String>());
+
+ /**
+  * @return the live synchronized set (not a copy) holding the class names of
+  *         all coprocessors ever loaded in this JVM
+  */
+ public static Set<String> getEverLoadedCoprocessors() {
+   return coprocessorNames;
+ }
+
+ /**
+  * Reports the simple class names of the coprocessors currently loaded in
+  * this host. The result is meant to be handed to the HServerLoad constructor
+  * so that the regionserver can tell the master which coprocessors it runs
+  * (HBASE-4070).
+  * @return a new, alphabetically sorted set of simple class names
+  */
+ public Set<String> getCoprocessors() {
+   final Set<String> simpleNames = new TreeSet<String>();
+   for (E env : coprocessors) {
+     final Coprocessor instance = env.getInstance();
+     simpleNames.add(instance.getClass().getSimpleName());
+   }
+   return simpleNames;
+ }
+
+ /**
+ * Load system coprocessors. Read the class names from configuration.
+ * Called by constructor.
+ */
+ protected void loadSystemCoprocessors(Configuration conf, String confKey) {
+ // load default coprocessors from configure file
+ String[] defaultCPClasses = conf.getStrings(confKey);
+ if (defaultCPClasses == null || defaultCPClasses.length == 0) {
+ LOG.error("We could not load coprocessors since there are no coprocessors specified in the configuration");
+ return;
+ }
+
+ int priority = Coprocessor.PRIORITY_SYSTEM;
+ List<E> configured = new ArrayList<E>();
+ for (String className : defaultCPClasses) {
+ className = className.trim();
+ if (findCoprocessor(className) != null) {
+ continue;
+ }
+ ClassLoader cl = this.getClass().getClassLoader();
+ Thread.currentThread().setContextClassLoader(cl);
+ try {
+ Class<?> implClass = cl.loadClass(className);
+ configured.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
+ LOG.info("System coprocessor " + className + " was loaded " +
+ "successfully with priority (" + priority++ + ").");
+ } catch (Throwable t) {
+ // We always abort if system coprocessors cannot be loaded
+ //TODO: check if we want to not start regionserver here.. but probably not
+ }
+ }
+ coprocessors.addAll(configured);
+ }
+
+ /**
+  * Load a coprocessor implementation into the host. The class is looked up
+  * in the jar at {@code path}, or with this host's own class loader when no
+  * path is given, and is then instantiated and started through
+  * loadInstance(). The returned environment is NOT added to
+  * {@code coprocessors}; that is left to the caller.
+  * @param path path to implementation jar, or null to use the host class loader
+  * @param className the main class name
+  * @param priority chaining priority
+  * @param conf configuration for coprocessor
+  * @return the environment created for the new coprocessor instance
+  * @throws java.io.IOException if the class cannot be found, instantiated
+  *           or started
+  */
+ public E load(Path path, String className, int priority,
+     Configuration conf) throws IOException {
+   LOG.debug("Loading coprocessor class " + className + " with path " +
+       path + " and priority " + priority);
+
+   final ClassLoader hostLoader = getClass().getClassLoader();
+   final ClassLoader cl;
+   final Class<?> implClass;
+   if (path == null) {
+     // No jar given: the class has to be visible to the host class loader.
+     cl = hostLoader;
+     try {
+       implClass = cl.loadClass(className);
+     } catch (ClassNotFoundException e) {
+       throw new IOException("No jar path specified for " + className, e);
+     }
+   } else {
+     cl = CoprocessorClassLoader.getClassLoader(
+         path, hostLoader, pathPrefix, conf);
+     try {
+       implClass = cl.loadClass(className);
+     } catch (ClassNotFoundException e) {
+       throw new IOException("Cannot load external coprocessor class " + className, e);
+     }
+   }
+   // Instantiate the custom code with the loader that found the class as the
+   // thread context class loader; the resetter restores the previous one.
+   try (ContextResetter ctxResetter = new ContextResetter(cl)) {
+     return loadInstance(implClass, priority, conf);
+   } catch (Exception e) {
+     throw new IOException("Cannot load external coprocessor class " + className, e);
+   }
+ }
+
+ /**
+  * Instantiates and starts a coprocessor of the given class via
+  * loadInstance() and registers the resulting environment with this host.
+  * @param implClass Implementation class
+  * @param priority priority
+  * @param conf configuration
+  * @throws java.io.IOException if loadInstance() fails
+  */
+ public void load(Class<?> implClass, int priority, Configuration conf)
+     throws IOException {
+   coprocessors.add(loadInstance(implClass, priority, conf));
+ }
+
+ /**
+  * Creates one coprocessor instance of {@code implClass}, wraps it in an
+  * environment obtained from createEnvironment() and, when that environment
+  * is an {@link Environment}, starts it. The class name is also recorded in
+  * the JVM-wide set of ever-loaded coprocessors.
+  * @param implClass Implementation class; must implement {@link Coprocessor}
+  * @param priority priority
+  * @param conf configuration
+  * @return the environment of the new instance (not yet registered in
+  *         {@code coprocessors})
+  * @throws java.io.IOException if the class is not a Coprocessor or cannot
+  *           be instantiated
+  */
+ public E loadInstance(Class<?> implClass, int priority, Configuration conf)
+     throws IOException {
+   final boolean isCoprocessor = Coprocessor.class.isAssignableFrom(implClass);
+   if (!isCoprocessor) {
+     throw new IOException("Configured class " + implClass.getName() + " must implement "
+         + Coprocessor.class.getName() + " interface ");
+   }
+
+   // Instantiate through the public no-argument constructor.
+   final Coprocessor instance;
+   try {
+     instance = (Coprocessor) implClass.newInstance();
+   } catch (InstantiationException | IllegalAccessException e) {
+     throw new IOException(e);
+   }
+
+   // Wrap the instance in its environment and bring it up.
+   final int sequence = loadSequence.incrementAndGet();
+   final E environment = createEnvironment(implClass, instance, priority, sequence, conf);
+   if (environment instanceof Environment) {
+     ((Environment) environment).startup();
+   }
+
+   // HBASE-4014: remember every loaded coprocessor so the list is available
+   // for crash analysis if the server (master or regionserver) aborts.
+   coprocessorNames.add(implClass.getName());
+   return environment;
+ }
+
+ /**
+ * Called when a new Coprocessor class is loaded. loadInstance() invokes this
+ * with the freshly created coprocessor instance and the next load sequence
+ * number, and afterwards starts the returned environment if it is an
+ * {@link Environment}.
+ * @param implClass the coprocessor implementation class
+ * @param instance the coprocessor instance created from implClass
+ * @param priority chaining priority
+ * @param sequence load sequence number taken from {@code loadSequence}
+ * @param conf configuration for the coprocessor
+ * @return the environment wrapping the instance
+ */
+ public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
+ int priority, int sequence, Configuration conf);
+
+ /**
+  * Shuts down the given environment if it is an {@link Environment};
+  * any other kind of environment is only reported with a warning.
+  * @param e the environment to shut down
+  */
+ public void shutdown(CoprocessorEnvironment e) {
+   if (!(e instanceof Environment)) {
+     LOG.warn("Shutdown called on unknown environment: "+
+         e.getClass().getName());
+     return;
+   }
+   ((Environment) e).shutdown();
+ }
+
+ /**
+  * Looks up a loaded coprocessor by class name. Both the fully qualified and
+  * the simple class name are accepted; the first match in chaining order wins.
+  * @param className the class name
+  * @return the coprocessor, or null if not found
+  */
+ public Coprocessor findCoprocessor(String className) {
+   for (E env : coprocessors) {
+     final Class<?> type = env.getInstance().getClass();
+     final boolean matches = type.getName().equals(className)
+         || type.getSimpleName().equals(className);
+     if (matches) {
+       return env.getInstance();
+     }
+   }
+   return null;
+ }
+
+ /**
+  * Looks up the environment of a loaded coprocessor by class name. Both the
+  * fully qualified and the simple class name are accepted; the first match in
+  * chaining order wins.
+  * @param className the class name
+  * @return the coprocessor environment, or null if not found
+  */
+ public CoprocessorEnvironment findCoprocessorEnvironment(String className) {
+   for (E env : coprocessors) {
+     final Class<?> type = env.getInstance().getClass();
+     final boolean matches = type.getName().equals(className)
+         || type.getSimpleName().equals(className);
+     if (matches) {
+       return env;
+     }
+   }
+   return null;
+ }
+
+ /**
+  * Collects the class loaders of all loaded coprocessor classes that were
+  * not defined by this host's own class loader, i.e. the loaders of
+  * coprocessors that came from external jar files.
+  * @return A set of ClassLoader instances
+  */
+ Set<ClassLoader> getExternalClassLoaders() {
+   final ClassLoader hostLoader = this.getClass().getClassLoader();
+   final Set<ClassLoader> external = new HashSet<ClassLoader>();
+   for (E env : coprocessors) {
+     final ClassLoader loader = env.getInstance().getClass().getClassLoader();
+     if (loader == hostLoader) {
+       // the host's own loader is not an external one
+       continue;
+     }
+     external.add(loader);
+   }
+   return external;
+ }
+
+ /**
+  * Orders coprocessor environments for chaining: ascending by priority and,
+  * for equal priorities, ascending by load sequence. Two environments compare
+  * as equal only when both values match.
+  */
+ static class EnvironmentPriorityComparator
+     implements Comparator<CoprocessorEnvironment> {
+   public int compare(final CoprocessorEnvironment env1,
+       final CoprocessorEnvironment env2) {
+     final int byPriority = Integer.compare(env1.getPriority(), env2.getPriority());
+     if (byPriority != 0) {
+       return byPriority;
+     }
+     // same priority: the one loaded first comes first
+     return Integer.compare(env1.getLoadSequence(), env2.getLoadSequence());
+   }
+ }
+
+ /**
+ * Encapsulation of the environment of each coprocessor
+ */
+ public static class Environment implements CoprocessorEnvironment {
+
+ /** The coprocessor */
+ public Coprocessor impl;
+ /** Chaining priority */
+ protected int priority = Coprocessor.PRIORITY_USER;
+ /** Current coprocessor state */
+ Coprocessor.State state = Coprocessor.State.UNINSTALLED;
+ /** Accounting for tables opened by the coprocessor */
+ protected List<HTableInterface> openTables =
+ Collections.synchronizedList(new ArrayList<HTableInterface>());
+ private int seq;
+ private Configuration conf;
+
+ /**
+ * Constructor
+ * @param impl the coprocessor instance
+ * @param priority chaining priority
+ */
+ public Environment(final Coprocessor impl, final int priority,
+ final int seq, final Configuration conf) {
+ this.impl = impl;
+ this.priority = priority;
+ this.state = Coprocessor.State.INSTALLED;
+ this.seq = seq;
+ this.conf = conf;
+ }
+
+ /** Initialize the environment */
+ public void startup() throws IOException {
+ if (state == Coprocessor.State.INSTALLED
+ || state == Coprocessor.State.STOPPED) {
+ state = Coprocessor.State.STARTING;
+ try (ContextResetter ctxResetter = new ContextResetter(
+ this.getClassLoader())) {
+ impl.start();
+ state = Coprocessor.State.ACTIVE;
+ } catch (Exception e) {
+ LOG.error("Setting class loader failed", e);
+ }
+ } else {
+ LOG.warn("Not starting coprocessor " + impl.getClass().getName()
+ + " because not inactive (state=" + state.toString() + ")");
+ }
+ }
+
+ /** Clean up the environment */
+ protected void shutdown() {
+ if (state == Coprocessor.State.ACTIVE) {
+ state = Coprocessor.State.STOPPING;
+ try (ContextResetter ctxResetter = new ContextResetter(
+ this.getClassLoader())) {
+ impl.stop();
+ state = Coprocessor.State.STOPPED;
+ } catch (Exception e) {
+ LOG.error("Error stopping coprocessor " + impl.getClass().getName(),
+ e);
+ }
+ } else {
+ LOG.warn("Not stopping coprocessor " + impl.getClass().getName()
+ + " because not active (state=" + state.toString() + ")");
+ }
+ // clean up any table references
+ for (HTableInterface table : openTables) {
+ try {
+ table.close();
+ ;
+ } catch (IOException e) {
+ // nothing can be done here
+ LOG.warn(
+ "Failed to close " + Bytes.toStringBinary(table.getTableName()),
+ e);
+ }
+ }
+ }
+
+ @Override
+ public Coprocessor getInstance() {
+ return impl;
+ }
+
+ @Override
+ public ClassLoader getClassLoader() {
+ return impl.getClass().getClassLoader();
+ }
+
+ @Override
+ public int getPriority() {
+ return priority;
+ }
+
+ @Override
+ public int getLoadSequence() {
+ return seq;
+ }
+
+ /** @return the coprocessor environment version */
+ @Override
+ public int getVersion() {
+ return Coprocessor.VERSION;
+ }
+
+ /** @return the HBase release */
+ @Override
+ public String getHBaseVersion() {
+ return VersionInfo.getVersion();
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ }
+
+
+ /**
+  * This is used by coprocessor hooks which are declared to throw IOException
+  * (or its subtypes). For such hooks, we should handle throwable objects
+  * depending on the Throwable's type. Those which are instances of
+  * IOException should be passed on to the client. This is in conformance with
+  * the HBase idiom regarding IOException: that it represents a circumstance
+  * that should be passed along to the client for its own handling. For
+  * example, a coprocessor that implements access controls would throw a
+  * subclass of IOException, such as AccessDeniedException, in its preGet()
+  * method to prevent an unauthorized client's performing a Get on a particular
+  * table.
+  * <p>
+  * Any other throwable means the coprocessor has a bug. If
+  * 'hbase.coprocessor.abortonerror' is false, the coprocessor is removed from
+  * this host, shut down, and a DoNotRetryIOException is thrown. If it is true
+  * (the default), the throwable is logged; aborting the server is not
+  * implemented yet, so the coprocessor stays loaded and this method returns
+  * normally.
+  * @param env Coprocessor Environment
+  * @param e Throwable object thrown by coprocessor.
+  * @exception IOException if e is an IOException, or a DoNotRetryIOException
+  *              after a faulty coprocessor has been removed
+  */
+ protected void handleCoprocessorThrowable(final CoprocessorEnvironment env, final Throwable e)
+     throws IOException {
+   if (e instanceof IOException) {
+     throw (IOException)e;
+   }
+   // If we got here, e is not an IOException. A loaded coprocessor has a
+   // fatal bug, and the server (master or regionserver) should remove the
+   // faulty coprocessor from its set of active coprocessors. Setting
+   // 'hbase.coprocessor.abortonerror' to true is meant to abort the server,
+   // which may be useful in development and testing environments where
+   // 'failing fast' for error analysis is desired.
+   if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
+     // server is configured to abort.
+     // TODO: see if we want to abort regionserver here
+     // Until then, never drop the failure without a trace.
+     LOG.error("Coprocessor '" + env.toString() + "' threw an unexpected exception; "
+         + ABORT_ON_ERROR_KEY + " is set, but aborting the server is not implemented, "
+         + "so the coprocessor stays loaded", e);
+   } else {
+     LOG.error("Removing coprocessor '" + env.toString() + "' from " +
+         "environment because it threw: " + e,e);
+     coprocessors.remove(env);
+     try {
+       shutdown(env);
+     } catch (Exception x) {
+       LOG.error("Uncaught exception when shutting down coprocessor '"
+           + env.toString() + "'", x);
+     }
+     throw new DoNotRetryIOException("Coprocessor: '" + env.toString() +
+         "' threw: '" + e + "' and has been removed from the active " +
+         "coprocessor set.", e);
+   }
+ }
+
+ /**
+  * Scope guard for the thread context class loader. The constructor remembers
+  * the calling thread's current context class loader and installs the given
+  * one; {@link #close()} puts the remembered loader back. Intended for
+  * try-with-resources, so that the previous loader is restored even when the
+  * guarded code throws. It must be closed on the thread that created it.
+  */
+ static class ContextResetter implements AutoCloseable {
+   /** The context class loader that was active before the switch */
+   final ClassLoader currentLoader;
+
+   /**
+    * @param cl the class loader to install as the thread context class
+    *          loader until {@link #close()} is called
+    */
+   public ContextResetter(ClassLoader cl) {
+     this.currentLoader = Thread.currentThread().getContextClassLoader();
+     Thread.currentThread().setContextClassLoader(cl);
+   }
+
+   /**
+    * Restores the context class loader saved by the constructor. Restoring
+    * cannot fail with a checked exception, so none is declared.
+    */
+   @Override
+   public void close() {
+     Thread.currentThread().setContextClassLoader(currentLoader);
+   }
+
+ }
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java?rev=1588116&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java Thu Apr 17 00:49:10 2014
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.collections.map.AbstractReferenceMap;
+import org.apache.commons.collections.map.ReferenceMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.environments.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.observers.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.observers.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Implements the coprocessor environment and runtime support for coprocessors
+ * loaded within a {@link HRegion}.
+ */
+public class RegionCoprocessorHost
+ extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
+
+ private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
+ // The shared data map
+ private static ReferenceMap sharedDataMap = new ReferenceMap(
+ AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
+ /** The region */
+ HRegion region;
+
+ /**
+ * Constructor
+ * @param region the region
+ * @param rsServices interface to available region server functionality
+ * @param conf the configuration
+ */
+ public RegionCoprocessorHost(final HRegion region,
+ final Configuration conf) {
+ this.conf = conf;
+ this.region = region;
+ this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
+
+ // load system default cp's from configuration.
+ loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
+
+ // load system default cp's for user tables from configuration.
+ //TODO: check whether this checks for ROOT too
+ if (!region.getRegionInfo().getTableDesc().isMetaTable()
+ && !region.getRegionInfo().getTableDesc().isRootRegion()) {
+ loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
+ }
+ }
+
+
+ /**
+  * Builds the {@link RegionEnvironment} for a newly created coprocessor
+  * instance. All instances of the same coprocessor class share one
+  * ConcurrentMap, which is kept in the static {@code sharedDataMap} under the
+  * class name and created on first use.
+  */
+ @Override
+ public RegionEnvironment createEnvironment(Class<?> implClass,
+     Coprocessor instance, int priority, int seq, Configuration conf) {
+   // Endpoint (CoprocessorService) registration is not performed here: the
+   // code for it was left commented out with a TODO saying it is probably
+   // not going to be needed.
+   final String key = implClass.getName();
+   ConcurrentMap<String, Object> shared;
+   // only one thread at a time may look up or add a per-class map
+   synchronized (sharedDataMap) {
+     // the map keeps an entry for as long as at least one RegionEnvironment
+     // still holds on to the per-class data
+     final Object existing = sharedDataMap.get(key);
+     if (existing != null) {
+       shared = (ConcurrentMap<String, Object>) existing;
+     } else {
+       shared = new ConcurrentHashMap<String, Object>();
+       sharedDataMap.put(key, shared);
+     }
+   }
+   return new RegionEnvironment(instance, priority, seq, conf, region,
+       shared);
+ }
+
+ /**
+ * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
+ *
+ * For example, {@link
+ * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and
+ * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
+ *
+ * See also
+ * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(
+ * CoprocessorEnvironment, Throwable)}
+ * @param env The coprocessor that threw the exception.
+ * @param e The exception that was thrown.
+ */
+ private void handleCoprocessorThrowableNoRethrow(
+ final CoprocessorEnvironment env, final Throwable e) {
+ try {
+ handleCoprocessorThrowable(env,e);
+ } catch (IOException ioe) {
+ // We cannot throw exceptions from the caller hook, so ignore.
+ LOG.warn(
+ "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
+ e + ". Ignoring.",e);
+ }
+ }
+
+ /**
+  * Runs the prePut hook of every loaded {@link RegionObserver} in chaining
+  * order, each with its own class loader installed as the thread context
+  * class loader. The chain stops early once an observer marks the context
+  * as complete.
+  * @param put The Put object
+  * @param edit The WALEdit object.
+  * @param durability The durability used; passed through to the observers
+  * @return true if default processing should be bypassed
+  * @exception IOException Exception
+  */
+ public boolean prePut(final Put put, final WALEdit edit, boolean durability)
+     throws IOException {
+   boolean bypass = false;
+   ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+   final Thread thread = Thread.currentThread();
+   for (RegionEnvironment env : coprocessors) {
+     if (!(env.getInstance() instanceof RegionObserver)) {
+       continue;
+     }
+     ctx = ObserverContext.createAndPrepare(env, ctx);
+     final ClassLoader savedLoader = thread.getContextClassLoader();
+     try {
+       thread.setContextClassLoader(env.getClassLoader());
+       ((RegionObserver) env.getInstance()).prePut(ctx, put, edit, durability);
+     } catch (Throwable t) {
+       handleCoprocessorThrowable(env, t);
+     } finally {
+       thread.setContextClassLoader(savedLoader);
+     }
+     // once any observer asks for a bypass, the request sticks
+     if (ctx.shouldBypass()) {
+       bypass = true;
+     }
+     if (ctx.shouldComplete()) {
+       break;
+     }
+   }
+   return bypass;
+ }
+
+ /**
+  * Environment of a coprocessor that is loaded for a region. On top of what
+  * {@link CoprocessorHost.Environment} provides, it exposes the region and
+  * the data map shared by all instances of the same coprocessor class.
+  */
+ static class RegionEnvironment extends CoprocessorHost.Environment
+     implements RegionCoprocessorEnvironment {
+
+   private HRegion region;
+   ConcurrentMap<String, Object> sharedData;
+
+   /**
+    * Constructor
+    * @param impl the coprocessor instance
+    * @param priority chaining priority
+    * @param seq load sequence number
+    * @param conf configuration for the coprocessor
+    * @param region the region the coprocessor is loaded for
+    * @param sharedData the map shared between instances of this coprocessor
+    */
+   public RegionEnvironment(final Coprocessor impl, final int priority,
+       final int seq, final Configuration conf, final HRegion region,
+       final ConcurrentMap<String, Object> sharedData) {
+     super(impl, priority, seq, conf);
+     this.sharedData = sharedData;
+     this.region = region;
+   }
+
+   /** Same as the inherited shutdown, but callable from any package. */
+   @Override
+   public void shutdown() {
+     super.shutdown();
+   }
+
+   /** @return the region */
+   @Override
+   public HRegion getRegion() {
+     return this.region;
+   }
+
+   /** @return the data shared between instances of this coprocessor */
+   @Override
+   public ConcurrentMap<String, Object> getSharedData() {
+     return this.sharedData;
+   }
+ }
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/environments/RegionCoprocessorEnvironment.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/environments/RegionCoprocessorEnvironment.java?rev=1588116&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/environments/RegionCoprocessorEnvironment.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/environments/RegionCoprocessorEnvironment.java Thu Apr 17 00:49:10 2014
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor.environments;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment { // environment of a coprocessor loaded for one HRegion
+ /**
+ * @return shared data between all instances of this coprocessor
+ * (RegionCoprocessorHost hands out one map per coprocessor class name)
+ */
+ ConcurrentMap<String, Object> getSharedData();
+
+ HRegion getRegion(); // the region this environment was created for
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/observers/BaseRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/observers/BaseRegionObserver.java?rev=1588116&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/observers/BaseRegionObserver.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/observers/BaseRegionObserver.java Thu Apr 17 00:49:10 2014
@@ -0,0 +1,47 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor.observers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.environments.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Convenience base class for {@link RegionObserver} implementations. It
+ * implements start, stop, prePut and postPut with empty bodies, so a subclass
+ * only has to override the hooks it is interested in.
+ */
+public abstract class BaseRegionObserver implements RegionObserver {
+
+ /** Does nothing. */
+ @Override
+ public void start() throws IOException { }
+
+ /** Does nothing. */
+ @Override
+ public void stop() throws IOException { }
+
+ /** Does nothing; in particular it never requests a bypass on the context. */
+ @Override
+ public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put,
+ WALEdit edit, boolean writeToWAL) throws IOException {
+ }
+
+ /** Does nothing. */
+ @Override
+ public void postPut(ObserverContext<RegionCoprocessorEnvironment> c, Put put,
+ WALEdit edit, boolean writeToWAL) throws IOException {
+ }
+
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/observers/ObserverContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/observers/ObserverContext.java?rev=1588116&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/observers/ObserverContext.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/observers/ObserverContext.java Thu Apr 17 00:49:10 2014
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor.observers;
+
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+
+/**
+ * Holds the execution state of a single invocation of an Observer coprocessor
+ * hook (for example a {@link RegionObserver} method). One instance is meant to
+ * be handed to each loaded coprocessor in turn for a given hook trigger, with
+ * {@link #prepare(CoprocessorEnvironment)} swapping in the environment of the
+ * coprocessor that is about to run.
+ *
+ * @param <E>
+ *          The {@link CoprocessorEnvironment} subclass applicable to the
+ *          relevant Observer interface.
+ */
+public class ObserverContext<E extends CoprocessorEnvironment> {
+  private E env;
+  private boolean bypass;
+  private boolean complete;
+
+  /** Creates a context with no environment and neither flag set. */
+  public ObserverContext() {
+  }
+
+  /** @return the environment most recently set through {@link #prepare} */
+  public E getEnvironment() {
+    return this.env;
+  }
+
+  /**
+   * Sets the environment that {@link #getEnvironment()} will return.
+   *
+   * @param env
+   *          The coprocessor environment to expose through this context
+   */
+  public void prepare(E env) {
+    this.env = env;
+  }
+
+  /**
+   * Call to indicate that the current coprocessor's return value should be used
+   * in place of the normal HBase obtained value.
+   */
+  public void bypass() {
+    this.bypass = true;
+  }
+
+  /**
+   * Call to indicate that additional coprocessors further down the execution
+   * chain do not need to be invoked. Implies that this coprocessor's response
+   * is definitive.
+   */
+  public void complete() {
+    this.complete = true;
+  }
+
+  /**
+   * For use by the coprocessor framework. Reading the flag also clears it, so
+   * a second call returns <code>false</code> unless
+   * {@link ObserverContext#bypass()} is called again in between.
+   *
+   * @return <code>true</code> if {@link ObserverContext#bypass()} was called
+   *         since the flag was last read, <code>false</code> otherwise.
+   */
+  public boolean shouldBypass() {
+    if (!this.bypass) {
+      return false;
+    }
+    this.bypass = false;
+    return true;
+  }
+
+  /**
+   * For use by the coprocessor framework. Reading the flag also clears it, so
+   * a second call returns <code>false</code> unless
+   * {@link ObserverContext#complete()} is called again in between.
+   *
+   * @return <code>true</code> if {@link ObserverContext#complete()} was called
+   *         since the flag was last read, <code>false</code> otherwise.
+   */
+  public boolean shouldComplete() {
+    if (!this.complete) {
+      return false;
+    }
+    this.complete = false;
+    return true;
+  }
+
+  /**
+   * Returns a context that has <code>env</code> set as its environment: the
+   * passed <code>context</code> when it is not <code>null</code>, otherwise a
+   * freshly created one. This lets callers defer creating an ObserverContext
+   * until one is actually needed.
+   *
+   * @param env
+   *          The coprocessor environment to set
+   * @param context
+   *          An existing ObserverContext instance to use, or <code>null</code>
+   *          to create a new instance
+   * @param <T>
+   *          The environment type for the context
+   * @return An instance of <code>ObserverContext</code> with the environment
+   *         set
+   */
+  public static <T extends CoprocessorEnvironment> ObserverContext<T> createAndPrepare(
+      T env, ObserverContext<T> context) {
+    ObserverContext<T> prepared =
+        (context != null) ? context : new ObserverContext<T>();
+    prepared.prepare(env);
+    return prepared;
+  }
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/observers/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/observers/RegionObserver.java?rev=1588116&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/observers/RegionObserver.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/observers/RegionObserver.java Thu Apr 17 00:49:10 2014
@@ -0,0 +1,63 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor.observers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.environments.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Coprocessors implement this interface to observe and mediate client actions
+ * on the region.
+ * TODO: this contains just a handful of calls - we should extend this vastly in future
+ */
+public interface RegionObserver extends Coprocessor {
+
+ /**
+ * Called before the client stores a value.
+ * <p>
+ * Call {@link ObserverContext#bypass()} to skip default actions
+ * <p>
+ * Call {@link ObserverContext#complete()} to skip any subsequent chained
+ * coprocessors
+ * @param c the observer context, which carries the coprocessor environment
+ * @param put The Put object
+ * @param edit The WALEdit object that will be written to the wal
+ * @param writeToWAL the write-to-WAL flag of this Put
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Put put, final WALEdit edit, final boolean writeToWAL)
+ throws IOException;
+
+ /**
+ * Called after the client stores a value.
+ * <p>
+ * Call {@link ObserverContext#complete()} to skip any subsequent chained
+ * coprocessors
+ * @param c the observer context, which carries the coprocessor environment
+ * @param put The Put object
+ * @param edit The WALEdit object for the wal
+ * @param writeToWAL the write-to-WAL flag of this Put
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Put put, final WALEdit edit, final boolean writeToWAL)
+ throws IOException;
+
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1588116&r1=1588115&r2=1588116&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Apr 17 00:49:10 2014
@@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.client.Ro
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TRowMutations;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorHost;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -210,6 +211,9 @@ public class HRegion implements HeapSize
private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
+ // Coprocessor host
+ private RegionCoprocessorHost coprocessorHost;
+
/*
* @return The smallest mvcc readPoint across all the scanners in this
* region. Writes older than this readPoint, are included in every
@@ -545,6 +549,8 @@ public class HRegion implements HeapSize
this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
// initialize dynamic parameters with current configuration
this.loadDynamicConf(conf);
+ this.coprocessorHost = new RegionCoprocessorHost(this,
+ this.conf);
}
@Override
@@ -2125,6 +2131,7 @@ public class HRegion implements HeapSize
long newSize;
splitsAndClosesLock.readLock().lock();
try {
+ doPreMutationHook(batchOp);
long addedSize = doMiniBatchOp(batchOp, methodName);
newSize = this.incMemoryUsage(addedSize);
} finally {
@@ -2138,6 +2145,39 @@ public class HRegion implements HeapSize
return batchOp.retCodes;
}
+
+  /**
+   * Offers every operation of the batch to the coprocessor pre-mutation hooks.
+   * Only {@code Put} is wired up so far (through {@code prePut}); when that
+   * hook returns {@code true} the operation's return code is set to SUCCESS.
+   * Does nothing when no coprocessor host is set.
+   * TODO: adela currently can work only with prePut and we need to implement the rest..
+   *
+   * @param batchOp the batch whose operations are passed to the hooks and
+   *          whose {@code retCodes} are updated for bypassed Puts
+   * @throws IOException propagated from the coprocessor hook
+   */
+  private void doPreMutationHook(
+      BatchOperationInProgress<Pair<Mutation, Integer>> batchOp)
+      throws IOException {
+    /* Run coprocessor pre hook outside of locks to avoid deadlock */
+    // NOTE(review): which locks are meant is not visible here -- TODO confirm.
+    // A single WALEdit is shared by all operations of the batch.
+    WALEdit walEdit = new WALEdit();
+    if (coprocessorHost == null) {
+      return;
+    }
+    for (int idx = 0; idx < batchOp.operations.length; idx++) {
+      Mutation mutation = batchOp.operations[idx].getFirst();
+      if (mutation instanceof Put) {
+        boolean skipPut = coprocessorHost.prePut(
+            (Put) mutation, walEdit, mutation.getWriteToWAL());
+        if (skipPut) {
+          // pre hook says skip this Put
+          // mark as success and skip in doMiniBatchMutation
+          batchOp.retCodes[idx] = OperationStatusCode.SUCCESS;
+        }
+      } else if (mutation instanceof Delete) {
+        // TODO: adela
+      } else {
+        // TODO: adela
+      }
+      if (!walEdit.isEmpty()) {
+        // TODO: adela
+      }
+    }
+  }
+
private long doMiniBatchOp(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
String methodNameForMetricsUpdate) throws IOException {
String signature = null;
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java?rev=1588116&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java Thu Apr 17 00:49:10 2014
@@ -0,0 +1,120 @@
+package org.apache.hadoop.hbase.coprocessor.observers;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.environments.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Checks that a region coprocessor can veto a Put from its {@code prePut} hook:
+ * {@link TestCoprocessor} is registered through
+ * {@link CoprocessorHost#USER_REGION_COPROCESSOR_CONF_KEY} and requests a
+ * bypass for every Put that touches the "test" column family.
+ */
+public class TestHRegionObserverBypassCoprocessor {
+
+  private static HBaseTestingUtility util;
+  private static final byte[] tableName = Bytes.toBytes("test");
+  private static final byte[] dummy = Bytes.toBytes("dummy");
+  private static final byte[] row1 = Bytes.toBytes("r1");
+  private static final byte[] row2 = Bytes.toBytes("r2");
+  private static final byte[] row3 = Bytes.toBytes("r3");
+  private static final byte[] test = Bytes.toBytes("test");
+
+  /** Starts a mini cluster whose configuration registers {@link TestCoprocessor}. */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+        TestCoprocessor.class.getName());
+    util = new HBaseTestingUtility(conf);
+    util.startMiniCluster();
+  }
+
+  /** Shuts the mini cluster down. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * Drops the test table if it is left over from a previous test and recreates
+   * it with the "dummy" and "test" column families.
+   */
+  @Before
+  public void setUp() throws Exception {
+    HBaseAdmin admin = util.getHBaseAdmin();
+    if (admin.tableExists(tableName)) {
+      if (admin.isTableEnabled(tableName)) {
+        admin.disableTable(tableName);
+      }
+      admin.deleteTable(tableName);
+    }
+    util.createTable(tableName, new byte[][] {dummy, test});
+  }
+
+  /**
+   * Make 3 puts, one of them will have test column family - so it should be
+   * skipped during insertion
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSimple() throws Exception {
+    HTable t = new HTable(util.getConfiguration(), tableName);
+    try {
+      List<Put> puts = new ArrayList<Put>();
+      Put p = new Put(row1);
+      p.add(test, dummy, dummy);
+      p.add(dummy, dummy, dummy);
+      puts.add(p);
+
+      p = new Put(row2);
+      p.add(dummy, dummy, dummy);
+      puts.add(p);
+
+      p = new Put(row3);
+      p.add(dummy, dummy, dummy);
+      puts.add(p);
+
+      t.put(puts);
+
+      Result r = t.get(new Get(row1));
+      Assert.assertTrue(
+          "There should be zero results since the put contains \"test\" CF",
+          r.isEmpty());
+
+      r = t.get(new Get(row2));
+      Assert.assertEquals(1, r.getKvs().size());
+
+      r = t.get(new Get(row3));
+      Assert.assertEquals(1, r.getKvs().size());
+    } finally {
+      // Close the table even when one of the assertions above fails.
+      t.close();
+    }
+  }
+
+  /**
+   * Dummy coprocessor which skips put containing "test" as a column family
+   *
+   */
+  public static class TestCoprocessor extends BaseRegionObserver {
+    @Override
+    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Put put, final WALEdit edit, final boolean writeToWAL)
+        throws IOException {
+      Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
+      if (familyMap.containsKey(test)) {
+        e.bypass();
+        System.out.println("bypassing put: " + put);
+      }
+    }
+  }
+
+}