You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/09/27 19:45:34 UTC
[3/7] hbase git commit: HBASE-17732 Coprocessor Design Improvements
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
index 5cb87b5..3325ba3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
@@ -19,33 +19,29 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.util.Comparator;
-import java.util.List;
-import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
+import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService;
-import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public class RegionServerCoprocessorHost extends
- CoprocessorHost<RegionServerCoprocessorHost.RegionServerEnvironment> {
+ CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> {
private static final Log LOG = LogFactory.getLog(RegionServerCoprocessorHost.class);
@@ -70,242 +66,149 @@ public class RegionServerCoprocessorHost extends
}
@Override
- public RegionServerEnvironment createEnvironment(Class<?> implClass,
- Coprocessor instance, int priority, int sequence, Configuration conf) {
- return new RegionServerEnvironment(implClass, instance, priority,
- sequence, conf, this.rsServices);
+ public RegionServerEnvironment createEnvironment(
+ RegionServerCoprocessor instance, int priority, int sequence, Configuration conf) {
+ return new RegionServerEnvironment(instance, priority, sequence, conf, this.rsServices);
+ }
+
+ @Override
+ public RegionServerCoprocessor checkAndGetInstance(Class<?> implClass)
+ throws InstantiationException, IllegalAccessException {
+ if (RegionServerCoprocessor.class.isAssignableFrom(implClass)) {
+ return (RegionServerCoprocessor)implClass.newInstance();
+ } else if (SingletonCoprocessorService.class.isAssignableFrom(implClass)) {
+ // For backward compatibility with old SingletonCoprocessorService impls that don't
+ // extend RegionServerCoprocessor.
+ return new CoprocessorServiceBackwardCompatiblity.RegionServerCoprocessorService(
+ (SingletonCoprocessorService)implClass.newInstance());
+ } else {
+ LOG.error(implClass.getName() + " is not of type RegionServerCoprocessor. Check the "
+ + "configuration " + CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY);
+ return null;
+ }
+ }
+
+ private ObserverGetter<RegionServerCoprocessor, RegionServerObserver> rsObserverGetter =
+ RegionServerCoprocessor::getRegionServerObserver;
+
+ abstract class RegionServerObserverOperation extends
+ ObserverOperationWithoutResult<RegionServerObserver> {
+ public RegionServerObserverOperation() {
+ super(rsObserverGetter);
+ }
+
+ public RegionServerObserverOperation(User user) {
+ super(rsObserverGetter, user);
+ }
}
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+ // RegionServerObserver operations
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+
public void preStop(String message, User user) throws IOException {
// While stopping the region server all coprocessors method should be executed first then the
// coprocessor should be cleaned up.
- execShutdown(coprocessors.isEmpty() ? null : new CoprocessorOperation(user) {
+ execShutdown(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation(user) {
@Override
- public void call(RegionServerObserver oserver,
- ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
- oserver.preStopRegionServer(ctx);
+ public void call(RegionServerObserver observer) throws IOException {
+ observer.preStopRegionServer(this);
}
+
@Override
- public void postEnvCall(RegionServerEnvironment env) {
+ public void postEnvCall() {
// invoke coprocessor stop method
- shutdown(env);
+ shutdown(this.getEnvironment());
}
});
}
public void preRollWALWriterRequest() throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
@Override
- public void call(RegionServerObserver oserver,
- ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
- oserver.preRollWALWriterRequest(ctx);
+ public void call(RegionServerObserver observer) throws IOException {
+ observer.preRollWALWriterRequest(this);
}
});
}
public void postRollWALWriterRequest() throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
@Override
- public void call(RegionServerObserver oserver,
- ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
- oserver.postRollWALWriterRequest(ctx);
+ public void call(RegionServerObserver observer) throws IOException {
+ observer.postRollWALWriterRequest(this);
}
});
}
public void preReplicateLogEntries()
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
@Override
- public void call(RegionServerObserver oserver,
- ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
- oserver.preReplicateLogEntries(ctx);
+ public void call(RegionServerObserver observer) throws IOException {
+ observer.preReplicateLogEntries(this);
}
});
}
public void postReplicateLogEntries()
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
@Override
- public void call(RegionServerObserver oserver,
- ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
- oserver.postReplicateLogEntries(ctx);
+ public void call(RegionServerObserver observer) throws IOException {
+ observer.postReplicateLogEntries(this);
}
});
}
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
throws IOException {
- return execOperationWithResult(endpoint, coprocessors.isEmpty() ? null
- : new CoprocessOperationWithResult<ReplicationEndpoint>() {
- @Override
- public void call(RegionServerObserver oserver,
- ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
- setResult(oserver.postCreateReplicationEndPoint(ctx, getResult()));
- }
- });
+ return execOperationWithResult(endpoint, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(
+ rsObserverGetter) {
+ @Override
+ public ReplicationEndpoint call(RegionServerObserver observer) throws IOException {
+ return observer.postCreateReplicationEndPoint(this, getResult());
+ }
+ });
}
public void preClearCompactionQueues() throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
@Override
- public void call(RegionServerObserver oserver,
- ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
- oserver.preClearCompactionQueues(ctx);
+ public void call(RegionServerObserver observer) throws IOException {
+ observer.preClearCompactionQueues(this);
}
});
}
public void postClearCompactionQueues() throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
@Override
- public void call(RegionServerObserver oserver,
- ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
- oserver.postClearCompactionQueues(ctx);
+ public void call(RegionServerObserver observer) throws IOException {
+ observer.postClearCompactionQueues(this);
}
});
}
- private <T> T execOperationWithResult(final T defaultValue,
- final CoprocessOperationWithResult<T> ctx) throws IOException {
- if (ctx == null)
- return defaultValue;
- ctx.setResult(defaultValue);
- execOperation(ctx);
- return ctx.getResult();
- }
-
- private static abstract class CoprocessorOperation
- extends ObserverContext<RegionServerCoprocessorEnvironment> {
- public CoprocessorOperation() {
- this(RpcServer.getRequestUser());
- }
-
- public CoprocessorOperation(User user) {
- super(user);
- }
-
- public abstract void call(RegionServerObserver oserver,
- ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException;
-
- public void postEnvCall(RegionServerEnvironment env) {
- }
- }
-
- private static abstract class CoprocessOperationWithResult<T> extends CoprocessorOperation {
- private T result = null;
-
- public void setResult(final T result) {
- this.result = result;
- }
-
- public T getResult() {
- return this.result;
- }
- }
-
- private boolean execOperation(final CoprocessorOperation ctx) throws IOException {
- if (ctx == null) return false;
- boolean bypass = false;
- List<RegionServerEnvironment> envs = coprocessors.get();
- for (int i = 0; i < envs.size(); i++) {
- RegionServerEnvironment env = envs.get(i);
- if (env.getInstance() instanceof RegionServerObserver) {
- ctx.prepare(env);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ctx.call((RegionServerObserver)env.getInstance(), ctx);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- if (ctx.shouldComplete()) {
- break;
- }
- }
- ctx.postEnvCall(env);
- }
- return bypass;
- }
-
- /**
- * RegionServer coprocessor classes can be configured in any order, based on that priority is set
- * and chained in a sorted order. For preStop(), coprocessor methods are invoked in call() and
- * environment is shutdown in postEnvCall(). <br>
- * Need to execute all coprocessor methods first then postEnvCall(), otherwise some coprocessors
- * may remain shutdown if any exception occurs during next coprocessor execution which prevent
- * RegionServer stop. (Refer:
- * <a href="https://issues.apache.org/jira/browse/HBASE-16663">HBASE-16663</a>
- * @param ctx CoprocessorOperation
- * @return true if bypaas coprocessor execution, false if not.
- * @throws IOException
- */
- private boolean execShutdown(final CoprocessorOperation ctx) throws IOException {
- if (ctx == null) return false;
- boolean bypass = false;
- List<RegionServerEnvironment> envs = coprocessors.get();
- int envsSize = envs.size();
- // Iterate the coprocessors and execute CoprocessorOperation's call()
- for (int i = 0; i < envsSize; i++) {
- RegionServerEnvironment env = envs.get(i);
- if (env.getInstance() instanceof RegionServerObserver) {
- ctx.prepare(env);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ctx.call((RegionServerObserver) env.getInstance(), ctx);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- if (ctx.shouldComplete()) {
- break;
- }
- }
- }
-
- // Iterate the coprocessors and execute CoprocessorOperation's postEnvCall()
- for (int i = 0; i < envsSize; i++) {
- RegionServerEnvironment env = envs.get(i);
- ctx.postEnvCall(env);
- }
- return bypass;
- }
-
/**
* Coprocessor environment extension providing access to region server
* related services.
*/
- static class RegionServerEnvironment extends CoprocessorHost.Environment
+ private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor>
implements RegionServerCoprocessorEnvironment {
private final RegionServerServices regionServerServices;
private final MetricRegistry metricRegistry;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BC_UNCONFIRMED_CAST",
justification="Intentional; FB has trouble detecting isAssignableFrom")
- public RegionServerEnvironment(final Class<?> implClass,
- final Coprocessor impl, final int priority, final int seq,
- final Configuration conf, final RegionServerServices services) {
+ public RegionServerEnvironment(final RegionServerCoprocessor impl, final int priority,
+ final int seq, final Configuration conf, final RegionServerServices services) {
super(impl, priority, seq, conf);
this.regionServerServices = services;
- for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
- Class<?> c = (Class<?>) itf;
- if (SingletonCoprocessorService.class.isAssignableFrom(c)) {// FindBugs: BC_UNCONFIRMED_CAST
- this.regionServerServices.registerService(
- ((SingletonCoprocessorService) impl).getService());
- break;
- }
- }
+ impl.getService().ifPresent(regionServerServices::registerService);
this.metricRegistry =
- MetricsCoprocessor.createRegistryForRSCoprocessor(implClass.getName());
+ MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName());
}
@Override
@@ -319,32 +222,9 @@ public class RegionServerCoprocessorHost extends
}
@Override
- protected void shutdown() {
+ public void shutdown() {
super.shutdown();
MetricsCoprocessor.removeRegistry(metricRegistry);
}
}
-
- /**
- * Environment priority comparator. Coprocessors are chained in sorted
- * order.
- */
- static class EnvironmentPriorityComparator implements
- Comparator<CoprocessorEnvironment> {
- @Override
- public int compare(final CoprocessorEnvironment env1,
- final CoprocessorEnvironment env2) {
- if (env1.getPriority() < env2.getPriority()) {
- return -1;
- } else if (env1.getPriority() > env2.getPriority()) {
- return 1;
- }
- if (env1.getLoadSequence() < env2.getLoadSequence()) {
- return -1;
- } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
- return 1;
- }
- return 0;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index d2b8567..c7d0ead 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -138,38 +140,17 @@ public class SecureBulkLoadManager {
public String prepareBulkLoad(final Region region, final PrepareBulkLoadRequest request)
throws IOException {
- List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers(region);
+ region.getCoprocessorHost().prePrepareBulkLoad(getActiveUser());
- if (bulkLoadObservers != null && bulkLoadObservers.size() != 0) {
- ObserverContext<RegionCoprocessorEnvironment> ctx = new ObserverContext<>(getActiveUser());
- ctx.prepare((RegionCoprocessorEnvironment) region.getCoprocessorHost()
- .findCoprocessorEnvironment(BulkLoadObserver.class).get(0));
-
- for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
- bulkLoadObserver.prePrepareBulkLoad(ctx);
- }
- }
-
- String bulkToken =
- createStagingDir(baseStagingDir, getActiveUser(), region.getTableDescriptor().getTableName())
- .toString();
+ String bulkToken = createStagingDir(baseStagingDir, getActiveUser(),
+ region.getTableDescriptor().getTableName()).toString();
return bulkToken;
}
public void cleanupBulkLoad(final Region region, final CleanupBulkLoadRequest request)
throws IOException {
- List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers(region);
-
- if (bulkLoadObservers != null && bulkLoadObservers.size() != 0) {
- ObserverContext<RegionCoprocessorEnvironment> ctx = new ObserverContext<>(getActiveUser());
- ctx.prepare((RegionCoprocessorEnvironment) region.getCoprocessorHost()
- .findCoprocessorEnvironment(BulkLoadObserver.class).get(0));
-
- for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
- bulkLoadObserver.preCleanupBulkLoad(ctx);
- }
- }
+ region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser());
Path path = new Path(request.getBulkToken());
if (!fs.delete(path, true)) {
@@ -275,13 +256,6 @@ public class SecureBulkLoadManager {
return map;
}
- private List<BulkLoadObserver> getBulkLoadObservers(Region region) {
- List<BulkLoadObserver> coprocessorList =
- region.getCoprocessorHost().findCoprocessors(BulkLoadObserver.class);
-
- return coprocessorList;
- }
-
private Path createStagingDir(Path baseDir,
User user,
TableName tableName) throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
index b6d23bf..73ba776 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
@@ -21,22 +21,23 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
-import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.WALObserver;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* Implements the coprocessor environment and runtime support for coprocessors
@@ -44,12 +45,13 @@ import org.apache.hadoop.hbase.wal.WALKey;
*/
@InterfaceAudience.Private
public class WALCoprocessorHost
- extends CoprocessorHost<WALCoprocessorHost.WALEnvironment> {
+ extends CoprocessorHost<WALCoprocessor, WALCoprocessorEnvironment> {
+ private static final Log LOG = LogFactory.getLog(WALCoprocessorHost.class);
/**
* Encapsulation of the environment of each coprocessor
*/
- static class WALEnvironment extends CoprocessorHost.Environment
+ static class WALEnvironment extends BaseEnvironment<WALCoprocessor>
implements WALCoprocessorEnvironment {
private final WAL wal;
@@ -63,19 +65,18 @@ public class WALCoprocessorHost
/**
* Constructor
- * @param implClass - not used
* @param impl the coprocessor instance
* @param priority chaining priority
* @param seq load sequence
* @param conf configuration
* @param wal WAL
*/
- public WALEnvironment(Class<?> implClass, final Coprocessor impl,
- final int priority, final int seq, final Configuration conf,
- final WAL wal) {
+ private WALEnvironment(final WALCoprocessor impl, final int priority, final int seq,
+ final Configuration conf, final WAL wal) {
super(impl, priority, seq, conf);
this.wal = wal;
- this.metricRegistry = MetricsCoprocessor.createRegistryForWALCoprocessor(implClass.getName());
+ this.metricRegistry = MetricsCoprocessor.createRegistryForWALCoprocessor(
+ impl.getClass().getName());
}
@Override
@@ -84,7 +85,7 @@ public class WALCoprocessorHost
}
@Override
- protected void shutdown() {
+ public void shutdown() {
super.shutdown();
MetricsCoprocessor.removeRegistry(this.metricRegistry);
}
@@ -111,13 +112,34 @@ public class WALCoprocessorHost
}
@Override
- public WALEnvironment createEnvironment(final Class<?> implClass,
- final Coprocessor instance, final int priority, final int seq,
- final Configuration conf) {
- return new WALEnvironment(implClass, instance, priority, seq, conf,
- this.wal);
+ public WALEnvironment createEnvironment(final WALCoprocessor instance, final int priority,
+ final int seq, final Configuration conf) {
+ return new WALEnvironment(instance, priority, seq, conf, this.wal);
+ }
+
+ @Override
+ public WALCoprocessor checkAndGetInstance(Class<?> implClass)
+ throws InstantiationException, IllegalAccessException {
+ if (WALCoprocessor.class.isAssignableFrom(implClass)) {
+ return (WALCoprocessor)implClass.newInstance();
+ } else {
+ LOG.error(implClass.getName() + " is not of type WALCoprocessor. Check the "
+ + "configuration " + CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
+ return null;
+ }
}
+ private ObserverGetter<WALCoprocessor, WALObserver> walObserverGetter =
+ WALCoprocessor::getWALObserver;
+
+ abstract class WALObserverOperation extends
+ ObserverOperationWithoutResult<WALObserver> {
+ public WALObserverOperation() {
+ super(walObserverGetter);
+ }
+ }
+
+
/**
* @param info
* @param logKey
@@ -127,32 +149,13 @@ public class WALCoprocessorHost
*/
public boolean preWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
throws IOException {
- boolean bypass = false;
- if (this.coprocessors == null || this.coprocessors.isEmpty()) return bypass;
- ObserverContext<WALCoprocessorEnvironment> ctx = null;
- List<WALEnvironment> envs = coprocessors.get();
- for (int i = 0; i < envs.size(); i++) {
- WALEnvironment env = envs.get(i);
- if (env.getInstance() instanceof WALObserver) {
- final WALObserver observer = (WALObserver)env.getInstance();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- observer.preWALWrite(ctx, info, logKey, logEdit);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(false, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<WALObserver, Boolean>(walObserverGetter) {
+ @Override
+ public Boolean call(WALObserver oserver) throws IOException {
+ return oserver.preWALWrite(this, info, logKey, logEdit);
}
- }
- return bypass;
+ });
}
/**
@@ -163,29 +166,12 @@ public class WALCoprocessorHost
*/
public void postWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
throws IOException {
- if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
- ObserverContext<WALCoprocessorEnvironment> ctx = null;
- List<WALEnvironment> envs = coprocessors.get();
- for (int i = 0; i < envs.size(); i++) {
- WALEnvironment env = envs.get(i);
- if (env.getInstance() instanceof WALObserver) {
- final WALObserver observer = (WALObserver)env.getInstance();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- observer.postWALWrite(ctx, info, logKey, logEdit);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
+ @Override
+ protected void call(WALObserver observer) throws IOException {
+ observer.postWALWrite(this, info, logKey, logEdit);
}
- }
+ });
}
/**
@@ -194,29 +180,12 @@ public class WALCoprocessorHost
* @param newPath the path of the wal we are going to create
*/
public void preWALRoll(Path oldPath, Path newPath) throws IOException {
- if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
- ObserverContext<WALCoprocessorEnvironment> ctx = null;
- List<WALEnvironment> envs = coprocessors.get();
- for (int i = 0; i < envs.size(); i++) {
- WALEnvironment env = envs.get(i);
- if (env.getInstance() instanceof WALObserver) {
- final WALObserver observer = (WALObserver)env.getInstance();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- observer.preWALRoll(ctx, oldPath, newPath);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
+ @Override
+ protected void call(WALObserver observer) throws IOException {
+ observer.preWALRoll(this, oldPath, newPath);
}
- }
+ });
}
/**
@@ -225,28 +194,11 @@ public class WALCoprocessorHost
* @param newPath the path of the wal we have created and now is the current
*/
public void postWALRoll(Path oldPath, Path newPath) throws IOException {
- if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
- ObserverContext<WALCoprocessorEnvironment> ctx = null;
- List<WALEnvironment> envs = coprocessors.get();
- for (int i = 0; i < envs.size(); i++) {
- WALEnvironment env = envs.get(i);
- if (env.getInstance() instanceof WALObserver) {
- final WALObserver observer = (WALObserver)env.getInstance();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- observer.postWALRoll(ctx, oldPath, newPath);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
+ @Override
+ protected void call(WALObserver observer) throws IOException {
+ observer.postWALRoll(this, oldPath, newPath);
}
- }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
index fdb951b..32ec617 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
@@ -21,11 +21,13 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
@@ -40,10 +42,15 @@ import org.apache.hadoop.hbase.util.Pair;
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class ReplicationObserver implements RegionObserver {
+public class ReplicationObserver implements RegionCoprocessor, RegionObserver {
private static final Log LOG = LogFactory.getLog(ReplicationObserver.class);
@Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
RegionCoprocessorEnvironment env = ctx.getEnvironment();
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 7081ea1..d66b754 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -74,13 +75,15 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
@@ -169,8 +172,10 @@ import org.apache.yetus.audience.InterfaceAudience;
* </p>
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class AccessController implements MasterObserver, RegionObserver, RegionServerObserver,
- AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
+public class AccessController implements MasterCoprocessor, RegionCoprocessor,
+ RegionServerCoprocessor, AccessControlService.Interface,
+ MasterObserver, RegionObserver, RegionServerObserver, EndpointObserver, BulkLoadObserver {
+ // TODO: encapsulate observer functions into separate class/sub-class.
private static final Log LOG = LogFactory.getLog(AccessController.class);
@@ -987,6 +992,39 @@ public class AccessController implements MasterObserver, RegionObserver, RegionS
}
}
+ /*********************************** Observer/Service Getters ***********************************/
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public Optional<MasterObserver> getMasterObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public Optional<EndpointObserver> getEndpointObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public Optional<BulkLoadObserver> getBulkLoadObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public Optional<RegionServerObserver> getRegionServerObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public Optional<Service> getService() {
+ return Optional.of(AccessControlProtos.AccessControlService.newReflectiveService(this));
+ }
+
+ /*********************************** Observer implementations ***********************************/
+
@Override
public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
TableDescriptor desc, RegionInfo[] regions) throws IOException {
@@ -2448,11 +2486,6 @@ public class AccessController implements MasterObserver, RegionObserver, RegionS
done.run(response);
}
- @Override
- public Service getService() {
- return AccessControlProtos.AccessControlService.newReflectiveService(this);
- }
-
private Region getRegion(RegionCoprocessorEnvironment e) {
return e.getRegion();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java
index 0b765d7..5b4acbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/CoprocessorWhitelistMasterObserver.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.security.access;
import java.io.IOException;
import java.util.Collection;
+import java.util.Optional;
import java.util.regex.Matcher;
import org.apache.commons.io.FilenameUtils;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -43,7 +45,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* Master observer for restricting coprocessor assignments.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class CoprocessorWhitelistMasterObserver implements MasterObserver {
+public class CoprocessorWhitelistMasterObserver implements MasterCoprocessor, MasterObserver {
public static final String CP_COPROCESSOR_WHITELIST_PATHS_KEY =
"hbase.coprocessor.region.whitelist.paths";
@@ -52,6 +54,11 @@ public class CoprocessorWhitelistMasterObserver implements MasterObserver {
.getLog(CoprocessorWhitelistMasterObserver.class);
@Override
+ public Optional<MasterObserver> getMasterObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName, TableDescriptor htd) throws IOException {
verifyCoprocessors(ctx, htd);
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java
index 50b8765..4b1f28e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenProvider.java
@@ -19,13 +19,12 @@
package org.apache.hadoop.hbase.security.token;
import java.io.IOException;
+import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcServer;
@@ -42,6 +41,7 @@ import org.apache.hadoop.security.token.Token;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* Provides a service for obtaining authentication tokens via the
@@ -49,7 +49,7 @@ import com.google.protobuf.Service;
*/
@InterfaceAudience.Private
public class TokenProvider implements AuthenticationProtos.AuthenticationService.Interface,
- Coprocessor, CoprocessorService {
+ RegionCoprocessor {
private static final Log LOG = LogFactory.getLog(TokenProvider.class);
@@ -96,8 +96,8 @@ public class TokenProvider implements AuthenticationProtos.AuthenticationService
// AuthenticationService implementation
@Override
- public Service getService() {
- return AuthenticationProtos.AuthenticationService.newReflectiveService(this);
+ public Optional<Service> getService() {
+ return Optional.of(AuthenticationProtos.AuthenticationService.newReflectiveService(this));
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
index b3b1bc4..671e989 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -64,14 +65,13 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.filter.Filter;
@@ -101,7 +101,6 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
@@ -122,8 +121,9 @@ import com.google.protobuf.Service;
* visibility labels
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class VisibilityController implements MasterObserver, RegionObserver,
- VisibilityLabelsService.Interface, CoprocessorService {
+// TODO: break out Observer functions into separate class/sub-class.
+public class VisibilityController implements MasterCoprocessor, RegionCoprocessor,
+ VisibilityLabelsService.Interface, MasterObserver, RegionObserver {
private static final Log LOG = LogFactory.getLog(VisibilityController.class);
private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger."
@@ -176,10 +176,6 @@ public class VisibilityController implements MasterObserver, RegionObserver,
+ " accordingly.");
}
- if (env instanceof RegionServerCoprocessorEnvironment) {
- throw new RuntimeException("Visibility controller should not be configured as "
- + "'hbase.coprocessor.regionserver.classes'.");
- }
// Do not create for master CPs
if (!(env instanceof MasterCoprocessorEnvironment)) {
visibilityLabelService = VisibilityLabelServiceManager.getInstance()
@@ -192,6 +188,22 @@ public class VisibilityController implements MasterObserver, RegionObserver,
}
+ /**************************** Observer/Service Getters ************************************/
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public Optional<MasterObserver> getMasterObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public Optional<Service> getService() {
+ return Optional.of(VisibilityLabelsProtos.VisibilityLabelsService.newReflectiveService(this));
+ }
+
/********************************* Master related hooks **********************************/
@Override
@@ -761,11 +773,6 @@ public class VisibilityController implements MasterObserver, RegionObserver,
}
@Override
- public Service getService() {
- return VisibilityLabelsProtos.VisibilityLabelsService.newReflectiveService(this);
- }
-
- @Override
public boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> e,
final InternalScanner s, final Cell curRowCell, final boolean hasMore) throws IOException {
// 'default' in RegionObserver might do unnecessary copy for Off heap backed Cells.
@@ -1087,35 +1094,6 @@ public class VisibilityController implements MasterObserver, RegionObserver,
}
/**
- * A RegionServerObserver impl that provides the custom
- * VisibilityReplicationEndpoint. This class should be configured as the
- * 'hbase.coprocessor.regionserver.classes' for the visibility tags to be
- * replicated as string. The value for the configuration should be
- * 'org.apache.hadoop.hbase.security.visibility.VisibilityController$VisibilityReplication'.
- */
- public static class VisibilityReplication implements RegionServerObserver {
- private Configuration conf;
- private VisibilityLabelService visibilityLabelService;
-
- @Override
- public void start(CoprocessorEnvironment env) throws IOException {
- this.conf = env.getConfiguration();
- visibilityLabelService = VisibilityLabelServiceManager.getInstance()
- .getVisibilityLabelService(this.conf);
- }
-
- @Override
- public void stop(CoprocessorEnvironment env) throws IOException {
- }
-
- @Override
- public ReplicationEndpoint postCreateReplicationEndPoint(
- ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
- return new VisibilityReplicationEndpoint(endpoint, visibilityLabelService);
- }
- }
-
- /**
* @param t
* @return NameValuePair of the exception name to stringified version os exception.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplication.java
new file mode 100644
index 0000000..6887c31
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplication.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.visibility;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * A RegionServerObserver impl that provides the custom
+ * VisibilityReplicationEndpoint. This class should be configured as the
+ * 'hbase.coprocessor.regionserver.classes' for the visibility tags to be
+ * replicated as string. The value for the configuration should be
+ * 'org.apache.hadoop.hbase.security.visibility.VisibilityReplication'.
+ */
+public class VisibilityReplication implements RegionServerCoprocessor, RegionServerObserver {
+ private Configuration conf;
+ private VisibilityLabelService visibilityLabelService;
+
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+ this.conf = env.getConfiguration();
+ visibilityLabelService = VisibilityLabelServiceManager.getInstance()
+ .getVisibilityLabelService(this.conf);
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {
+ }
+
+ @Override public Optional<RegionServerObserver> getRegionServerObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public ReplicationEndpoint postCreateReplicationEndPoint(
+ ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
+ return new VisibilityReplicationEndpoint(endpoint, visibilityLabelService);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/WriteSinkCoprocessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/WriteSinkCoprocessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/WriteSinkCoprocessor.java
index 5ec61d4..60fd22d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/WriteSinkCoprocessor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/WriteSinkCoprocessor.java
@@ -23,12 +23,14 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import java.io.IOException;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -58,9 +60,15 @@ import java.util.concurrent.atomic.AtomicLong;
* 0 row(s) in 0.0050 seconds
* </p>
*/
-public class WriteSinkCoprocessor implements RegionObserver {
+public class WriteSinkCoprocessor implements RegionCoprocessor, RegionObserver {
private static final Log LOG = LogFactory.getLog(WriteSinkCoprocessor.class);
private final AtomicLong ops = new AtomicLong();
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
private String regionName;
@Override
@@ -68,7 +76,6 @@ public class WriteSinkCoprocessor implements RegionObserver {
regionName = e.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString();
}
-
@Override
public void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
final MiniBatchOperationInProgress<Mutation> miniBatchOp)
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index 682709e..cfe4d1f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -159,13 +161,18 @@ public class HConnectionTestingUtility {
/**
* This coproceesor sleep 2s at first increment/append rpc call.
*/
- public static class SleepAtFirstRpcCall implements RegionObserver {
+ public static class SleepAtFirstRpcCall implements RegionCoprocessor, RegionObserver {
static final AtomicLong ct = new AtomicLong(0);
static final String SLEEP_TIME_CONF_KEY =
"hbase.coprocessor.SleepAtFirstRpcCall.sleepTime";
static final long DEFAULT_SLEEP_TIME = 2000;
static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME);
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
public SleepAtFirstRpcCall() {
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java
index 350bf6e..05324aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +36,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -170,10 +172,15 @@ public class TestAsyncAdminBuilder {
}
}
- public static class TestRpcTimeoutCoprocessor implements MasterObserver {
+ public static class TestRpcTimeoutCoprocessor implements MasterCoprocessor, MasterObserver {
public TestRpcTimeoutCoprocessor() {
}
+
+ @Override
+ public Optional<MasterObserver> getMasterObserver() {
+ return Optional.of(this);
+ }
@Override
public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
String namespace) throws IOException {
@@ -181,13 +188,18 @@ public class TestAsyncAdminBuilder {
}
}
- public static class TestOperationTimeoutCoprocessor implements MasterObserver {
+ public static class TestOperationTimeoutCoprocessor implements MasterCoprocessor, MasterObserver {
AtomicLong sleepTime = new AtomicLong(0);
public TestOperationTimeoutCoprocessor() {
}
@Override
+ public Optional<MasterObserver> getMasterObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
String namespace) throws IOException {
Threads.sleep(DEFAULT_RPC_TIMEOUT / 2);
@@ -197,13 +209,18 @@ public class TestAsyncAdminBuilder {
}
}
- public static class TestMaxRetriesCoprocessor implements MasterObserver {
+ public static class TestMaxRetriesCoprocessor implements MasterCoprocessor, MasterObserver {
AtomicLong retryNum = new AtomicLong(0);
public TestMaxRetriesCoprocessor() {
}
@Override
+ public Optional<MasterObserver> getMasterObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
String namespace) throws IOException {
if (retryNum.getAndIncrement() < DEFAULT_RETRIES_NUMBER) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
index 70df318..efa2c1e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -73,7 +75,12 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit {
private static AtomicInteger MAX_CONCURRENCY = new AtomicInteger(0);
- public static final class CountingRegionObserver implements RegionObserver {
+ public static final class CountingRegionObserver implements RegionCoprocessor, RegionObserver {
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
index e4c343a..8a341b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTimeout.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
@@ -65,7 +67,11 @@ public class TestAsyncRegionLocatorTimeout {
private static volatile long SLEEP_MS = 0L;
- public static class SleepRegionObserver implements RegionObserver {
+ public static class SleepRegionObserver implements RegionCoprocessor, RegionObserver {
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
index 6c9dd86..fce9041 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
@@ -28,6 +28,7 @@ import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -202,7 +204,12 @@ public class TestAsyncTableBatch {
assertEquals(4, Bytes.toInt(appendValue, 8));
}
- public static final class ErrorInjectObserver implements RegionObserver {
+ public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver {
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
@Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
index b389d9e..30fe731 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -253,7 +255,12 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
}
}
- public static class CompactorRegionObserver implements RegionObserver {
+ public static class CompactorRegionObserver implements RegionCoprocessor, RegionObserver {
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
index e1b31e7..d558307 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -1549,7 +1551,7 @@ public class TestBlockEvictionFromClient {
}
}
- public static class CustomInnerRegionObserver implements RegionObserver {
+ public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
static final AtomicLong sleepTime = new AtomicLong(0);
static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
static final AtomicInteger countOfNext = new AtomicInteger(0);
@@ -1560,6 +1562,11 @@ public class TestBlockEvictionFromClient {
new CountDownLatch(0));
@Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
slowdownCode(e, false);
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java
index 62ceca3..e92ba23 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -45,6 +46,7 @@ import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
@Category({MediumTests.class, ClientTests.class})
@@ -58,7 +60,12 @@ public class TestClientOperationInterrupt {
private static final byte[] test = Bytes.toBytes("test");
private static Configuration conf;
- public static class TestCoprocessor implements RegionObserver {
+ public static class TestCoprocessor implements RegionCoprocessor, RegionObserver {
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
index 379ab31..6b03594 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -189,11 +191,16 @@ public class TestEnableTable {
}
}
- public static class MasterSyncObserver implements MasterObserver {
+ public static class MasterSyncObserver implements MasterCoprocessor, MasterObserver {
volatile CountDownLatch tableCreationLatch = null;
volatile CountDownLatch tableDeletionLatch = null;
@Override
+ public Optional<MasterObserver> getMasterObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
public void postCompletedCreateTableAction(
final ObserverContext<MasterCoprocessorEnvironment> ctx,
final TableDescriptor desc,
@@ -222,8 +229,8 @@ public class TestEnableTable {
throws Exception {
// NOTE: We need a latch because admin is not sync,
// so the postOp coprocessor method may be called after the admin operation returned.
- MasterSyncObserver observer = (MasterSyncObserver)testUtil.getHBaseCluster().getMaster()
- .getMasterCoprocessorHost().findCoprocessor(MasterSyncObserver.class.getName());
+ MasterSyncObserver observer = testUtil.getHBaseCluster().getMaster()
+ .getMasterCoprocessorHost().findCoprocessor(MasterSyncObserver.class);
observer.tableCreationLatch = new CountDownLatch(1);
Admin admin = testUtil.getAdmin();
if (splitKeys != null) {
@@ -240,8 +247,8 @@ public class TestEnableTable {
throws Exception {
// NOTE: We need a latch because admin is not sync,
// so the postOp coprocessor method may be called after the admin operation returned.
- MasterSyncObserver observer = (MasterSyncObserver)testUtil.getHBaseCluster().getMaster()
- .getMasterCoprocessorHost().findCoprocessor(MasterSyncObserver.class.getName());
+ MasterSyncObserver observer = testUtil.getHBaseCluster().getMaster()
+ .getMasterCoprocessorHost().findCoprocessor(MasterSyncObserver.class);
observer.tableDeletionLatch = new CountDownLatch(1);
Admin admin = testUtil.getAdmin();
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 10169ab..a938db6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -40,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@@ -74,6 +75,7 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
@@ -543,7 +545,7 @@ public class TestFromClientSide {
* This is a coprocessor to inject a test failure so that a store scanner.reseek() call will
* fail with an IOException() on the first call.
*/
- public static class ExceptionInReseekRegionObserver implements RegionObserver {
+ public static class ExceptionInReseekRegionObserver implements RegionCoprocessor, RegionObserver {
static AtomicLong reqCount = new AtomicLong(0);
static AtomicBoolean isDoNotRetry = new AtomicBoolean(false); // whether to throw DNRIOE
static AtomicBoolean throwOnce = new AtomicBoolean(true); // whether to only throw once
@@ -554,6 +556,11 @@ public class TestFromClientSide {
throwOnce.set(true);
}
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
class MyStoreScanner extends StoreScanner {
public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
long readPt) throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index 89ea5b7..ca0a5ea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -688,7 +690,7 @@ public class TestFromClientSide3 {
private void testPreBatchMutate(TableName tableName, Runnable rn)throws Exception {
HTableDescriptor desc = new HTableDescriptor(tableName);
- desc.addCoprocessor(WatiingForScanObserver.class.getName());
+ desc.addCoprocessor(WaitingForScanObserver.class.getName());
desc.addFamily(new HColumnDescriptor(FAMILY));
TEST_UTIL.getAdmin().createTable(desc);
ExecutorService service = Executors.newFixedThreadPool(2);
@@ -720,7 +722,7 @@ public class TestFromClientSide3 {
public void testLockLeakWithDelta() throws Exception, Throwable {
final TableName tableName = TableName.valueOf(name.getMethodName());
HTableDescriptor desc = new HTableDescriptor(tableName);
- desc.addCoprocessor(WatiingForMultiMutationsObserver.class.getName());
+ desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
desc.addFamily(new HColumnDescriptor(FAMILY));
TEST_UTIL.getAdmin().createTable(desc);
@@ -735,7 +737,7 @@ public class TestFromClientSide3 {
try (Table table = con.getTable(tableName)) {
Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, VALUE);
- // the put will be blocked by WatiingForMultiMutationsObserver.
+ // the put will be blocked by WaitingForMultiMutationsObserver.
table.put(put);
} catch (IOException ex) {
throw new RuntimeException(ex);
@@ -753,7 +755,7 @@ public class TestFromClientSide3 {
});
appendService.shutdown();
appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
- WatiingForMultiMutationsObserver observer = find(tableName, WatiingForMultiMutationsObserver.class);
+ WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class);
observer.latch.countDown();
putService.shutdown();
putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
@@ -774,7 +776,7 @@ public class TestFromClientSide3 {
final TableName tableName = TableName.valueOf(name.getMethodName());
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addCoprocessor(MultiRowMutationEndpoint.class.getName());
- desc.addCoprocessor(WatiingForMultiMutationsObserver.class.getName());
+ desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
desc.addFamily(new HColumnDescriptor(FAMILY));
TEST_UTIL.getAdmin().createTable(desc);
@@ -793,7 +795,7 @@ public class TestFromClientSide3 {
try (Table table = con.getTable(tableName)) {
Put put0 = new Put(rowLocked);
put0.addColumn(FAMILY, QUALIFIER, value0);
- // the put will be blocked by WatiingForMultiMutationsObserver.
+ // the put will be blocked by WaitingForMultiMutationsObserver.
table.put(put0);
} catch (IOException ex) {
throw new RuntimeException(ex);
@@ -830,7 +832,7 @@ public class TestFromClientSide3 {
});
cpService.shutdown();
cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
- WatiingForMultiMutationsObserver observer = find(tableName, WatiingForMultiMutationsObserver.class);
+ WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class);
observer.latch.countDown();
putService.shutdown();
putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
@@ -975,8 +977,15 @@ public class TestFromClientSide3 {
return clz.cast(cp);
}
- public static class WatiingForMultiMutationsObserver implements RegionObserver {
+ public static class WaitingForMultiMutationsObserver
+ implements RegionCoprocessor, RegionObserver {
final CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
@Override
public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
@@ -988,8 +997,14 @@ public class TestFromClientSide3 {
}
}
- public static class WatiingForScanObserver implements RegionObserver {
+ public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver {
private final CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
@Override
public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 56c8c7c..1a67457 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -31,6 +31,7 @@ import java.lang.reflect.Modifier;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
@@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
@@ -118,7 +120,7 @@ public class TestHCM {
/**
* This copro sleeps 20 second. The first call it fails. The second time, it works.
*/
- public static class SleepAndFailFirstTime implements RegionObserver {
+ public static class SleepAndFailFirstTime implements RegionCoprocessor, RegionObserver {
static final AtomicLong ct = new AtomicLong(0);
static final String SLEEP_TIME_CONF_KEY =
"hbase.coprocessor.SleepAndFailFirstTime.sleepTime";
@@ -129,6 +131,11 @@ public class TestHCM {
}
@Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
RegionCoprocessorEnvironment env = c.getEnvironment();
Configuration conf = env.getConfiguration();
@@ -175,8 +182,14 @@ public class TestHCM {
}
- public static class SleepCoprocessor implements RegionObserver {
+ public static class SleepCoprocessor implements RegionCoprocessor, RegionObserver {
public static final int SLEEP_TIME = 5000;
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
@@ -204,9 +217,15 @@ public class TestHCM {
}
- public static class SleepLongerAtFirstCoprocessor implements RegionObserver {
+ public static class SleepLongerAtFirstCoprocessor implements RegionCoprocessor, RegionObserver {
public static final int SLEEP_TIME = 2000;
static final AtomicLong ct = new AtomicLong(0);
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
index b938f7e..1745c82 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.util.Optional;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
@@ -137,7 +139,13 @@ public class TestMobCloneSnapshotFromClient extends TestCloneSnapshotFromClient
/**
* This coprocessor is used to delay the flush.
*/
- public static class DelayFlushCoprocessor implements RegionObserver {
+ public static class DelayFlushCoprocessor implements RegionCoprocessor, RegionObserver {
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
@Override
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
if (delayFlush) {