You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/09/27 19:45:35 UTC
[4/7] hbase git commit: HBASE-17732 Coprocessor Design Improvements
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterSpaceQuotaObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterSpaceQuotaObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterSpaceQuotaObserver.java
index c59e5e7..d6dbcd4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterSpaceQuotaObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterSpaceQuotaObserver.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hbase.quotas;
import java.io.IOException;
+import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -24,6 +25,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -35,7 +37,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
* are deleted.
*/
@InterfaceAudience.Private
-public class MasterSpaceQuotaObserver implements MasterObserver {
+public class MasterSpaceQuotaObserver implements MasterCoprocessor, MasterObserver {
public static final String REMOVE_QUOTA_ON_TABLE_DELETE = "hbase.quota.remove.on.table.delete";
public static final boolean REMOVE_QUOTA_ON_TABLE_DELETE_DEFAULT = true;
@@ -44,6 +46,11 @@ public class MasterSpaceQuotaObserver implements MasterObserver {
private boolean quotasEnabled = false;
@Override
+ public Optional<MasterObserver> getMasterObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
public void start(CoprocessorEnvironment ctx) throws IOException {
this.cpEnv = ctx;
this.conf = cpEnv.getConfiguration();
http://git-wip-us.apache.org/repos/asf/hbase/blob/97513466/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 19491b4..84e9aa5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -33,7 +33,6 @@ import com.google.protobuf.Message;
import com.google.protobuf.Service;
import org.apache.commons.collections4.map.AbstractReferenceMap;
import org.apache.commons.collections4.map.ReferenceMap;
-import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -56,11 +55,15 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
+import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
@@ -68,7 +71,6 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
@@ -91,7 +93,7 @@ import org.apache.yetus.audience.InterfaceStability;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public class RegionCoprocessorHost
- extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
+ extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> {
private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
// The shared data map
@@ -103,10 +105,10 @@ public class RegionCoprocessorHost
private final boolean hasCustomPostScannerFilterRow;
/**
- *
+ *
* Encapsulation of the environment of each coprocessor
*/
- static class RegionEnvironment extends CoprocessorHost.Environment
+ static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
implements RegionCoprocessorEnvironment {
private Region region;
@@ -119,7 +121,7 @@ public class RegionCoprocessorHost
* @param impl the coprocessor instance
* @param priority chaining priority
*/
- public RegionEnvironment(final Coprocessor impl, final int priority,
+ public RegionEnvironment(final RegionCoprocessor impl, final int priority,
final int seq, final Configuration conf, final Region region,
final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
super(impl, priority, seq, conf);
@@ -142,6 +144,7 @@ public class RegionCoprocessorHost
return rsServices;
}
+ @Override
public void shutdown() {
super.shutdown();
MetricsCoprocessor.removeRegistry(this.metricRegistry);
@@ -226,7 +229,7 @@ public class RegionCoprocessorHost
// now check whether any coprocessor implements postScannerFilterRow
boolean hasCustomPostScannerFilterRow = false;
- out: for (RegionEnvironment env: coprocessors) {
+ out: for (RegionCoprocessorEnvironment env: coprocEnvironments) {
if (env.getInstance() instanceof RegionObserver) {
Class<?> clazz = env.getInstance().getClass();
for(;;) {
@@ -361,13 +364,16 @@ public class RegionCoprocessorHost
// scan the table attributes for coprocessor load specifications
// initialize the coprocessors
- List<RegionEnvironment> configured = new ArrayList<>();
+ List<RegionCoprocessorEnvironment> configured = new ArrayList<>();
for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf,
region.getTableDescriptor())) {
// Load encompasses classloading and coprocessor initialization
try {
- RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(),
- attr.getConf());
+ RegionCoprocessorEnvironment env = load(attr.getPath(), attr.getClassName(),
+ attr.getPriority(), attr.getConf());
+ if (env == null) {
+ continue;
+ }
configured.add(env);
LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
region.getTableDescriptor().getTableName().getNameAsString() + " successfully.");
@@ -381,60 +387,101 @@ public class RegionCoprocessorHost
}
}
// add together to coprocessor set for COW efficiency
- coprocessors.addAll(configured);
+ coprocEnvironments.addAll(configured);
}
@Override
- public RegionEnvironment createEnvironment(Class<?> implClass,
- Coprocessor instance, int priority, int seq, Configuration conf) {
- // Check if it's an Endpoint.
- // Due to current dynamic protocol design, Endpoint
- // uses a different way to be registered and executed.
- // It uses a visitor pattern to invoke registered Endpoint
- // method.
- for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
- Class<?> c = (Class<?>) itf;
- if (CoprocessorService.class.isAssignableFrom(c)) {
- region.registerService( ((CoprocessorService)instance).getService() );
- }
- }
+ public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq,
+ Configuration conf) {
+ // Due to current dynamic protocol design, Endpoint uses a different way to be registered and
+ // executed. It uses a visitor pattern to invoke registered Endpoint method.
+ instance.getService().ifPresent(region::registerService);
ConcurrentMap<String, Object> classData;
// make sure only one thread can add maps
synchronized (SHARED_DATA_MAP) {
// as long as at least one RegionEnvironment holds on to its classData it will
// remain in this map
classData =
- SHARED_DATA_MAP.computeIfAbsent(implClass.getName(), k -> new ConcurrentHashMap<>());
+ SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(),
+ k -> new ConcurrentHashMap<>());
}
return new RegionEnvironment(instance, priority, seq, conf, region,
rsServices, classData);
}
+ @Override
+ public RegionCoprocessor checkAndGetInstance(Class<?> implClass)
+ throws InstantiationException, IllegalAccessException {
+ if (RegionCoprocessor.class.isAssignableFrom(implClass)) {
+ return (RegionCoprocessor)implClass.newInstance();
+ } else if (CoprocessorService.class.isAssignableFrom(implClass)) {
+ // For backward compatibility with old CoprocessorService impl which don't extend
+ // RegionCoprocessor.
+ return new CoprocessorServiceBackwardCompatiblity.RegionCoprocessorService(
+ (CoprocessorService)implClass.newInstance());
+ } else {
+ LOG.error(implClass.getName() + " is not of type RegionCoprocessor. Check the "
+ + "configuration " + CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
+ return null;
+ }
+ }
+
+ private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter =
+ RegionCoprocessor::getRegionObserver;
+
+ private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter =
+ RegionCoprocessor::getEndpointObserver;
+
+ abstract class RegionObserverOperation extends ObserverOperationWithoutResult<RegionObserver> {
+ public RegionObserverOperation() {
+ super(regionObserverGetter);
+ }
+
+ public RegionObserverOperation(User user) {
+ super(regionObserverGetter, user);
+ }
+ }
+
+ abstract class BulkLoadObserverOperation extends
+ ObserverOperationWithoutResult<BulkLoadObserver> {
+ public BulkLoadObserverOperation(User user) {
+ super(RegionCoprocessor::getBulkLoadObserver, user);
+ }
+ }
+
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+ // Observer operations
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+ // Observer operations
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+
/**
* Invoked before a region open.
*
* @throws IOException Signals that an I/O exception has occurred.
*/
public void preOpen() throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.preOpen(ctx);
+ public void call(RegionObserver observer) throws IOException {
+ observer.preOpen(this);
}
});
}
+
/**
* Invoked after a region open
*/
public void postOpen() {
try {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postOpen(ctx);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postOpen(this);
}
});
} catch (IOException e) {
@@ -447,11 +494,10 @@ public class RegionCoprocessorHost
*/
public void postLogReplay() {
try {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postLogReplay(ctx);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postLogReplay(this);
}
});
} catch (IOException e) {
@@ -464,11 +510,10 @@ public class RegionCoprocessorHost
* @param abortRequested true if the server is aborting
*/
public void preClose(final boolean abortRequested) throws IOException {
- execOperation(false, new RegionOperation() {
+ execOperation(false, new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.preClose(ctx, abortRequested);
+ public void call(RegionObserver observer) throws IOException {
+ observer.preClose(this, abortRequested);
}
});
}
@@ -479,14 +524,15 @@ public class RegionCoprocessorHost
*/
public void postClose(final boolean abortRequested) {
try {
- execOperation(false, new RegionOperation() {
+ execOperation(false, new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postClose(ctx, abortRequested);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postClose(this, abortRequested);
}
- public void postEnvCall(RegionEnvironment env) {
- shutdown(env);
+
+ @Override
+ public void postEnvCall() {
+ shutdown(this.getEnvironment());
}
});
} catch (IOException e) {
@@ -499,18 +545,19 @@ public class RegionCoprocessorHost
* {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
* InternalScanner, CompactionLifeCycleTracker, long)}
*/
- public InternalScanner preCompactScannerOpen(HStore store, List<StoreFileScanner> scanners,
- ScanType scanType, long earliestPutTs, CompactionLifeCycleTracker tracker, User user,
- long readPoint) throws IOException {
- return execOperationWithResult(null,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
- earliestPutTs, getResult(), tracker, readPoint));
- }
- });
+ public InternalScanner preCompactScannerOpen(final HStore store,
+ final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
+ final CompactionLifeCycleTracker tracker, final User user, final long readPoint)
+ throws IOException {
+ return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, InternalScanner>(
+ regionObserverGetter, user) {
+ @Override
+ public InternalScanner call(RegionObserver observer) throws IOException {
+ return observer.preCompactScannerOpen(this, store, scanners, scanType,
+ earliestPutTs, getResult(), tracker, readPoint);
+ }
+ });
}
/**
@@ -522,13 +569,12 @@ public class RegionCoprocessorHost
* @return If {@code true}, skip the normal selection process and use the current list
* @throws IOException
*/
- public boolean preCompactSelection(HStore store, List<HStoreFile> candidates,
- CompactionLifeCycleTracker tracker, User user) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
+ public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates,
+ final CompactionLifeCycleTracker tracker, final User user) throws IOException {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.preCompactSelection(ctx, store, candidates, tracker);
+ public void call(RegionObserver observer) throws IOException {
+ observer.preCompactSelection(this, store, candidates, tracker);
}
});
}
@@ -540,13 +586,12 @@ public class RegionCoprocessorHost
* @param selected The store files selected to compact
* @param tracker used to track the life cycle of a compaction
*/
- public void postCompactSelection(HStore store, ImmutableList<HStoreFile> selected,
- CompactionLifeCycleTracker tracker, User user) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
+ public void postCompactSelection(final HStore store, final ImmutableList<HStoreFile> selected,
+ final CompactionLifeCycleTracker tracker, final User user) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postCompactSelection(ctx, store, selected, tracker);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postCompactSelection(this, store, selected, tracker);
}
});
}
@@ -559,16 +604,17 @@ public class RegionCoprocessorHost
* @param tracker used to track the life cycle of a compaction
* @throws IOException
*/
- public InternalScanner preCompact(HStore store, InternalScanner scanner, ScanType scanType,
- CompactionLifeCycleTracker tracker, User user) throws IOException {
- return execOperationWithResult(false, scanner,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preCompact(ctx, store, getResult(), scanType, tracker));
- }
- });
+ public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
+ final ScanType scanType, final CompactionLifeCycleTracker tracker, final User user)
+ throws IOException {
+ return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, InternalScanner>(
+ regionObserverGetter, user) {
+ @Override
+ public InternalScanner call(RegionObserver observer) throws IOException {
+ return observer.preCompact(this, store, getResult(), scanType, tracker);
+ }
+ });
}
/**
@@ -578,13 +624,12 @@ public class RegionCoprocessorHost
* @param tracker used to track the life cycle of a compaction
* @throws IOException
*/
- public void postCompact(HStore store, HStoreFile resultFile, CompactionLifeCycleTracker tracker,
- User user) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
+ public void postCompact(final HStore store, final HStoreFile resultFile,
+ final CompactionLifeCycleTracker tracker, final User user) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postCompact(ctx, store, resultFile, tracker);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postCompact(this, store, resultFile, tracker);
}
});
}
@@ -595,14 +640,13 @@ public class RegionCoprocessorHost
*/
public InternalScanner preFlush(HStore store, final InternalScanner scanner)
throws IOException {
- return execOperationWithResult(false, scanner,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preFlush(ctx, store, getResult()));
- }
- });
+ return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter) {
+ @Override
+ public InternalScanner call(RegionObserver observer) throws IOException {
+ return observer.preFlush(this, store, getResult());
+ }
+ });
}
/**
@@ -610,11 +654,10 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public void preFlush() throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.preFlush(ctx);
+ public void call(RegionObserver observer) throws IOException {
+ observer.preFlush(this);
}
});
}
@@ -623,16 +666,15 @@ public class RegionCoprocessorHost
* See
* {@link RegionObserver#preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)}
*/
- public InternalScanner preFlushScannerOpen(HStore store, List<KeyValueScanner> scanners,
- long readPoint) throws IOException {
- return execOperationWithResult(null,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preFlushScannerOpen(ctx, store, scanners, getResult(), readPoint));
- }
- });
+ public InternalScanner preFlushScannerOpen(final HStore store,
+ final List<KeyValueScanner> scanners, final long readPoint) throws IOException {
+ return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter) {
+ @Override
+ public InternalScanner call(RegionObserver observer) throws IOException {
+ return observer.preFlushScannerOpen(this, store, scanners, getResult(), readPoint);
+ }
+ });
}
/**
@@ -640,11 +682,10 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public void postFlush() throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postFlush(ctx);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postFlush(this);
}
});
}
@@ -653,12 +694,11 @@ public class RegionCoprocessorHost
* Invoked after a memstore flush
* @throws IOException
*/
- public void postFlush(HStore store, HStoreFile storeFile) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ public void postFlush(final HStore store, final HStoreFile storeFile) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postFlush(ctx, store, storeFile);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postFlush(this, store, storeFile);
}
});
}
@@ -671,11 +711,10 @@ public class RegionCoprocessorHost
*/
public boolean preGet(final Get get, final List<Cell> results)
throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.preGetOp(ctx, get, results);
+ public void call(RegionObserver observer) throws IOException {
+ observer.preGetOp(this, get, results);
}
});
}
@@ -687,11 +726,10 @@ public class RegionCoprocessorHost
*/
public void postGet(final Get get, final List<Cell> results)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postGetOp(ctx, get, results);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postGetOp(this, get, results);
}
});
}
@@ -703,14 +741,13 @@ public class RegionCoprocessorHost
* @exception IOException Exception
*/
public Boolean preExists(final Get get) throws IOException {
- return execOperationWithResult(true, false,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preExists(ctx, get, getResult()));
- }
- });
+ return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.preExists(this, get, getResult());
+ }
+ });
}
/**
@@ -721,14 +758,13 @@ public class RegionCoprocessorHost
*/
public boolean postExists(final Get get, boolean exists)
throws IOException {
- return execOperationWithResult(exists,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.postExists(ctx, get, getResult()));
- }
- });
+ return execOperationWithResult(exists, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.postExists(this, get, getResult());
+ }
+ });
}
/**
@@ -740,11 +776,10 @@ public class RegionCoprocessorHost
*/
public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.prePut(ctx, put, edit, durability);
+ public void call(RegionObserver observer) throws IOException {
+ observer.prePut(this, put, edit, durability);
}
});
}
@@ -761,11 +796,10 @@ public class RegionCoprocessorHost
*/
public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
final Cell kv, final byte[] byteNow, final Get get) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
+ public void call(RegionObserver observer) throws IOException {
+ observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get);
}
});
}
@@ -778,11 +812,10 @@ public class RegionCoprocessorHost
*/
public void postPut(final Put put, final WALEdit edit, final Durability durability)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postPut(ctx, put, edit, durability);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postPut(this, put, edit, durability);
}
});
}
@@ -796,11 +829,10 @@ public class RegionCoprocessorHost
*/
public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.preDelete(ctx, delete, edit, durability);
+ public void call(RegionObserver observer) throws IOException {
+ observer.preDelete(this, delete, edit, durability);
}
});
}
@@ -813,11 +845,10 @@ public class RegionCoprocessorHost
*/
public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postDelete(ctx, delete, edit, durability);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postDelete(this, delete, edit, durability);
}
});
}
@@ -829,11 +860,10 @@ public class RegionCoprocessorHost
*/
public boolean preBatchMutate(
final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.preBatchMutate(ctx, miniBatchOp);
+ public void call(RegionObserver observer) throws IOException {
+ observer.preBatchMutate(this, miniBatchOp);
}
});
}
@@ -844,11 +874,10 @@ public class RegionCoprocessorHost
*/
public void postBatchMutate(
final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postBatchMutate(ctx, miniBatchOp);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postBatchMutate(this, miniBatchOp);
}
});
}
@@ -856,11 +885,10 @@ public class RegionCoprocessorHost
public void postBatchMutateIndispensably(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postBatchMutateIndispensably(this, miniBatchOp, success);
}
});
}
@@ -880,15 +908,14 @@ public class RegionCoprocessorHost
final byte [] qualifier, final CompareOperator op,
final ByteArrayComparable comparator, final Put put)
throws IOException {
- return execOperationWithResult(true, false,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
- op, comparator, put, getResult()));
- }
- });
+ return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.preCheckAndPut(this, row, family, qualifier,
+ op, comparator, put, getResult());
+ }
+ });
}
/**
@@ -902,18 +929,17 @@ public class RegionCoprocessorHost
* be bypassed, or null otherwise
* @throws IOException e
*/
- public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
- final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator,
- final Put put) throws IOException {
- return execOperationWithResult(true, false,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
- op, comparator, put, getResult()));
- }
- });
+ public Boolean preCheckAndPutAfterRowLock(
+ final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op,
+ final ByteArrayComparable comparator, final Put put) throws IOException {
+ return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.preCheckAndPutAfterRowLock(this, row, family, qualifier,
+ op, comparator, put, getResult());
+ }
+ });
}
/**
@@ -929,15 +955,14 @@ public class RegionCoprocessorHost
final byte [] qualifier, final CompareOperator op,
final ByteArrayComparable comparator, final Put put,
boolean result) throws IOException {
- return execOperationWithResult(result,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
- op, comparator, put, getResult()));
- }
- });
+ return execOperationWithResult(result, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.postCheckAndPut(this, row, family, qualifier,
+ op, comparator, put, getResult());
+ }
+ });
}
/**
@@ -955,15 +980,14 @@ public class RegionCoprocessorHost
final byte [] qualifier, final CompareOperator op,
final ByteArrayComparable comparator, final Delete delete)
throws IOException {
- return execOperationWithResult(true, false,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preCheckAndDelete(ctx, row, family,
- qualifier, op, comparator, delete, getResult()));
- }
- });
+ return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.preCheckAndDelete(this, row, family,
+ qualifier, op, comparator, delete, getResult());
+ }
+ });
}
/**
@@ -978,17 +1002,16 @@ public class RegionCoprocessorHost
* @throws IOException e
*/
public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
- final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator,
- final Delete delete) throws IOException {
- return execOperationWithResult(true, false,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
- family, qualifier, op, comparator, delete, getResult()));
- }
- });
+ final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator,
+ final Delete delete) throws IOException {
+ return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.preCheckAndDeleteAfterRowLock(this, row,
+ family, qualifier, op, comparator, delete, getResult());
+ }
+ });
}
/**
@@ -1004,15 +1027,14 @@ public class RegionCoprocessorHost
final byte [] qualifier, final CompareOperator op,
final ByteArrayComparable comparator, final Delete delete,
boolean result) throws IOException {
- return execOperationWithResult(result,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.postCheckAndDelete(ctx, row, family,
- qualifier, op, comparator, delete, getResult()));
- }
- });
+ return execOperationWithResult(result, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.postCheckAndDelete(this, row, family,
+ qualifier, op, comparator, delete, getResult());
+ }
+ });
}
/**
@@ -1022,14 +1044,13 @@ public class RegionCoprocessorHost
* @throws IOException if an error occurred on the coprocessor
*/
public Result preAppend(final Append append) throws IOException {
- return execOperationWithResult(true, null,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preAppend(ctx, append));
- }
- });
+ return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter) {
+ @Override
+ public Result call(RegionObserver observer) throws IOException {
+ return observer.preAppend(this, append);
+ }
+ });
}
/**
@@ -1039,14 +1060,13 @@ public class RegionCoprocessorHost
* @throws IOException if an error occurred on the coprocessor
*/
public Result preAppendAfterRowLock(final Append append) throws IOException {
- return execOperationWithResult(true, null,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preAppendAfterRowLock(ctx, append));
- }
- });
+ return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter) {
+ @Override
+ public Result call(RegionObserver observer) throws IOException {
+ return observer.preAppendAfterRowLock(this, append);
+ }
+ });
}
/**
@@ -1056,14 +1076,13 @@ public class RegionCoprocessorHost
* @throws IOException if an error occurred on the coprocessor
*/
public Result preIncrement(final Increment increment) throws IOException {
- return execOperationWithResult(true, null,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preIncrement(ctx, increment));
- }
- });
+ return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter) {
+ @Override
+ public Result call(RegionObserver observer) throws IOException {
+ return observer.preIncrement(this, increment);
+ }
+ });
}
/**
@@ -1073,14 +1092,13 @@ public class RegionCoprocessorHost
* @throws IOException if an error occurred on the coprocessor
*/
public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
- return execOperationWithResult(true, null,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preIncrementAfterRowLock(ctx, increment));
- }
- });
+ return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter) {
+ @Override
+ public Result call(RegionObserver observer) throws IOException {
+ return observer.preIncrementAfterRowLock(this, increment);
+ }
+ });
}
/**
@@ -1089,14 +1107,13 @@ public class RegionCoprocessorHost
* @throws IOException if an error occurred on the coprocessor
*/
public Result postAppend(final Append append, final Result result) throws IOException {
- return execOperationWithResult(result,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.postAppend(ctx, append, result));
- }
- });
+ return execOperationWithResult(result, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter) {
+ @Override
+ public Result call(RegionObserver observer) throws IOException {
+ return observer.postAppend(this, append, result);
+ }
+ });
}
/**
@@ -1105,14 +1122,13 @@ public class RegionCoprocessorHost
* @throws IOException if an error occurred on the coprocessor
*/
public Result postIncrement(final Increment increment, Result result) throws IOException {
- return execOperationWithResult(result,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.postIncrement(ctx, increment, getResult()));
- }
- });
+ return execOperationWithResult(result, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter) {
+ @Override
+ public Result call(RegionObserver observer) throws IOException {
+ return observer.postIncrement(this, increment, getResult());
+ }
+ });
}
/**
@@ -1122,30 +1138,28 @@ public class RegionCoprocessorHost
* @exception IOException Exception
*/
public RegionScanner preScannerOpen(final Scan scan) throws IOException {
- return execOperationWithResult(true, null,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preScannerOpen(ctx, scan, getResult()));
- }
- });
+ return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter) {
+ @Override
+ public RegionScanner call(RegionObserver observer) throws IOException {
+ return observer.preScannerOpen(this, scan, getResult());
+ }
+ });
}
/**
* See
* {@link RegionObserver#preStoreScannerOpen(ObserverContext, Store, Scan, NavigableSet, KeyValueScanner, long)}
*/
- public KeyValueScanner preStoreScannerOpen(HStore store, Scan scan,
- NavigableSet<byte[]> targetCols, long readPt) throws IOException {
- return execOperationWithResult(null,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult(), readPt));
- }
- });
+ public KeyValueScanner preStoreScannerOpen(final HStore store, final Scan scan,
+ final NavigableSet<byte[]> targetCols, final long readPt) throws IOException {
+ return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, KeyValueScanner>(regionObserverGetter) {
+ @Override
+ public KeyValueScanner call(RegionObserver observer) throws IOException {
+ return observer.preStoreScannerOpen(this, store, scan, targetCols, getResult(), readPt);
+ }
+ });
}
/**
@@ -1155,14 +1169,13 @@ public class RegionCoprocessorHost
* @exception IOException Exception
*/
public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
- return execOperationWithResult(s,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.postScannerOpen(ctx, scan, getResult()));
- }
- });
+ return execOperationWithResult(s, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter) {
+ @Override
+ public RegionScanner call(RegionObserver observer) throws IOException {
+ return observer.postScannerOpen(this, scan, getResult());
+ }
+ });
}
/**
@@ -1175,14 +1188,13 @@ public class RegionCoprocessorHost
*/
public Boolean preScannerNext(final InternalScanner s,
final List<Result> results, final int limit) throws IOException {
- return execOperationWithResult(true, false,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
- }
- });
+ return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.preScannerNext(this, s, results, limit, getResult());
+ }
+ });
}
/**
@@ -1196,14 +1208,13 @@ public class RegionCoprocessorHost
public boolean postScannerNext(final InternalScanner s,
final List<Result> results, final int limit, boolean hasMore)
throws IOException {
- return execOperationWithResult(hasMore,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
- }
- });
+ return execOperationWithResult(hasMore, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.postScannerNext(this, s, results, limit, getResult());
+ }
+ });
}
/**
@@ -1218,14 +1229,13 @@ public class RegionCoprocessorHost
throws IOException {
// short circuit for performance
if (!hasCustomPostScannerFilterRow) return true;
- return execOperationWithResult(true,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.postScannerFilterRow(ctx, s, curRowCell, getResult()));
- }
- });
+ return execOperationWithResult(true, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.postScannerFilterRow(this, s, curRowCell, getResult());
+ }
+ });
}
/**
@@ -1234,11 +1244,10 @@ public class RegionCoprocessorHost
* @exception IOException Exception
*/
public boolean preScannerClose(final InternalScanner s) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.preScannerClose(ctx, s);
+ public void call(RegionObserver observer) throws IOException {
+ observer.preScannerClose(this, s);
}
});
}
@@ -1247,11 +1256,10 @@ public class RegionCoprocessorHost
* @exception IOException Exception
*/
public void postScannerClose(final InternalScanner s) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postScannerClose(ctx, s);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postScannerClose(this, s);
}
});
}
@@ -1262,11 +1270,10 @@ public class RegionCoprocessorHost
* @throws IOException Exception
*/
public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.preReplayWALs(ctx, info, edits);
+ public void call(RegionObserver observer) throws IOException {
+ observer.preReplayWALs(this, info, edits);
}
});
}
@@ -1277,11 +1284,10 @@ public class RegionCoprocessorHost
* @throws IOException Exception
*/
public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postReplayWALs(ctx, info, edits);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postReplayWALs(this, info, edits);
}
});
}
@@ -1295,11 +1301,10 @@ public class RegionCoprocessorHost
*/
public boolean preWALRestore(final RegionInfo info, final WALKey logKey,
final WALEdit logEdit) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.preWALRestore(ctx, info, logKey, logEdit);
+ public void call(RegionObserver observer) throws IOException {
+ observer.preWALRestore(this, info, logKey, logEdit);
}
});
}
@@ -1312,11 +1317,10 @@ public class RegionCoprocessorHost
*/
public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postWALRestore(ctx, info, logKey, logEdit);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postWALRestore(this, info, logKey, logEdit);
}
});
}
@@ -1327,31 +1331,28 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.preBulkLoadHFile(ctx, familyPaths);
+ public void call(RegionObserver observer) throws IOException {
+ observer.preBulkLoadHFile(this, familyPaths);
}
});
}
public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
throws IOException {
- return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.preCommitStoreFile(ctx, family, pairs);
+ public void call(RegionObserver observer) throws IOException {
+ observer.preCommitStoreFile(this, family, pairs);
}
});
}
public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postCommitStoreFile(ctx, family, srcPath, dstPath);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postCommitStoreFile(this, family, srcPath, dstPath);
}
});
}
@@ -1365,32 +1366,29 @@ public class RegionCoprocessorHost
*/
public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
Map<byte[], List<Path>> map, boolean hasLoaded) throws IOException {
- return execOperationWithResult(hasLoaded,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.postBulkLoadHFile(ctx, familyPaths, map, getResult()));
- }
- });
+ return execOperationWithResult(hasLoaded, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter) {
+ @Override
+ public Boolean call(RegionObserver observer) throws IOException {
+ return observer.postBulkLoadHFile(this, familyPaths, map, getResult());
+ }
+ });
}
public void postStartRegionOperation(final Operation op) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postStartRegionOperation(ctx, op);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postStartRegionOperation(this, op);
}
});
}
public void postCloseRegionOperation(final Operation op) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postCloseRegionOperation(ctx, op);
+ public void call(RegionObserver observer) throws IOException {
+ observer.postCloseRegionOperation(this, op);
}
});
}
@@ -1409,14 +1407,14 @@ public class RegionCoprocessorHost
public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
final Reference r) throws IOException {
- return execOperationWithResult(null,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFileReader>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
- }
- });
+ return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter) {
+ @Override
+ public StoreFileReader call(RegionObserver observer) throws IOException {
+ return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
+ getResult());
+ }
+ });
}
/**
@@ -1433,192 +1431,77 @@ public class RegionCoprocessorHost
public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
final Reference r, final StoreFileReader reader) throws IOException {
- return execOperationWithResult(reader,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFileReader>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
- }
- });
+ return execOperationWithResult(reader, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter) {
+ @Override
+ public StoreFileReader call(RegionObserver observer) throws IOException {
+ return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
+ getResult());
+ }
+ });
}
public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
final Cell oldCell, Cell newCell) throws IOException {
- return execOperationWithResult(newCell,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
- }
- });
+ return execOperationWithResult(newCell, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, Cell>(regionObserverGetter) {
+ @Override
+ public Cell call(RegionObserver observer) throws IOException {
+ return observer.postMutationBeforeWAL(this, opType, mutation, oldCell, getResult());
+ }
+ });
}
public Message preEndpointInvocation(final Service service, final String methodName,
Message request) throws IOException {
- return execOperationWithResult(request,
- coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
- @Override
- public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
- }
- });
+ return execOperationWithResult(request, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<EndpointObserver, Message>(endpointObserverGetter) {
+ @Override
+ public Message call(EndpointObserver observer) throws IOException {
+ return observer.preEndpointInvocation(this, service, methodName, getResult());
+ }
+ });
}
public void postEndpointInvocation(final Service service, final String methodName,
final Message request, final Message.Builder responseBuilder) throws IOException {
- execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
- @Override
- public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
- }
- });
+ execOperation(coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) {
+ @Override
+ public void call(EndpointObserver observer) throws IOException {
+ observer.postEndpointInvocation(this, service, methodName, request, responseBuilder);
+ }
+ });
}
public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
- return execOperationWithResult(tracker,
- coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
- @Override
- public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
- }
- });
- }
-
- private static abstract class CoprocessorOperation
- extends ObserverContext<RegionCoprocessorEnvironment> {
- public CoprocessorOperation() {
- this(RpcServer.getRequestUser());
- }
-
- public CoprocessorOperation(User user) {
- super(user);
- }
-
- public abstract void call(Coprocessor observer,
- ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
- public abstract boolean hasCall(Coprocessor observer);
- public void postEnvCall(RegionEnvironment env) { }
- }
-
- private static abstract class RegionOperation extends CoprocessorOperation {
- public RegionOperation() {
- }
-
- public RegionOperation(User user) {
- super(user);
- }
-
- public abstract void call(RegionObserver observer,
- ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
-
- public boolean hasCall(Coprocessor observer) {
- return observer instanceof RegionObserver;
- }
-
- public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- call((RegionObserver)observer, ctx);
- }
- }
-
- private static abstract class RegionOperationWithResult<T> extends RegionOperation {
- public RegionOperationWithResult() {
- }
-
- public RegionOperationWithResult(User user) {
- super (user);
- }
-
- private T result = null;
- public void setResult(final T result) { this.result = result; }
- public T getResult() { return this.result; }
- }
-
- private static abstract class EndpointOperation extends CoprocessorOperation {
- public abstract void call(EndpointObserver observer,
- ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
-
- public boolean hasCall(Coprocessor observer) {
- return observer instanceof EndpointObserver;
- }
-
- public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- call((EndpointObserver)observer, ctx);
- }
- }
-
- private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
- private T result = null;
- public void setResult(final T result) { this.result = result; }
- public T getResult() { return this.result; }
- }
-
- private boolean execOperation(final CoprocessorOperation ctx)
- throws IOException {
- return execOperation(true, ctx);
- }
-
- private <T> T execOperationWithResult(final T defaultValue,
- final RegionOperationWithResult<T> ctx) throws IOException {
- if (ctx == null) return defaultValue;
- ctx.setResult(defaultValue);
- execOperation(true, ctx);
- return ctx.getResult();
- }
-
- private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
- final RegionOperationWithResult<T> ctx) throws IOException {
- boolean bypass = false;
- T result = defaultValue;
- if (ctx != null) {
- ctx.setResult(defaultValue);
- bypass = execOperation(true, ctx);
- result = ctx.getResult();
- }
- return bypass == ifBypass ? result : null;
+ return execOperationWithResult(tracker, coprocEnvironments.isEmpty() ? null :
+ new ObserverOperationWithResult<RegionObserver, DeleteTracker>(regionObserverGetter) {
+ @Override
+ public DeleteTracker call(RegionObserver observer) throws IOException {
+ return observer.postInstantiateDeleteTracker(this, getResult());
+ }
+ });
}
- private <T> T execOperationWithResult(final T defaultValue,
- final EndpointOperationWithResult<T> ctx) throws IOException {
- if (ctx == null) return defaultValue;
- ctx.setResult(defaultValue);
- execOperation(true, ctx);
- return ctx.getResult();
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+ // BulkLoadObserver hooks
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+ public void prePrepareBulkLoad(User user) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null :
+ new BulkLoadObserverOperation(user) {
+ @Override protected void call(BulkLoadObserver observer) throws IOException {
+ observer.prePrepareBulkLoad(this);
+ }
+ });
}
- private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
- throws IOException {
- boolean bypass = false;
- List<RegionEnvironment> envs = coprocessors.get();
- for (int i = 0; i < envs.size(); i++) {
- RegionEnvironment env = envs.get(i);
- Coprocessor observer = env.getInstance();
- if (ctx.hasCall(observer)) {
- ctx.prepare(env);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ctx.call(observer, ctx);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- if (earlyExit && ctx.shouldComplete()) {
- break;
- }
- }
-
- ctx.postEnvCall(env);
- }
- return bypass;
+ public void preCleanupBulkLoad(User user) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null :
+ new BulkLoadObserverOperation(user) {
+ @Override protected void call(BulkLoadObserver observer) throws IOException {
+ observer.preCleanupBulkLoad(this);
+ }
+ });
}
}