You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ga...@apache.org on 2010/12/21 21:39:37 UTC
svn commit: r1051639 [2/3] - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/coprocessor/
src/main/java/org/apache/hadoop/hbase/master/
src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/resources/
src/test/java/org/apache/hadoop/hb...
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java?rev=1051639&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java Tue Dec 21 20:39:26 2010
@@ -0,0 +1,553 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.coprocessor.*;
+
+import java.io.IOException;
+
+/**
+ * Provides the coprocessor framework and environment for master oriented
+ * operations. {@link HMaster} interacts with the loaded coprocessors
+ * through this class.
+ */
+public class MasterCoprocessorHost
+ extends CoprocessorHost<MasterCoprocessorHost.MasterEnvironment> {
+
+ /**
+  * Coprocessor environment extension providing access to master related
+  * services.
+  */
+ static class MasterEnvironment extends CoprocessorHost.Environment
+     implements MasterCoprocessorEnvironment {
+   /** Master services exposed to the coprocessor; assigned once at construction. */
+   private final MasterServices masterServices;
+
+   /**
+    * @param implClass coprocessor implementation class (not used by this constructor)
+    * @param impl the coprocessor instance
+    * @param priority chaining priority
+    * @param services master services returned by {@link #getMasterServices()}
+    */
+   public MasterEnvironment(Class<?> implClass, Coprocessor impl,
+       Coprocessor.Priority priority, MasterServices services) {
+     super(impl, priority);
+     this.masterServices = services;
+   }
+
+   /** @return the master services supplied at construction */
+   public MasterServices getMasterServices() {
+     return masterServices;
+   }
+ }
+
+ private MasterServices masterServices;
+
+ /**
+ * Creates the host and delegates to {@code loadSystemCoprocessors} with
+ * {@code MASTER_COPROCESSOR_CONF_KEY}.
+ * @param services master services handed to every environment this host creates
+ * @param conf configuration passed to {@code loadSystemCoprocessors}
+ */
+ MasterCoprocessorHost(final MasterServices services, final Configuration conf) {
+ // Assigned before loading because createEnvironment() reads this field.
+ // NOTE(review): presumably createEnvironment() runs during loading - verify in CoprocessorHost.
+ this.masterServices = services;
+
+ loadSystemCoprocessors(conf, MASTER_COPROCESSOR_CONF_KEY);
+ }
+
+ /**
+  * Builds the environment wrapper for a coprocessor instance, binding it to
+  * this host's master services.
+  * @param implClass the coprocessor implementation class
+  * @param instance the coprocessor instance
+  * @param priority chaining priority
+  * @return a new {@link MasterEnvironment} for the instance
+  */
+ @Override
+ public MasterEnvironment createEnvironment(Class<?> implClass,
+     Coprocessor instance, Coprocessor.Priority priority) {
+   final MasterEnvironment environment =
+       new MasterEnvironment(implClass, instance, priority, masterServices);
+   return environment;
+ }
+
+ /* Implementation of hooks for invoking MasterObservers */
+ /**
+  * Calls {@code preCreateTable} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void preCreateTable(HTableDescriptor desc, byte[][] splitKeys)
+     throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).preCreateTable(env, desc, splitKeys);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code postCreateTable} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void postCreateTable(HRegionInfo[] regions, boolean sync) throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).postCreateTable(env, regions, sync);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code preDeleteTable} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void preDeleteTable(byte[] tableName) throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).preDeleteTable(env, tableName);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code postDeleteTable} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void postDeleteTable(byte[] tableName) throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).postDeleteTable(env, tableName);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code preModifyTable} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void preModifyTable(final byte[] tableName, HTableDescriptor htd)
+     throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).preModifyTable(env, tableName, htd);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code postModifyTable} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void postModifyTable(final byte[] tableName, HTableDescriptor htd)
+     throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).postModifyTable(env, tableName, htd);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code preAddColumn} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void preAddColumn(byte [] tableName, HColumnDescriptor column)
+     throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).preAddColumn(env, tableName, column);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code postAddColumn} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void postAddColumn(byte [] tableName, HColumnDescriptor column)
+     throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).postAddColumn(env, tableName, column);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code preModifyColumn} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void preModifyColumn(byte [] tableName, HColumnDescriptor descriptor)
+     throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).preModifyColumn(
+             env, tableName, descriptor);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code postModifyColumn} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void postModifyColumn(byte [] tableName, HColumnDescriptor descriptor)
+     throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).postModifyColumn(
+             env, tableName, descriptor);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code preDeleteColumn} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void preDeleteColumn(final byte [] tableName, final byte [] c)
+     throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).preDeleteColumn(env, tableName, c);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code postDeleteColumn} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void postDeleteColumn(final byte [] tableName, final byte [] c)
+     throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).postDeleteColumn(env, tableName, c);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code preEnableTable} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void preEnableTable(final byte [] tableName) throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).preEnableTable(env, tableName);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code postEnableTable} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void postEnableTable(final byte [] tableName) throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).postEnableTable(env, tableName);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code preDisableTable} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void preDisableTable(final byte [] tableName) throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).preDisableTable(env, tableName);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code postDisableTable} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void postDisableTable(final byte [] tableName) throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).postDisableTable(env, tableName);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code preMove} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void preMove(final HRegionInfo region, final HServerInfo srcServer, final HServerInfo destServer)
+     throws UnknownRegionException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).preMove(
+             env, region, srcServer, destServer);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code postMove} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void postMove(final HRegionInfo region, final HServerInfo srcServer, final HServerInfo destServer)
+     throws UnknownRegionException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).postMove(
+             env, region, srcServer, destServer);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code preAssign} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  * @return true if any invoked observer's environment reported {@code shouldBypass()}
+  */
+ boolean preAssign(final byte [] regionName, final boolean force)
+     throws IOException {
+   boolean bypass = false;
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).preAssign(env, regionName, force);
+         bypass |= env.shouldBypass();
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+   return bypass;
+ }
+
+ /**
+  * Calls {@code postAssign} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void postAssign(final HRegionInfo regionInfo) throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).postAssign(env, regionInfo);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code preUnassign} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  * @return true if any invoked observer's environment reported {@code shouldBypass()}
+  */
+ boolean preUnassign(final byte [] regionName, final boolean force)
+     throws IOException {
+   boolean bypass = false;
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).preUnassign(
+             env, regionName, force);
+         bypass |= env.shouldBypass();
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+   return bypass;
+ }
+
+ /**
+  * Calls {@code postUnassign} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void postUnassign(final HRegionInfo regionInfo, final boolean force)
+     throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).postUnassign(
+             env, regionInfo, force);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code preBalance} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  * @return true if any invoked observer's environment reported {@code shouldBypass()}
+  */
+ boolean preBalance() throws IOException {
+   boolean bypass = false;
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).preBalance(env);
+         bypass |= env.shouldBypass();
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+     return bypass;
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code postBalance} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void postBalance() throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).postBalance(env);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Threads the requested balancer switch value through {@code preBalanceSwitch}
+  * of every loaded coprocessor that is a {@link MasterObserver}, feeding each
+  * observer the value returned by the previous one; stops once an environment
+  * reports {@code shouldComplete()}.
+  * @param b the requested value
+  * @return the value returned by the last observer invoked, or {@code b} if none ran
+  */
+ boolean preBalanceSwitch(final boolean b) throws IOException {
+   boolean balance = b;
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         balance = ((MasterObserver) env.getInstance()).preBalanceSwitch(
+             env, balance);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+   return balance;
+ }
+
+ /**
+  * Calls {@code postBalanceSwitch} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void postBalanceSwitch(final boolean oldValue, final boolean newValue)
+     throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).postBalanceSwitch(
+             env, oldValue, newValue);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code preShutdown} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void preShutdown() throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).preShutdown(env);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code preStopMaster} on every loaded coprocessor that is a
+  * {@link MasterObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ void preStopMaster() throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (MasterEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof MasterObserver) {
+         ((MasterObserver) env.getInstance()).preStopMaster(env);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java?rev=1051639&r1=1051638&r2=1051639&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java Tue Dec 21 20:39:26 2010
@@ -23,7 +23,9 @@ import java.io.IOException;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
* Services Master supplies
@@ -56,4 +58,15 @@ public interface MasterServices {
* @throws TableNotFoundException
*/
public void checkTableModifiable(final byte [] tableName) throws IOException;
+
+ /**
+ * @return Implementation of {@link org.apache.hadoop.hbase.catalog.CatalogTracker} or null.
+ */
+ public CatalogTracker getCatalogTracker();
+
+ /**
+ * @return Implementation of {@link ZooKeeperWatcher} or null.
+ */
+ public ZooKeeperWatcher getZooKeeperWatcher();
+
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1051639&r1=1051638&r2=1051639&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Dec 21 20:39:26 2010
@@ -248,7 +248,7 @@ public class HRegion implements HeapSize
new ReadWriteConsistencyControl();
// Coprocessor host
- private CoprocessorHost coprocessorHost;
+ private RegionCoprocessorHost coprocessorHost;
/**
* Name of the region info file that resides just under the region directory.
@@ -319,7 +319,7 @@ public class HRegion implements HeapSize
// don't initialize coprocessors if not running within a regionserver
// TODO: revisit if coprocessors should load in other cases
if (rsServices != null) {
- this.coprocessorHost = new CoprocessorHost(this, rsServices, conf);
+ this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
}
if (LOG.isDebugEnabled()) {
// Write out region name as string and its encoded name.
@@ -3557,12 +3557,12 @@ public class HRegion implements HeapSize
}
/** @return the coprocessor host */
- public CoprocessorHost getCoprocessorHost() {
+ public RegionCoprocessorHost getCoprocessorHost() {
return coprocessorHost;
}
/** @param coprocessorHost the new coprocessor host */
- public void setCoprocessorHost(final CoprocessorHost coprocessorHost) {
+ public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
this.coprocessorHost = coprocessorHost;
}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1051639&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Tue Dec 21 20:39:26 2010
@@ -0,0 +1,1007 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.coprocessor.*;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Implements the coprocessor environment and runtime support for coprocessors
+ * loaded within a {@link HRegion}.
+ */
+public class RegionCoprocessorHost
+ extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
+
+ private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
+
+ /**
+  * Encapsulation of the environment of each coprocessor
+  */
+ static class RegionEnvironment extends CoprocessorHost.Environment
+     implements RegionCoprocessorEnvironment {
+
+   /** Region this environment was created for; assigned once at construction. */
+   private final HRegion region;
+   /** Region server services exposed to the coprocessor; assigned once at construction. */
+   private final RegionServerServices rsServices;
+
+   /**
+    * Constructor
+    * @param impl the coprocessor instance
+    * @param priority chaining priority
+    * @param region the region returned by {@link #getRegion()}
+    * @param services the services returned by {@link #getRegionServerServices()}
+    */
+   public RegionEnvironment(final Coprocessor impl,
+       Coprocessor.Priority priority, final HRegion region,
+       final RegionServerServices services) {
+     super(impl, priority);
+     this.region = region;
+     this.rsServices = services;
+   }
+
+   /** @return the region */
+   @Override
+   public HRegion getRegion() {
+     return region;
+   }
+
+   /** @return reference to the region server services */
+   @Override
+   public RegionServerServices getRegionServerServices() {
+     return rsServices;
+   }
+
+   /**
+    * Delegates directly to the superclass implementation.
+    * NOTE(review): presumably redeclared only to make the method accessible
+    * to the host - confirm against CoprocessorHost.Environment.
+    */
+   public void shutdown() {
+     super.shutdown();
+   }
+ }
+
+ static final Pattern attrSpecMatch = Pattern.compile("(.+):(.+):(.+)");
+
+ /** The region server services */
+ RegionServerServices rsServices;
+ /** The region */
+ HRegion region;
+
+ /**
+ * Constructor. Records the region and services, derives {@code pathPrefix}
+ * from the region name (commas replaced by underscores), then loads the
+ * system coprocessors followed by the table coprocessors.
+ * @param region the region
+ * @param rsServices interface to available region server functionality
+ * @param conf the configuration
+ */
+ public RegionCoprocessorHost(final HRegion region,
+ final RegionServerServices rsServices, final Configuration conf) {
+ // Fields are assigned before any loading: createEnvironment() and
+ // loadTableCoprocessors() both read them.
+ this.rsServices = rsServices;
+ this.region = region;
+ this.pathPrefix = this.region.getRegionNameAsString().replace(',', '_');
+
+ // load system default cp's from configuration.
+ loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
+
+ // load the coprocessors declared in the table descriptor's attributes
+ loadTableCoprocessors();
+ }
+
+ /**
+  * Scans the table descriptor's attributes for keys starting with
+  * "COPROCESSOR" and loads the coprocessor each value describes. A value must
+  * match {@code attrSpecMatch}, i.e. three colon-separated parts used as
+  * path, class name and priority name. A spec that cannot be parsed or
+  * loaded is logged and skipped so that the remaining attributes are still
+  * processed.
+  */
+ void loadTableCoprocessors() {
+   for (Map.Entry<ImmutableBytesWritable,ImmutableBytesWritable> e:
+       region.getTableDesc().getValues().entrySet()) {
+     String key = Bytes.toString(e.getKey().get());
+     if (!key.startsWith("COPROCESSOR")) {
+       continue;
+     }
+     // found one
+     String spec = Bytes.toString(e.getValue().get());
+     Matcher matcher = attrSpecMatch.matcher(spec);
+     if (!matcher.matches()) {
+       LOG.warn("attribute '" + key + "' has invalid coprocessor spec");
+       continue;
+     }
+     String className = matcher.group(2);
+     Path path;
+     Coprocessor.Priority priority;
+     try {
+       path = new Path(matcher.group(1));
+       // Enum valueOf throws IllegalArgumentException for an unknown constant
+       // name; treat that like any other malformed spec instead of letting it
+       // propagate out of the host.
+       priority = Coprocessor.Priority.valueOf(matcher.group(3));
+     } catch (IllegalArgumentException ex) {
+       LOG.warn("attribute '" + key + "' has invalid coprocessor spec", ex);
+       continue;
+     }
+     try {
+       load(path, className, priority);
+       LOG.info("Load coprocessor " + className + " from HTD of " +
+           Bytes.toString(region.getTableDesc().getName()) +
+           " successfully.");
+     } catch (IOException ex) {
+       LOG.warn(StringUtils.stringifyException(ex));
+     }
+   }
+ }
+
+ /**
+  * Builds the environment for a coprocessor instance. If the implementation
+  * class directly implements an interface assignable to
+  * {@link CoprocessorProtocol}, the first such interface is registered with
+  * the region before the environment is created.
+  */
+ @Override
+ public RegionEnvironment createEnvironment(
+     Class<?> implClass, Coprocessor instance, Coprocessor.Priority priority) {
+   // Endpoints are registered and executed differently from observers under
+   // the current dynamic protocol design: the region dispatches calls to the
+   // handler registered for the protocol interface.
+   Class[] declaredInterfaces = implClass.getInterfaces();
+   for (Class iface : declaredInterfaces) {
+     if (!CoprocessorProtocol.class.isAssignableFrom(iface)) {
+       continue;
+     }
+     region.registerProtocol(iface, (CoprocessorProtocol)instance);
+     break;
+   }
+
+   RegionEnvironment environment =
+       new RegionEnvironment(instance, priority, region, rsServices);
+   return environment;
+ }
+
+ /**
+  * Invoked before a region open. Loads the table coprocessors, then calls
+  * {@code preOpen} on every loaded {@link RegionObserver}; stops once an
+  * environment reports {@code shouldComplete()}.
+  */
+ public void preOpen() {
+   // NOTE(review): the constructor also calls loadTableCoprocessors(); confirm
+   // that repeating the load here is intended and does not register twice.
+   loadTableCoprocessors();
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (RegionEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof RegionObserver) {
+         ((RegionObserver) env.getInstance()).preOpen(env);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Invoked after a region open. Calls {@code postOpen} on every loaded
+  * {@link RegionObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ public void postOpen() {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (RegionEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof RegionObserver) {
+         ((RegionObserver) env.getInstance()).postOpen(env);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Invoked before a region is closed. Holds the write lock while calling
+  * {@code preClose} on every loaded {@link RegionObserver}; unlike most hooks
+  * it does not consult {@code shouldComplete()}.
+  * @param abortRequested true if the server is aborting
+  */
+ public void preClose(boolean abortRequested) {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.writeLock().lock();
+   try {
+     for (RegionEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof RegionObserver) {
+         ((RegionObserver) env.getInstance()).preClose(env, abortRequested);
+       }
+     }
+   } finally {
+     coprocessorLock.writeLock().unlock();
+   }
+ }
+
+ /**
+  * Invoked after a region is closed. Holds the write lock while calling
+  * {@code postClose} on every loaded {@link RegionObserver} and then
+  * {@code shutdown(env)} on every environment, observer or not.
+  * @param abortRequested true if the server is aborting
+  */
+ public void postClose(boolean abortRequested) {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.writeLock().lock();
+   try {
+     for (RegionEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof RegionObserver) {
+         ((RegionObserver) env.getInstance()).postClose(env, abortRequested);
+       }
+       shutdown(env);
+     }
+   } finally {
+     coprocessorLock.writeLock().unlock();
+   }
+ }
+
+ /**
+  * Invoked before a region is compacted. Calls {@code preCompact} on every
+  * loaded {@link RegionObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  * @param willSplit true if the compaction is about to trigger a split
+  */
+ public void preCompact(boolean willSplit) {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (RegionEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof RegionObserver) {
+         ((RegionObserver) env.getInstance()).preCompact(env, willSplit);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Invoked after a region is compacted. Calls {@code postCompact} on every
+  * loaded {@link RegionObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  * @param willSplit true if the compaction is about to trigger a split
+  */
+ public void postCompact(boolean willSplit) {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (RegionEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof RegionObserver) {
+         ((RegionObserver) env.getInstance()).postCompact(env, willSplit);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Invoked before a memstore flush. Calls {@code preFlush} on every loaded
+  * {@link RegionObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ public void preFlush() {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (RegionEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof RegionObserver) {
+         ((RegionObserver) env.getInstance()).preFlush(env);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Invoked after a memstore flush. Calls {@code postFlush} on every loaded
+  * {@link RegionObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ public void postFlush() {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (RegionEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof RegionObserver) {
+         ((RegionObserver) env.getInstance()).postFlush(env);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Invoked just before a split. Calls {@code preSplit} on every loaded
+  * {@link RegionObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  */
+ public void preSplit() {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (RegionEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof RegionObserver) {
+         ((RegionObserver) env.getInstance()).preSplit(env);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Invoked just after a split. Calls {@code postSplit} on every loaded
+  * {@link RegionObserver}; stops once an environment reports
+  * {@code shouldComplete()}.
+  * @param l the new left-hand daughter region
+  * @param r the new right-hand daughter region
+  */
+ public void postSplit(HRegion l, HRegion r) {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (RegionEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof RegionObserver) {
+         ((RegionObserver) env.getInstance()).postSplit(env, l, r);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ // RegionObserver support
+
+ /**
+  * Calls {@code preGetClosestRowBefore} on every loaded {@link RegionObserver};
+  * stops once an environment reports {@code shouldComplete()}.
+  * @param row the row key
+  * @param family the family
+  * @param result the result set from the region
+  * @return true if default processing should be bypassed, i.e. any invoked
+  * observer's environment reported {@code shouldBypass()}
+  * @exception IOException Exception
+  */
+ public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
+     final Result result) throws IOException {
+   boolean bypass = false;
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (RegionEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof RegionObserver) {
+         ((RegionObserver) env.getInstance()).preGetClosestRowBefore(env, row, family,
+             result);
+         bypass |= env.shouldBypass();
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+     return bypass;
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code postGetClosestRowBefore} on every loaded {@link RegionObserver};
+  * stops once an environment reports {@code shouldComplete()}.
+  * @param row the row key
+  * @param family the family
+  * @param result the result set from the region
+  * @exception IOException Exception
+  */
+ public void postGetClosestRowBefore(final byte[] row, final byte[] family,
+     final Result result) throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (RegionEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof RegionObserver) {
+         ((RegionObserver) env.getInstance()).postGetClosestRowBefore(env, row, family,
+             result);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code preGet} on every loaded {@link RegionObserver}; stops once an
+  * environment reports {@code shouldComplete()}.
+  * @param get the Get request
+  * @param results the list handed to each observer's {@code preGet}
+  * @return true if default processing should be bypassed, i.e. any invoked
+  * observer's environment reported {@code shouldBypass()}
+  * @exception IOException Exception
+  */
+ public boolean preGet(final Get get, final List<KeyValue> results)
+     throws IOException {
+   boolean bypass = false;
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (RegionEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof RegionObserver) {
+         ((RegionObserver) env.getInstance()).preGet(env, get, results);
+         bypass |= env.shouldBypass();
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+     return bypass;
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code postGet} on every loaded {@link RegionObserver}; stops once an
+  * environment reports {@code shouldComplete()}. Nothing is returned: the
+  * observers receive the {@code results} list itself.
+  * @param get the Get request
+  * @param results the result set
+  * @exception IOException Exception
+  */
+ public void postGet(final Get get, final List<KeyValue> results)
+     throws IOException {
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (RegionEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof RegionObserver) {
+         ((RegionObserver) env.getInstance()).postGet(env, get, results);
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Calls {@code preExists} on every loaded {@link RegionObserver}, feeding each
+  * observer the value returned by the previous one (starting from false);
+  * stops once an environment reports {@code shouldComplete()}.
+  * @param get the Get request
+  * @return true or false to return to client if bypassing normal operation,
+  * or null otherwise
+  * @exception IOException Exception
+  */
+ public Boolean preExists(final Get get) throws IOException {
+   boolean bypass = false;
+   boolean exists = false;
+   // Lock before entering try: finally must only release a lock that was acquired.
+   coprocessorLock.readLock().lock();
+   try {
+     for (RegionEnvironment env : coprocessors) {
+       if (env.getInstance() instanceof RegionObserver) {
+         exists = ((RegionObserver) env.getInstance()).preExists(env, get, exists);
+         bypass |= env.shouldBypass();
+         if (env.shouldComplete()) {
+           break;
+         }
+       }
+     }
+     return bypass ? exists : null;
+   } finally {
+     coprocessorLock.readLock().unlock();
+   }
+ }
+
+ /**
+ * Invokes postExists on each environment in coprocessors whose instance is
+ * a RegionObserver, holding the coprocessor read lock for the duration.
+ * Each observer's return value replaces exists before the next observer is
+ * called. Iteration stops after the first environment that reports
+ * shouldComplete().
+ * @param get the Get request
+ * @param exists the result returned by the region server
+ * @return the result to return to the client, as last set by an observer
+ * (the input value if no observer ran)
+ * @exception IOException if an observer invocation fails with one
+ */
+ public boolean postExists(final Get get, boolean exists)
+ throws IOException {
+ try {
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ exists = ((RegionObserver)env.getInstance()).postExists(env, get, exists);
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return exists;
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes prePut on each environment in coprocessors whose instance is a
+ * RegionObserver, holding the coprocessor read lock for the duration.
+ * Iteration stops after the first environment that reports shouldComplete().
+ * @param familyMap map of family to edits for the given family.
+ * @param writeToWAL true if the change should be written to the WAL
+ * @return true if any visited environment reported shouldBypass(), i.e.
+ * default processing should be bypassed
+ * @exception IOException if an observer invocation fails with one
+ */
+ public boolean prePut(final Map<byte[], List<KeyValue>> familyMap,
+ final boolean writeToWAL) throws IOException {
+ try {
+ boolean bypass = false;
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ((RegionObserver)env.getInstance()).prePut(env, familyMap, writeToWAL);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass;
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes postPut on each environment in coprocessors whose instance is a
+ * RegionObserver, holding the coprocessor read lock for the duration.
+ * Iteration stops after the first environment that reports shouldComplete().
+ * @param familyMap map of family to edits for the given family.
+ * @param writeToWAL true if the change should be written to the WAL
+ * @exception IOException if an observer invocation fails with one
+ */
+ public void postPut(final Map<byte[], List<KeyValue>> familyMap,
+ final boolean writeToWAL) throws IOException {
+ try {
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ((RegionObserver)env.getInstance()).postPut(env, familyMap, writeToWAL);
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes preDelete on each environment in coprocessors whose instance is a
+ * RegionObserver, holding the coprocessor read lock for the duration.
+ * Iteration stops after the first environment that reports shouldComplete().
+ * @param familyMap map of family to edits for the given family.
+ * @param writeToWAL true if the change should be written to the WAL
+ * @return true if any visited environment reported shouldBypass(), i.e.
+ * default processing should be bypassed
+ * @exception IOException if an observer invocation fails with one
+ */
+ public boolean preDelete(final Map<byte[], List<KeyValue>> familyMap,
+ final boolean writeToWAL) throws IOException {
+ try {
+ boolean bypass = false;
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ((RegionObserver)env.getInstance()).preDelete(env, familyMap, writeToWAL);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass;
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes postDelete on each environment in coprocessors whose instance is
+ * a RegionObserver, holding the coprocessor read lock for the duration.
+ * Iteration stops after the first environment that reports shouldComplete().
+ * @param familyMap map of family to edits for the given family.
+ * @param writeToWAL true if the change should be written to the WAL
+ * @exception IOException if an observer invocation fails with one
+ */
+ public void postDelete(final Map<byte[], List<KeyValue>> familyMap,
+ final boolean writeToWAL) throws IOException {
+ try {
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ((RegionObserver)env.getInstance()).postDelete(env, familyMap, writeToWAL);
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes preCheckAndPut on each environment in coprocessors whose instance
+ * is a RegionObserver, holding the coprocessor read lock for the duration.
+ * The result flag starts as false and each observer's return value is fed
+ * to the next observer. Iteration stops after the first environment that
+ * reports shouldComplete().
+ * @param row row to check
+ * @param family column family
+ * @param qualifier column qualifier
+ * @param value the expected value
+ * @param put data to put if check succeeds
+ * @return the final result value if any visited environment reported
+ * shouldBypass() (to return to the client in place of default processing),
+ * or null otherwise
+ * @throws IOException if an observer invocation fails with one
+ */
+ public Boolean preCheckAndPut(final byte [] row, final byte [] family,
+ final byte [] qualifier, final byte [] value, Put put)
+ throws IOException
+ {
+ try {
+ boolean bypass = false;
+ boolean result = false;
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ result = ((RegionObserver)env.getInstance()).preCheckAndPut(env, row, family,
+ qualifier, value, put, result);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass ? result : null;
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes postCheckAndPut on each environment in coprocessors whose
+ * instance is a RegionObserver, holding the coprocessor read lock for the
+ * duration. Each observer's return value replaces result before the next
+ * observer is called. Iteration stops after the first environment that
+ * reports shouldComplete().
+ * @param row row to check
+ * @param family column family
+ * @param qualifier column qualifier
+ * @param value the expected value
+ * @param put data to put if check succeeds
+ * @param result incoming outcome passed to the first observer (presumably
+ * the outcome of the default checkAndPut - verify against the caller)
+ * @return the result as last set by an observer (the input value if no
+ * observer ran)
+ * @throws IOException if an observer invocation fails with one
+ */
+ public boolean postCheckAndPut(final byte [] row, final byte [] family,
+ final byte [] qualifier, final byte [] value, final Put put,
+ boolean result)
+ throws IOException
+ {
+ try {
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ result = ((RegionObserver)env.getInstance()).postCheckAndPut(env, row,
+ family, qualifier, value, put, result);
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return result;
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes preCheckAndDelete on each environment in coprocessors whose
+ * instance is a RegionObserver, holding the coprocessor read lock for the
+ * duration. The result flag starts as false and each observer's return
+ * value is fed to the next observer. Iteration stops after the first
+ * environment that reports shouldComplete().
+ * @param row row to check
+ * @param family column family
+ * @param qualifier column qualifier
+ * @param value the expected value
+ * @param delete delete to commit if check succeeds
+ * @return the final result value if any visited environment reported
+ * shouldBypass() (to return to the client in place of default processing),
+ * or null otherwise
+ * @throws IOException if an observer invocation fails with one
+ */
+ public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
+ final byte [] qualifier, final byte [] value, Delete delete)
+ throws IOException
+ {
+ try {
+ boolean bypass = false;
+ boolean result = false;
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ result = ((RegionObserver)env.getInstance()).preCheckAndDelete(env, row,
+ family, qualifier, value, delete, result);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass ? result : null;
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes postCheckAndDelete on each environment in coprocessors whose
+ * instance is a RegionObserver, holding the coprocessor read lock for the
+ * duration. Each observer's return value replaces result before the next
+ * observer is called. Iteration stops after the first environment that
+ * reports shouldComplete().
+ * @param row row to check
+ * @param family column family
+ * @param qualifier column qualifier
+ * @param value the expected value
+ * @param delete delete to commit if check succeeds
+ * @param result incoming outcome passed to the first observer (presumably
+ * the outcome of the default checkAndDelete - verify against the caller)
+ * @return the result as last set by an observer (the input value if no
+ * observer ran)
+ * @throws IOException if an observer invocation fails with one
+ */
+ public boolean postCheckAndDelete(final byte [] row, final byte [] family,
+ final byte [] qualifier, final byte [] value, final Delete delete,
+ boolean result)
+ throws IOException
+ {
+ try {
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ result = ((RegionObserver)env.getInstance()).postCheckAndDelete(env, row,
+ family, qualifier, value, delete, result);
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return result;
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes preIncrementColumnValue on each environment in coprocessors whose
+ * instance is a RegionObserver, holding the coprocessor read lock for the
+ * duration. Each observer's return value replaces amount before the next
+ * observer is called. Iteration stops after the first environment that
+ * reports shouldComplete().
+ * @param row row to check
+ * @param family column family
+ * @param qualifier column qualifier
+ * @param amount long amount to increment
+ * @param writeToWAL true if the change should be written to the WAL
+ * @return the amount as last set by an observer if any visited environment
+ * reported shouldBypass() (used as the return value for the client when the
+ * default operation is bypassed), or null otherwise
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ public Long preIncrementColumnValue(final byte [] row, final byte [] family,
+ final byte [] qualifier, long amount, final boolean writeToWAL)
+ throws IOException {
+ try {
+ boolean bypass = false;
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ amount = ((RegionObserver)env.getInstance()).preIncrementColumnValue(env,
+ row, family, qualifier, amount, writeToWAL);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass ? amount : null;
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes postIncrementColumnValue on each environment in coprocessors
+ * whose instance is a RegionObserver, holding the coprocessor read lock for
+ * the duration. Each observer's return value replaces result before the
+ * next observer is called. Iteration stops after the first environment that
+ * reports shouldComplete().
+ * @param row row to check
+ * @param family column family
+ * @param qualifier column qualifier
+ * @param amount long amount to increment
+ * @param writeToWAL true if the change should be written to the WAL
+ * @param result the result returned by incrementColumnValue
+ * @return the result to return to the client, as last set by an observer
+ * (the input value if no observer ran)
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ public long postIncrementColumnValue(final byte [] row, final byte [] family,
+ final byte [] qualifier, final long amount, final boolean writeToWAL,
+ long result) throws IOException {
+ try {
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ result = ((RegionObserver)env.getInstance()).postIncrementColumnValue(env,
+ row, family, qualifier, amount, writeToWAL, result);
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ return result;
+ }
+
+ /**
+ * Invokes preIncrement on each environment in coprocessors whose instance
+ * is a RegionObserver, holding the coprocessor read lock for the duration.
+ * A single newly created Result is handed to every observer. Iteration
+ * stops after the first environment that reports shouldComplete().
+ * @param increment increment object
+ * @return that Result if any visited environment reported shouldBypass()
+ * (to return to the client in place of the default operation), null
+ * otherwise
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ public Result preIncrement(Increment increment)
+ throws IOException {
+ try {
+ boolean bypass = false;
+ Result result = new Result();
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ((RegionObserver)env.getInstance()).preIncrement(env, increment, result);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass ? result : null;
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes postIncrement on each environment in coprocessors whose instance
+ * is a RegionObserver, holding the coprocessor read lock for the duration.
+ * Iteration stops after the first environment that reports shouldComplete().
+ * @param increment increment object
+ * @param result the Result handed to every observer (presumably the outcome
+ * of the increment operation - verify against the caller)
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ public void postIncrement(final Increment increment, Result result)
+ throws IOException {
+ try {
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ((RegionObserver)env.getInstance()).postIncrement(env, increment, result);
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes preScannerOpen on each environment in coprocessors whose instance
+ * is a RegionObserver, holding the coprocessor read lock for the duration.
+ * The scanner reference starts as null and each observer's return value is
+ * fed to the next observer. Iteration stops after the first environment
+ * that reports shouldComplete().
+ * @param scan the Scan specification
+ * @return the scanner last returned by an observer if any visited
+ * environment reported shouldBypass() (to use in place of the default
+ * operation), null otherwise
+ * @exception IOException if an observer invocation fails with one
+ */
+ public InternalScanner preScannerOpen(Scan scan) throws IOException {
+ try {
+ boolean bypass = false;
+ InternalScanner s = null;
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ s = ((RegionObserver)env.getInstance()).preScannerOpen(env, scan, s);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass ? s : null;
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes postScannerOpen on each environment in coprocessors whose
+ * instance is a RegionObserver, holding the coprocessor read lock for the
+ * duration. Each observer's return value replaces s before the next
+ * observer is called. Iteration stops after the first environment that
+ * reports shouldComplete().
+ * @param scan the Scan specification
+ * @param s the scanner
+ * @return the scanner instance to use, as last returned by an observer (the
+ * input scanner if no observer ran)
+ * @exception IOException if an observer invocation fails with one
+ */
+ public InternalScanner postScannerOpen(final Scan scan, InternalScanner s)
+ throws IOException {
+ try {
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ s = ((RegionObserver)env.getInstance()).postScannerOpen(env, scan, s);
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return s;
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes preScannerNext on each environment in coprocessors whose instance
+ * is a RegionObserver, holding the coprocessor read lock for the duration.
+ * The hasNext flag starts as false and each observer's return value is fed
+ * to the next observer. Iteration stops after the first environment that
+ * reports shouldComplete().
+ * @param s the scanner
+ * @param results the result list handed to every observer
+ * @param limit the maximum number of results to return
+ * @return the final 'has next' indication for the client if any visited
+ * environment reported shouldBypass(), or null otherwise
+ * @exception IOException if an observer invocation fails with one
+ */
+ public Boolean preScannerNext(final InternalScanner s,
+ final List<KeyValue> results, int limit) throws IOException {
+ try {
+ boolean bypass = false;
+ boolean hasNext = false;
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ hasNext = ((RegionObserver)env.getInstance()).preScannerNext(env, s, results,
+ limit, hasNext);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass ? hasNext : null;
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes postScannerNext on each environment in coprocessors whose
+ * instance is a RegionObserver, holding the coprocessor read lock for the
+ * duration. Each observer's return value replaces hasMore before the next
+ * observer is called. Iteration stops after the first environment that
+ * reports shouldComplete().
+ * @param s the scanner
+ * @param results the result set returned by the region server
+ * @param limit the maximum number of results to return
+ * @param hasMore incoming 'has more' indication passed to the first observer
+ * @return 'has more' indication to give to client, as last set by an
+ * observer (the input value if no observer ran)
+ * @exception IOException if an observer invocation fails with one
+ */
+ public boolean postScannerNext(final InternalScanner s,
+ final List<KeyValue> results, final int limit, boolean hasMore)
+ throws IOException {
+ try {
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ hasMore = ((RegionObserver)env.getInstance()).postScannerNext(env, s,
+ results, limit, hasMore);
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return hasMore;
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes preScannerClose on each environment in coprocessors whose
+ * instance is a RegionObserver, holding the coprocessor read lock for the
+ * duration. Iteration stops after the first environment that reports
+ * shouldComplete().
+ * @param s the scanner
+ * @return true if any visited environment reported shouldBypass(), i.e.
+ * default behavior should be bypassed; false otherwise
+ * @exception IOException if an observer invocation fails with one
+ */
+ public boolean preScannerClose(final InternalScanner s)
+ throws IOException {
+ try {
+ boolean bypass = false;
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ((RegionObserver)env.getInstance()).preScannerClose(env, s);
+ bypass |= env.shouldBypass();
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ return bypass;
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Invokes postScannerClose on each environment in coprocessors whose
+ * instance is a RegionObserver, holding the coprocessor read lock for the
+ * duration. Iteration stops after the first environment that reports
+ * shouldComplete().
+ * @param s the scanner
+ * @exception IOException if an observer invocation fails with one
+ */
+ public void postScannerClose(final InternalScanner s)
+ throws IOException {
+ try {
+ coprocessorLock.readLock().lock();
+ for (RegionEnvironment env: coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ((RegionObserver)env.getInstance()).postScannerClose(env, s);
+ if (env.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ } finally {
+ coprocessorLock.readLock().unlock();
+ }
+ }
+}
Modified: hbase/trunk/src/main/resources/hbase-default.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/resources/hbase-default.xml?rev=1051639&r1=1051638&r2=1051639&view=diff
==============================================================================
--- hbase/trunk/src/main/resources/hbase-default.xml (original)
+++ hbase/trunk/src/main/resources/hbase-default.xml Tue Dec 21 20:39:26 2010
@@ -480,17 +480,28 @@
</property>
<property>
- <name>hbase.coprocessor.default.classes</name>
+ <name>hbase.coprocessor.region.classes</name>
<value></value>
<description>A comma-separated list of Coprocessors that are loaded by
- default. For any override coprocessor method, these classes will be called
- in order. After implement your own
- Coprocessor, just put it in HBase's classpath and add the fully
- qualified class name here.
+ default on all tables. For any overridden coprocessor method, these classes
+ will be called in order. After implementing your own Coprocessor, just put
+ it in HBase's classpath and add the fully qualified class name here.
A coprocessor can also be loaded on demand by setting HTableDescriptor.
</description>
</property>
+ <property>
+ <name>hbase.coprocessor.master.classes</name>
+ <value></value>
+ <description>A comma-separated list of
+ org.apache.hadoop.hbase.coprocessor.MasterObserver coprocessors that are
+ loaded by default on the active HMaster process. For any implemented
+ coprocessor methods, the listed classes will be called in order. After
+ implementing your own MasterObserver, just put it in HBase's classpath
+ and add the fully qualified class name here.
+ </description>
+ </property>
+
<!--
The following three properties are used together to create the list of
host:peer_port:leader_port quorum servers for ZooKeeper.
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java?rev=1051639&r1=1051638&r2=1051639&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java Tue Dec 21 20:39:26 2010
@@ -43,7 +43,8 @@ implements ColumnAggregationProtocol {
scan.addColumn(family, qualifier);
int sumResult = 0;
- InternalScanner scanner = getEnvironment().getRegion().getScanner(scan);
+ InternalScanner scanner = ((RegionCoprocessorEnvironment)getEnvironment())
+ .getRegion().getScanner(scan);
try {
List<KeyValue> curVals = new ArrayList<KeyValue>();
boolean done = false;
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1051639&r1=1051638&r2=1051639&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java Tue Dec 21 20:39:26 2010
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
@@ -68,12 +69,12 @@ public class SimpleRegionObserver extend
boolean hadPostIncrement = false;
@Override
- public void preOpen(CoprocessorEnvironment e) {
+ public void preOpen(RegionCoprocessorEnvironment e) {
hadPreOpen = true;
}
@Override
- public void postOpen(CoprocessorEnvironment e) {
+ public void postOpen(RegionCoprocessorEnvironment e) {
hadPostOpen = true;
}
@@ -82,12 +83,12 @@ public class SimpleRegionObserver extend
}
@Override
- public void preClose(CoprocessorEnvironment e, boolean abortRequested) {
+ public void preClose(RegionCoprocessorEnvironment e, boolean abortRequested) {
hadPreClose = true;
}
@Override
- public void postClose(CoprocessorEnvironment e, boolean abortRequested) {
+ public void postClose(RegionCoprocessorEnvironment e, boolean abortRequested) {
hadPostClose = true;
}
@@ -96,12 +97,12 @@ public class SimpleRegionObserver extend
}
@Override
- public void preFlush(CoprocessorEnvironment e) {
+ public void preFlush(RegionCoprocessorEnvironment e) {
hadPreFlush = true;
}
@Override
- public void postFlush(CoprocessorEnvironment e) {
+ public void postFlush(RegionCoprocessorEnvironment e) {
hadPostFlush = true;
}
@@ -110,12 +111,12 @@ public class SimpleRegionObserver extend
}
@Override
- public void preSplit(CoprocessorEnvironment e) {
+ public void preSplit(RegionCoprocessorEnvironment e) {
hadPreSplit = true;
}
@Override
- public void postSplit(CoprocessorEnvironment e, HRegion l, HRegion r) {
+ public void postSplit(RegionCoprocessorEnvironment e, HRegion l, HRegion r) {
hadPostSplit = true;
}
@@ -124,12 +125,12 @@ public class SimpleRegionObserver extend
}
@Override
- public void preCompact(CoprocessorEnvironment e, boolean willSplit) {
+ public void preCompact(RegionCoprocessorEnvironment e, boolean willSplit) {
hadPreCompact = true;
}
@Override
- public void postCompact(CoprocessorEnvironment e, boolean willSplit) {
+ public void postCompact(RegionCoprocessorEnvironment e, boolean willSplit) {
hadPostCompact = true;
}
@@ -138,7 +139,7 @@ public class SimpleRegionObserver extend
}
@Override
- public void preGet(final CoprocessorEnvironment e, final Get get,
+ public void preGet(final RegionCoprocessorEnvironment e, final Get get,
final List<KeyValue> results) throws IOException {
assertNotNull(e);
assertNotNull(e.getRegion());
@@ -151,7 +152,7 @@ public class SimpleRegionObserver extend
}
@Override
- public void postGet(final CoprocessorEnvironment e, final Get get,
+ public void postGet(final RegionCoprocessorEnvironment e, final Get get,
final List<KeyValue> results) {
assertNotNull(e);
assertNotNull(e.getRegion());
@@ -181,7 +182,7 @@ public class SimpleRegionObserver extend
}
@Override
- public void prePut(final CoprocessorEnvironment e, final Map<byte[],
+ public void prePut(final RegionCoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
assertNotNull(e);
assertNotNull(e.getRegion());
@@ -208,7 +209,7 @@ public class SimpleRegionObserver extend
}
@Override
- public void postPut(final CoprocessorEnvironment e, final Map<byte[],
+ public void postPut(final RegionCoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
assertNotNull(e);
assertNotNull(e.getRegion());
@@ -235,7 +236,7 @@ public class SimpleRegionObserver extend
}
@Override
- public void preDelete(final CoprocessorEnvironment e, final Map<byte[],
+ public void preDelete(final RegionCoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
assertNotNull(e);
assertNotNull(e.getRegion());
@@ -247,7 +248,7 @@ public class SimpleRegionObserver extend
}
@Override
- public void postDelete(final CoprocessorEnvironment e, final Map<byte[],
+ public void postDelete(final RegionCoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
assertNotNull(e);
assertNotNull(e.getRegion());
@@ -260,7 +261,7 @@ public class SimpleRegionObserver extend
}
@Override
- public void preGetClosestRowBefore(final CoprocessorEnvironment e,
+ public void preGetClosestRowBefore(final RegionCoprocessorEnvironment e,
final byte[] row, final byte[] family, final Result result)
throws IOException {
assertNotNull(e);
@@ -274,7 +275,7 @@ public class SimpleRegionObserver extend
}
@Override
- public void postGetClosestRowBefore(final CoprocessorEnvironment e,
+ public void postGetClosestRowBefore(final RegionCoprocessorEnvironment e,
final byte[] row, final byte[] family, final Result result)
throws IOException {
assertNotNull(e);
@@ -288,7 +289,7 @@ public class SimpleRegionObserver extend
}
@Override
- public void preIncrement(final CoprocessorEnvironment e,
+ public void preIncrement(final RegionCoprocessorEnvironment e,
final Increment increment, final Result result) throws IOException {
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE_2)) {
@@ -297,7 +298,7 @@ public class SimpleRegionObserver extend
}
@Override
- public void postIncrement(final CoprocessorEnvironment e,
+ public void postIncrement(final RegionCoprocessorEnvironment e,
final Increment increment, final Result result) throws IOException {
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE_2)) {
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java?rev=1051639&r1=1051638&r2=1051639&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java Tue Dec 21 20:39:26 2010
@@ -53,7 +53,7 @@ public class TestCoprocessorEndpoint {
public static void setupBeforeClass() throws Exception {
// set configure to indicate which cp should be loaded
Configuration conf = util.getConfiguration();
- conf.set("hbase.coprocessor.default.classes",
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
"org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint");
util.startMiniCluster(2);
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java?rev=1051639&r1=1051638&r2=1051639&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java Tue Dec 21 20:39:26 2010
@@ -30,10 +30,8 @@ import org.apache.hadoop.hbase.HBaseTest
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.coprocessor.Coprocessor;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
-import org.apache.hadoop.hbase.regionserver.CoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.SplitTransaction;
import org.apache.hadoop.hbase.util.Bytes;
@@ -75,43 +73,43 @@ public class TestCoprocessorInterface ex
}
@Override
- public void preOpen(CoprocessorEnvironment e) {
+ public void preOpen(RegionCoprocessorEnvironment e) {
preOpenCalled = true;
}
@Override
- public void postOpen(CoprocessorEnvironment e) {
+ public void postOpen(RegionCoprocessorEnvironment e) {
postOpenCalled = true;
}
@Override
- public void preClose(CoprocessorEnvironment e, boolean abortRequested) {
+ public void preClose(RegionCoprocessorEnvironment e, boolean abortRequested) {
preCloseCalled = true;
}
@Override
- public void postClose(CoprocessorEnvironment e, boolean abortRequested) {
+ public void postClose(RegionCoprocessorEnvironment e, boolean abortRequested) {
postCloseCalled = true;
}
@Override
- public void preCompact(CoprocessorEnvironment e, boolean willSplit) {
+ public void preCompact(RegionCoprocessorEnvironment e, boolean willSplit) {
preCompactCalled = true;
}
@Override
- public void postCompact(CoprocessorEnvironment e, boolean willSplit) {
+ public void postCompact(RegionCoprocessorEnvironment e, boolean willSplit) {
postCompactCalled = true;
}
@Override
- public void preFlush(CoprocessorEnvironment e) {
+ public void preFlush(RegionCoprocessorEnvironment e) {
preFlushCalled = true;
}
@Override
- public void postFlush(CoprocessorEnvironment e) {
+ public void postFlush(RegionCoprocessorEnvironment e) {
postFlushCalled = true;
}
@Override
- public void preSplit(CoprocessorEnvironment e) {
+ public void preSplit(RegionCoprocessorEnvironment e) {
preSplitCalled = true;
}
@Override
- public void postSplit(CoprocessorEnvironment e, HRegion l, HRegion r) {
+ public void postSplit(RegionCoprocessorEnvironment e, HRegion l, HRegion r) {
postSplitCalled = true;
}
@@ -191,7 +189,7 @@ public class TestCoprocessorInterface ex
// is secretly loaded at OpenRegionHandler. we don't really
// start a region server here, so just manually create cphost
// and set it to region.
- CoprocessorHost host = new CoprocessorHost(r, null, conf);
+ RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
r.setCoprocessorHost(host);
host.load(implClass, Priority.USER);
@@ -218,7 +216,7 @@ public class TestCoprocessorInterface ex
HRegion r = HRegion.createHRegion(info, path, conf);
// this following piece is a hack.
- CoprocessorHost host = new CoprocessorHost(r, null, conf);
+ RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
r.setCoprocessorHost(host);
host.load(implClass, Priority.USER);
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java?rev=1051639&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java Tue Dec 21 20:39:26 2010
@@ -0,0 +1,499 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests invocation of the {@link org.apache.hadoop.hbase.coprocessor.MasterObserver}
+ * interface hooks at all appropriate times during normal HMaster operations.
+ *
+ * <p>The observer is registered once for the whole class (see
+ * {@link #setupBeforeClass()}) and every test looks it up from the master's
+ * coprocessor host. Because the recorded state is shared, each test that
+ * asserts on operation hooks first calls
+ * {@link CPMasterObserver#resetStates()} and works on its own table, so the
+ * outcome does not depend on the order in which the tests are run.
+ */
+public class TestMasterObserver {
+
+  /**
+   * Observer that records, per hook, whether it has been invoked. Each
+   * <code>wasXxxCalled()</code> accessor reports true only once both the pre
+   * and the post hook of the corresponding operation have been seen.
+   */
+  public static class CPMasterObserver implements MasterObserver {
+
+    // The flags are volatile because they are read by the JUnit thread while
+    // the hooks may be invoked on other threads inside the master (presumably
+    // RPC handlers when an operation is issued through HBaseAdmin -- confirm).
+    private volatile boolean preCreateTableCalled;
+    private volatile boolean postCreateTableCalled;
+    private volatile boolean preDeleteTableCalled;
+    private volatile boolean postDeleteTableCalled;
+    private volatile boolean preModifyTableCalled;
+    private volatile boolean postModifyTableCalled;
+    private volatile boolean preAddColumnCalled;
+    private volatile boolean postAddColumnCalled;
+    private volatile boolean preModifyColumnCalled;
+    private volatile boolean postModifyColumnCalled;
+    private volatile boolean preDeleteColumnCalled;
+    private volatile boolean postDeleteColumnCalled;
+    private volatile boolean preEnableTableCalled;
+    private volatile boolean postEnableTableCalled;
+    private volatile boolean preDisableTableCalled;
+    private volatile boolean postDisableTableCalled;
+    private volatile boolean preMoveCalled;
+    private volatile boolean postMoveCalled;
+    private volatile boolean preAssignCalled;
+    private volatile boolean postAssignCalled;
+    private volatile boolean preUnassignCalled;
+    private volatile boolean postUnassignCalled;
+    private volatile boolean preBalanceCalled;
+    private volatile boolean postBalanceCalled;
+    private volatile boolean preBalanceSwitchCalled;
+    private volatile boolean postBalanceSwitchCalled;
+    private volatile boolean preShutdownCalled;
+    private volatile boolean preStopMasterCalled;
+    private volatile boolean startCalled;
+    private volatile boolean stopCalled;
+
+    /**
+     * Clears every flag set by a pre/post operation hook so that a test can
+     * start from a known state. The start/stop flags are left untouched: they
+     * describe the coprocessor lifecycle rather than a single operation.
+     */
+    public void resetStates() {
+      preCreateTableCalled = false;
+      postCreateTableCalled = false;
+      preDeleteTableCalled = false;
+      postDeleteTableCalled = false;
+      preModifyTableCalled = false;
+      postModifyTableCalled = false;
+      preAddColumnCalled = false;
+      postAddColumnCalled = false;
+      preModifyColumnCalled = false;
+      postModifyColumnCalled = false;
+      preDeleteColumnCalled = false;
+      postDeleteColumnCalled = false;
+      preEnableTableCalled = false;
+      postEnableTableCalled = false;
+      preDisableTableCalled = false;
+      postDisableTableCalled = false;
+      preMoveCalled = false;
+      postMoveCalled = false;
+      preAssignCalled = false;
+      postAssignCalled = false;
+      preUnassignCalled = false;
+      postUnassignCalled = false;
+      preBalanceCalled = false;
+      postBalanceCalled = false;
+      preBalanceSwitchCalled = false;
+      postBalanceSwitchCalled = false;
+      preShutdownCalled = false;
+      preStopMasterCalled = false;
+    }
+
+    @Override
+    public void preCreateTable(MasterCoprocessorEnvironment env,
+        HTableDescriptor desc, byte[][] splitKeys) throws IOException {
+      preCreateTableCalled = true;
+    }
+
+    @Override
+    public void postCreateTable(MasterCoprocessorEnvironment env,
+        HRegionInfo[] regions, boolean sync) throws IOException {
+      postCreateTableCalled = true;
+    }
+
+    /** @return true if both the pre and post create-table hooks ran */
+    public boolean wasCreateTableCalled() {
+      return preCreateTableCalled && postCreateTableCalled;
+    }
+
+    @Override
+    public void preDeleteTable(MasterCoprocessorEnvironment env,
+        byte[] tableName) throws IOException {
+      preDeleteTableCalled = true;
+    }
+
+    @Override
+    public void postDeleteTable(MasterCoprocessorEnvironment env,
+        byte[] tableName) throws IOException {
+      postDeleteTableCalled = true;
+    }
+
+    /** @return true if both the pre and post delete-table hooks ran */
+    public boolean wasDeleteTableCalled() {
+      return preDeleteTableCalled && postDeleteTableCalled;
+    }
+
+    @Override
+    public void preModifyTable(MasterCoprocessorEnvironment env,
+        byte[] tableName, HTableDescriptor htd) throws IOException {
+      preModifyTableCalled = true;
+    }
+
+    @Override
+    public void postModifyTable(MasterCoprocessorEnvironment env,
+        byte[] tableName, HTableDescriptor htd) throws IOException {
+      postModifyTableCalled = true;
+    }
+
+    /** @return true if both the pre and post modify-table hooks ran */
+    public boolean wasModifyTableCalled() {
+      return preModifyTableCalled && postModifyTableCalled;
+    }
+
+    @Override
+    public void preAddColumn(MasterCoprocessorEnvironment env,
+        byte[] tableName, HColumnDescriptor column) throws IOException {
+      preAddColumnCalled = true;
+    }
+
+    @Override
+    public void postAddColumn(MasterCoprocessorEnvironment env,
+        byte[] tableName, HColumnDescriptor column) throws IOException {
+      postAddColumnCalled = true;
+    }
+
+    /** @return true if both the pre and post add-column hooks ran */
+    public boolean wasAddColumnCalled() {
+      return preAddColumnCalled && postAddColumnCalled;
+    }
+
+    @Override
+    public void preModifyColumn(MasterCoprocessorEnvironment env,
+        byte[] tableName, HColumnDescriptor descriptor) throws IOException {
+      preModifyColumnCalled = true;
+    }
+
+    @Override
+    public void postModifyColumn(MasterCoprocessorEnvironment env,
+        byte[] tableName, HColumnDescriptor descriptor) throws IOException {
+      postModifyColumnCalled = true;
+    }
+
+    /** @return true if both the pre and post modify-column hooks ran */
+    public boolean wasModifyColumnCalled() {
+      return preModifyColumnCalled && postModifyColumnCalled;
+    }
+
+    @Override
+    public void preDeleteColumn(MasterCoprocessorEnvironment env,
+        byte[] tableName, byte[] c) throws IOException {
+      preDeleteColumnCalled = true;
+    }
+
+    @Override
+    public void postDeleteColumn(MasterCoprocessorEnvironment env,
+        byte[] tableName, byte[] c) throws IOException {
+      postDeleteColumnCalled = true;
+    }
+
+    /** @return true if both the pre and post delete-column hooks ran */
+    public boolean wasDeleteColumnCalled() {
+      return preDeleteColumnCalled && postDeleteColumnCalled;
+    }
+
+    @Override
+    public void preEnableTable(MasterCoprocessorEnvironment env,
+        byte[] tableName) throws IOException {
+      preEnableTableCalled = true;
+    }
+
+    @Override
+    public void postEnableTable(MasterCoprocessorEnvironment env,
+        byte[] tableName) throws IOException {
+      postEnableTableCalled = true;
+    }
+
+    /** @return true if both the pre and post enable-table hooks ran */
+    public boolean wasEnableTableCalled() {
+      return preEnableTableCalled && postEnableTableCalled;
+    }
+
+    @Override
+    public void preDisableTable(MasterCoprocessorEnvironment env,
+        byte[] tableName) throws IOException {
+      preDisableTableCalled = true;
+    }
+
+    @Override
+    public void postDisableTable(MasterCoprocessorEnvironment env,
+        byte[] tableName) throws IOException {
+      postDisableTableCalled = true;
+    }
+
+    /** @return true if both the pre and post disable-table hooks ran */
+    public boolean wasDisableTableCalled() {
+      return preDisableTableCalled && postDisableTableCalled;
+    }
+
+    @Override
+    public void preMove(MasterCoprocessorEnvironment env,
+        HRegionInfo region, HServerInfo srcServer, HServerInfo destServer)
+        throws UnknownRegionException {
+      preMoveCalled = true;
+    }
+
+    @Override
+    public void postMove(MasterCoprocessorEnvironment env, HRegionInfo region,
+        HServerInfo srcServer, HServerInfo destServer)
+        throws UnknownRegionException {
+      postMoveCalled = true;
+    }
+
+    /** @return true if both the pre and post move hooks ran */
+    public boolean wasMoveCalled() {
+      return preMoveCalled && postMoveCalled;
+    }
+
+    @Override
+    public void preAssign(MasterCoprocessorEnvironment env,
+        final byte [] regionName, final boolean force) throws IOException {
+      preAssignCalled = true;
+    }
+
+    @Override
+    public void postAssign(MasterCoprocessorEnvironment env,
+        final HRegionInfo regionInfo) throws IOException {
+      postAssignCalled = true;
+    }
+
+    /** @return true if both the pre and post assign hooks ran */
+    public boolean wasAssignCalled() {
+      return preAssignCalled && postAssignCalled;
+    }
+
+    @Override
+    public void preUnassign(MasterCoprocessorEnvironment env,
+        final byte [] regionName, final boolean force) throws IOException {
+      preUnassignCalled = true;
+    }
+
+    @Override
+    public void postUnassign(MasterCoprocessorEnvironment env,
+        final HRegionInfo regionInfo, final boolean force) throws IOException {
+      postUnassignCalled = true;
+    }
+
+    /** @return true if both the pre and post unassign hooks ran */
+    public boolean wasUnassignCalled() {
+      return preUnassignCalled && postUnassignCalled;
+    }
+
+    @Override
+    public void preBalance(MasterCoprocessorEnvironment env)
+        throws IOException {
+      preBalanceCalled = true;
+    }
+
+    @Override
+    public void postBalance(MasterCoprocessorEnvironment env)
+        throws IOException {
+      postBalanceCalled = true;
+    }
+
+    /** @return true if both the pre and post balance hooks ran */
+    public boolean wasBalanceCalled() {
+      return preBalanceCalled && postBalanceCalled;
+    }
+
+    /** Records the call and hands the requested switch value back unchanged. */
+    @Override
+    public boolean preBalanceSwitch(MasterCoprocessorEnvironment env, boolean b)
+        throws IOException {
+      preBalanceSwitchCalled = true;
+      return b;
+    }
+
+    @Override
+    public void postBalanceSwitch(MasterCoprocessorEnvironment env,
+        boolean oldValue, boolean newValue) throws IOException {
+      postBalanceSwitchCalled = true;
+    }
+
+    /** @return true if both the pre and post balance-switch hooks ran */
+    public boolean wasBalanceSwitchCalled() {
+      return preBalanceSwitchCalled && postBalanceSwitchCalled;
+    }
+
+    @Override
+    public void preShutdown(MasterCoprocessorEnvironment env)
+        throws IOException {
+      preShutdownCalled = true;
+    }
+
+    @Override
+    public void preStopMaster(MasterCoprocessorEnvironment env)
+        throws IOException {
+      preStopMasterCalled = true;
+    }
+
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+      startCalled = true;
+    }
+
+    @Override
+    public void stop(CoprocessorEnvironment env) throws IOException {
+      stopCalled = true;
+    }
+
+    /** @return true once {@link #start(CoprocessorEnvironment)} has run */
+    public boolean wasStarted() { return startCalled; }
+
+    /** @return true once {@link #stop(CoprocessorEnvironment)} has run */
+    public boolean wasStopped() { return stopCalled; }
+  }
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final byte[] TEST_TABLE = Bytes.toBytes("observed_table");
+  // The region transition test never drops its table, so it gets a name of
+  // its own instead of colliding with the table used by testTableOperations.
+  private static final byte[] TEST_TABLE_REGIONS =
+      Bytes.toBytes("observed_table_regions");
+  private static final byte[] TEST_FAMILY = Bytes.toBytes("fam1");
+  private static final byte[] TEST_FAMILY2 = Bytes.toBytes("fam2");
+
+  /**
+   * Registers {@link CPMasterObserver} under the master coprocessor
+   * configuration key and starts a mini cluster via
+   * <code>UTIL.startMiniCluster(2)</code>.
+   */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+        CPMasterObserver.class.getName());
+
+    UTIL.startMiniCluster(2);
+  }
+
+  @AfterClass
+  public static void teardownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  /** Verifies that the observer is installed on the active master and started. */
+  @Test
+  public void testStarted() throws Exception {
+    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+
+    HMaster master = cluster.getMaster();
+    assertTrue("Master should be active", master.isActiveMaster());
+    MasterCoprocessorHost host = master.getCoprocessorHost();
+    assertNotNull("CoprocessorHost should not be null", host);
+    CPMasterObserver cp = (CPMasterObserver)host.findCoprocessor(
+        CPMasterObserver.class.getName());
+    assertNotNull("CPMasterObserver coprocessor not found or not installed!", cp);
+
+    // check basic lifecycle
+    assertTrue("MasterObserver should have been started", cp.wasStarted());
+  }
+
+  /**
+   * Drives create / disable / modify / add, modify and delete column /
+   * enable / delete table through HBaseAdmin and checks that the matching
+   * observer hooks were recorded.
+   */
+  @Test
+  public void testTableOperations() throws Exception {
+    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+
+    HMaster master = cluster.getMaster();
+    MasterCoprocessorHost host = master.getCoprocessorHost();
+    CPMasterObserver cp = (CPMasterObserver)host.findCoprocessor(
+        CPMasterObserver.class.getName());
+    // forget hooks recorded by any test that ran before this one
+    cp.resetStates();
+    assertFalse("No table created yet", cp.wasCreateTableCalled());
+
+    // create a table
+    HTableDescriptor htd = new HTableDescriptor(TEST_TABLE);
+    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
+    HBaseAdmin admin = UTIL.getHBaseAdmin();
+    admin.createTable(htd);
+    assertTrue("Test table should be created", cp.wasCreateTableCalled());
+
+    // disable
+    assertFalse(cp.wasDisableTableCalled());
+    admin.disableTable(TEST_TABLE);
+    assertTrue(admin.isTableDisabled(TEST_TABLE));
+    assertTrue("Coprocessor should have been called on table disable",
+        cp.wasDisableTableCalled());
+
+    // modify table
+    htd.setMaxFileSize(512 * 1024 * 1024);
+    admin.modifyTable(TEST_TABLE, htd);
+    assertTrue("Test table should have been modified",
+        cp.wasModifyTableCalled());
+
+    // add a column family
+    admin.addColumn(TEST_TABLE, new HColumnDescriptor(TEST_FAMILY2));
+    assertTrue("New column family should have been added to test table",
+        cp.wasAddColumnCalled());
+
+    // modify a column family
+    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY2);
+    hcd.setMaxVersions(25);
+    admin.modifyColumn(TEST_TABLE, hcd);
+    assertTrue("Second column family should be modified",
+        cp.wasModifyColumnCalled());
+
+    // enable
+    assertFalse(cp.wasEnableTableCalled());
+    admin.enableTable(TEST_TABLE);
+    assertTrue(admin.isTableEnabled(TEST_TABLE));
+    assertTrue("Coprocessor should have been called on table enable",
+        cp.wasEnableTableCalled());
+
+    // disable again
+    admin.disableTable(TEST_TABLE);
+    assertTrue(admin.isTableDisabled(TEST_TABLE));
+
+    // delete column
+    assertFalse("No column family deleted yet", cp.wasDeleteColumnCalled());
+    admin.deleteColumn(TEST_TABLE, TEST_FAMILY2);
+    HTableDescriptor tableDesc = admin.getTableDescriptor(TEST_TABLE);
+    assertNull("'"+Bytes.toString(TEST_FAMILY2)+"' should have been removed",
+        tableDesc.getFamily(TEST_FAMILY2));
+    assertTrue("Coprocessor should have been called on column delete",
+        cp.wasDeleteColumnCalled());
+
+    // delete table
+    assertFalse("No table deleted yet", cp.wasDeleteTableCalled());
+    admin.deleteTable(TEST_TABLE);
+    assertFalse("Test table should have been deleted",
+        admin.tableExists(TEST_TABLE));
+    assertTrue("Coprocessor should have been called on table delete",
+        cp.wasDeleteTableCalled());
+  }
+
+  /**
+   * Moves regions between servers, toggles the balancer and triggers a
+   * balance run, checking that the move, balance-switch and balance hooks
+   * were recorded.
+   */
+  @Test
+  public void testRegionTransitionOperations() throws Exception {
+    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+
+    HMaster master = cluster.getMaster();
+    MasterCoprocessorHost host = master.getCoprocessorHost();
+    CPMasterObserver cp = (CPMasterObserver)host.findCoprocessor(
+        CPMasterObserver.class.getName());
+    // forget hooks recorded by any test that ran before this one
+    cp.resetStates();
+
+    HTable table = UTIL.createTable(TEST_TABLE_REGIONS, TEST_FAMILY);
+    UTIL.createMultiRegions(table, TEST_FAMILY);
+
+    Map<HRegionInfo,HServerAddress> regions = table.getRegionsInfo();
+    assertFalse(regions.isEmpty());
+    Map.Entry<HRegionInfo,HServerAddress> firstRegion =
+        regions.entrySet().iterator().next();
+
+    // try to force a move to a server other than the one listed for the region
+    Collection<HServerInfo> servers = master.getClusterStatus().getServerInfo();
+    String destName = null;
+    for (HServerInfo info : servers) {
+      if (!info.getServerAddress().equals(firstRegion.getValue())) {
+        destName = info.getServerName();
+        break;
+      }
+    }
+    // fail with a clear message instead of an NPE inside Bytes.toBytes
+    assertNotNull("Found no region server other than "
+        + firstRegion.getValue() + " to move the region to", destName);
+    master.move(firstRegion.getKey().getEncodedNameAsBytes(),
+        Bytes.toBytes(destName));
+    assertTrue("Coprocessor should have been called on region move",
+        cp.wasMoveCalled());
+
+    // make sure balancer is on
+    master.balanceSwitch(true);
+    assertTrue("Coprocessor should have been called on balance switch",
+        cp.wasBalanceSwitchCalled());
+
+    // force region rebalancing
+    master.balanceSwitch(false);
+    // move half the open regions from RS 0 to RS 1
+    HRegionServer rs = cluster.getRegionServer(0);
+    byte[] destRS = Bytes.toBytes(cluster.getRegionServer(1).getServerName());
+    List<HRegionInfo> openRegions = rs.getOnlineRegions();
+    int moveCnt = openRegions.size()/2;
+    for (int i = 0; i < moveCnt; i++) {
+      HRegionInfo info = openRegions.get(i);
+      // catalog regions are left where they are
+      if (!(info.isMetaRegion() || info.isRootRegion())) {
+        master.move(info.getEncodedNameAsBytes(), destRS);
+      }
+    }
+
+    // wait for assignments to finish
+    AssignmentManager mgr = master.getAssignmentManager();
+    Collection<AssignmentManager.RegionState> transRegions =
+        mgr.getRegionsInTransition().values();
+    for (AssignmentManager.RegionState state : transRegions) {
+      mgr.waitOnRegionToClearRegionsInTransition(state.getRegion());
+    }
+
+    // now trigger a balance
+    master.balanceSwitch(true);
+    boolean balanceRun = master.balance();
+    assertTrue("Balance request should have run", balanceRun);
+    assertTrue("Coprocessor should be called on region rebalancing",
+        cp.wasBalanceCalled());
+  }
+}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java?rev=1051639&r1=1051638&r2=1051639&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java Tue Dec 21 20:39:26 2010
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.client.HT
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
-import org.apache.hadoop.hbase.regionserver.CoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -47,8 +47,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.apache.hadoop.conf.Configuration;
-
import static org.junit.Assert.*;
public class TestRegionObserverInterface {
@@ -77,7 +75,7 @@ public class TestRegionObserverInterface
public static void setupBeforeClass() throws Exception {
// set configure to indicate which cp should be loaded
Configuration conf = util.getConfiguration();
- conf.set("hbase.coprocessor.default.classes",
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
"org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver");
util.startMiniCluster(2);
@@ -114,7 +112,7 @@ public class TestRegionObserverInterface
// start a region server here, so just manually create cphost
// and set it to region.
HRegion r = HRegion.createHRegion(info, path, conf);
- CoprocessorHost host = new CoprocessorHost(r, null, conf);
+ RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
r.setCoprocessorHost(host);
host.load(implClass, Priority.USER);
return r;
@@ -145,7 +143,7 @@ public class TestRegionObserverInterface
if (!Arrays.equals(r.getTableDesc().getName(), TEST_TABLE)) {
continue;
}
- CoprocessorHost cph = t.getRegionServer().getOnlineRegion(r.getRegionName()).
+ RegionCoprocessorHost cph = t.getRegionServer().getOnlineRegion(r.getRegionName()).
getCoprocessorHost();
Coprocessor c = cph.findCoprocessor(SimpleRegionObserver.class.getName());
assertNotNull(c);
@@ -175,7 +173,7 @@ public class TestRegionObserverInterface
if (!Arrays.equals(r.getTableDesc().getName(), TEST_TABLE_2)) {
continue;
}
- CoprocessorHost cph = t.getRegionServer().getOnlineRegion(r.getRegionName()).
+ RegionCoprocessorHost cph = t.getRegionServer().getOnlineRegion(r.getRegionName()).
getCoprocessorHost();
Coprocessor c = cph.findCoprocessor(SimpleRegionObserver.class.getName());
assertTrue(((SimpleRegionObserver)c).hadPreIncrement());