You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2014/08/16 09:35:06 UTC
[8/9] HBASE-11733 Avoid copy-paste in Master/Region CoprocessorHost
http://git-wip-us.apache.org/repos/asf/hbase/blob/7995d9b9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 6329d47..0ef863d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -309,83 +309,46 @@ public class RegionCoprocessorHost
* @throws IOException Signals that an I/O exception has occurred.
*/
public void preOpen() throws IOException {
-
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance()).preOpen(ctx);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preOpen(ctx);
}
- }
-
+ });
}
/**
* Invoked after a region open
*/
public void postOpen() {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance()).postOpen(ctx);
- } catch (Throwable e) {
- handleCoprocessorThrowableNoRethrow(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
+ try {
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postOpen(ctx);
}
- }
+ });
+ } catch (IOException e) {
+ LOG.warn(e);
}
-
}
/**
* Invoked after log replay on region
*/
public void postLogReplay() {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance()).postLogReplay(ctx);
- } catch (Throwable e) {
- handleCoprocessorThrowableNoRethrow(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
+ try {
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postLogReplay(ctx);
}
- }
+ });
+ } catch (IOException e) {
+ LOG.warn(e);
}
}
@@ -394,25 +357,13 @@ public class RegionCoprocessorHost
* @param abortRequested true if the server is aborting
*/
public void preClose(final boolean abortRequested) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance()).preClose(ctx, abortRequested);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
+ execOperation(false, new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preClose(ctx, abortRequested);
}
- }
-
+ });
}
/**
@@ -420,26 +371,20 @@ public class RegionCoprocessorHost
* @param abortRequested true if the server is aborting
*/
public void postClose(final boolean abortRequested) {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance()).postClose(ctx, abortRequested);
- } catch (Throwable e) {
- handleCoprocessorThrowableNoRethrow(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
+ try {
+ execOperation(false, new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postClose(ctx, abortRequested);
}
- env.offerExecutionLatency(System.nanoTime() - startTime);
- }
- shutdown(env);
+ public void postEnvCall(RegionEnvironment env) {
+ shutdown(env);
+ }
+ });
+ } catch (IOException e) {
+ LOG.warn(e);
}
-
}
/**
@@ -449,30 +394,15 @@ public class RegionCoprocessorHost
public InternalScanner preCompactScannerOpen(final Store store,
final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
final CompactionRequest request) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- InternalScanner s = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- s = ((RegionObserver) env.getInstance()).preCompactScannerOpen(ctx, store,
- scanners, scanType, earliestPutTs, s, request);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env,e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(null,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
+ earliestPutTs, getResult(), request));
}
- }
- return s;
+ });
}
/**
@@ -486,31 +416,13 @@ public class RegionCoprocessorHost
*/
public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
final CompactionRequest request) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- boolean bypass = false;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance()).preCompactSelection(ctx, store, candidates,
- request);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env,e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preCompactSelection(ctx, store, candidates, request);
}
- }
- return bypass;
+ });
}
/**
@@ -522,29 +434,17 @@ public class RegionCoprocessorHost
*/
public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
final CompactionRequest request) {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance()).postCompactSelection(ctx, store, selected,
- request);
- } catch (Throwable e) {
- handleCoprocessorThrowableNoRethrow(env,e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
+ try {
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postCompactSelection(ctx, store, selected, request);
}
- }
+ });
+ } catch (IOException e) {
+ LOG.warn(e);
}
-
}
/**
@@ -557,32 +457,14 @@ public class RegionCoprocessorHost
*/
public InternalScanner preCompact(final Store store, final InternalScanner scanner,
final ScanType scanType, final CompactionRequest request) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- boolean bypass = false;
- InternalScanner s = scanner;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- s = ((RegionObserver) env.getInstance()).preCompact(ctx, store, s, scanType,
- request);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env,e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(false, scanner,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
}
- }
- return bypass ? null : s;
+ });
}
/**
@@ -594,58 +476,29 @@ public class RegionCoprocessorHost
*/
public void postCompact(final Store store, final StoreFile resultFile,
final CompactionRequest request) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance()).postCompact(ctx, store, resultFile, request);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postCompact(ctx, store, resultFile, request);
}
- }
+ });
}
/**
* Invoked before a memstore flush
* @throws IOException
*/
- public InternalScanner preFlush(final Store store, final InternalScanner scanner) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- boolean bypass = false;
- InternalScanner s = scanner;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- s = ((RegionObserver)env.getInstance()).preFlush(ctx, store, s);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env,e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ public InternalScanner preFlush(final Store store, final InternalScanner scanner)
+ throws IOException {
+ return execOperationWithResult(false, scanner,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preFlush(ctx, store, getResult()));
}
- }
- return bypass ? null : s;
+ });
}
/**
@@ -653,27 +506,13 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public void preFlush() throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).preFlush(ctx);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preFlush(ctx);
}
- }
+ });
}
/**
@@ -683,30 +522,14 @@ public class RegionCoprocessorHost
*/
public InternalScanner preFlushScannerOpen(final Store store,
final KeyValueScanner memstoreScanner) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- InternalScanner s = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- s = ((RegionObserver) env.getInstance()).preFlushScannerOpen(ctx, store,
- memstoreScanner, s);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(null,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
}
- }
- return s;
+ });
}
/**
@@ -714,27 +537,13 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public void postFlush() throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).postFlush(ctx);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postFlush(ctx);
}
- }
+ });
}
/**
@@ -742,27 +551,13 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).postFlush(ctx, store, storeFile);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postFlush(ctx, store, storeFile);
}
- }
+ });
}
/**
@@ -770,28 +565,13 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public void preSplit() throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).preSplit(ctx);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preSplit(ctx);
}
- }
-
+ });
}
/**
@@ -799,28 +579,13 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public void preSplit(final byte[] splitRow) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).preSplit(ctx, splitRow);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preSplit(ctx, splitRow);
}
- }
-
+ });
}
/**
@@ -830,79 +595,34 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public void postSplit(final HRegion l, final HRegion r) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).postSplit(ctx, l, r);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postSplit(ctx, l, r);
}
- }
+ });
}
- public boolean preSplitBeforePONR(final byte[] splitKey,
+ public boolean preSplitBeforePONR(final byte[] splitKey,
final List<Mutation> metaEntries) throws IOException {
- boolean bypass = false;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance()).preSplitBeforePONR(ctx, splitKey, metaEntries);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preSplitBeforePONR(ctx, splitKey, metaEntries);
}
- }
- return bypass;
+ });
}
public void preSplitAfterPONR() throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance()).preSplitAfterPONR(ctx);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preSplitAfterPONR(ctx);
}
- }
+ });
}
/**
@@ -910,27 +630,13 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public void preRollBackSplit() throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance()).preRollBackSplit(ctx);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preRollBackSplit(ctx);
}
- }
+ });
}
/**
@@ -938,27 +644,13 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public void postRollBackSplit() throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance()).postRollBackSplit(ctx);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postRollBackSplit(ctx);
}
- }
+ });
}
/**
@@ -966,27 +658,13 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public void postCompleteSplit() throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance()).postCompleteSplit(ctx);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postCompleteSplit(ctx);
}
- }
+ });
}
// RegionObserver support
@@ -1000,30 +678,13 @@ public class RegionCoprocessorHost
*/
public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
final Result result) throws IOException {
- boolean bypass = false;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).preGetClosestRowBefore(ctx, row, family, result);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preGetClosestRowBefore(ctx, row, family, result);
}
- }
- return bypass;
+ });
}
/**
@@ -1034,27 +695,13 @@ public class RegionCoprocessorHost
*/
public void postGetClosestRowBefore(final byte[] row, final byte[] family,
final Result result) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).postGetClosestRowBefore(ctx, row, family, result);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postGetClosestRowBefore(ctx, row, family, result);
}
- }
+ });
}
/**
@@ -1064,30 +711,13 @@ public class RegionCoprocessorHost
*/
public boolean preGet(final Get get, final List<Cell> results)
throws IOException {
- boolean bypass = false;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).preGetOp(ctx, get, results);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preGetOp(ctx, get, results);
}
- }
- return bypass;
+ });
}
/**
@@ -1096,28 +726,14 @@ public class RegionCoprocessorHost
* @exception IOException Exception
*/
public void postGet(final Get get, final List<Cell> results)
- throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).postGetOp(ctx, get, results);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ throws IOException {
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postGetOp(ctx, get, results);
}
- }
+ });
}
/**
@@ -1127,31 +743,14 @@ public class RegionCoprocessorHost
* @exception IOException Exception
*/
public Boolean preExists(final Get get) throws IOException {
- boolean bypass = false;
- boolean exists = false;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- exists = ((RegionObserver)env.getInstance()).preExists(ctx, get, exists);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(true, false,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preExists(ctx, get, getResult()));
}
- }
- return bypass ? exists : null;
+ });
}
/**
@@ -1162,28 +761,14 @@ public class RegionCoprocessorHost
*/
public boolean postExists(final Get get, boolean exists)
throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- exists = ((RegionObserver)env.getInstance()).postExists(ctx, get, exists);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(exists,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.postExists(ctx, get, getResult()));
}
- }
- return exists;
+ });
}
/**
@@ -1195,30 +780,13 @@ public class RegionCoprocessorHost
*/
public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
throws IOException {
- boolean bypass = false;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).prePut(ctx, put, edit, durability);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.prePut(ctx, put, edit, durability);
}
- }
- return bypass;
+ });
}
/**
@@ -1231,34 +799,15 @@ public class RegionCoprocessorHost
* @exception IOException
* Exception
*/
- public boolean prePrepareTimeStampForDeleteVersion(Mutation mutation,
- Cell kv, byte[] byteNow, Get get) throws IOException {
- boolean bypass = false;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance())
- .prePrepareTimeStampForDeleteVersion(ctx, mutation, kv,
- byteNow, get);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
+ final Cell kv, final byte[] byteNow, final Get get) throws IOException {
+ return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
}
- }
- return bypass;
+ });
}
/**
@@ -1269,27 +818,13 @@ public class RegionCoprocessorHost
*/
public void postPut(final Put put, final WALEdit edit, final Durability durability)
throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).postPut(ctx, put, edit, durability);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postPut(ctx, put, edit, durability);
}
- }
+ });
}
/**
@@ -1301,30 +836,13 @@ public class RegionCoprocessorHost
*/
public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
throws IOException {
- boolean bypass = false;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).preDelete(ctx, delete, edit, durability);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preDelete(ctx, delete, edit, durability);
}
- }
- return bypass;
+ });
}
/**
@@ -1335,29 +853,15 @@ public class RegionCoprocessorHost
*/
public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).postDelete(ctx, delete, edit, durability);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postDelete(ctx, delete, edit, durability);
}
- }
+ });
}
-
+
/**
* @param miniBatchOp
* @return true if default processing should be bypassed
@@ -1365,31 +869,13 @@ public class RegionCoprocessorHost
*/
public boolean preBatchMutate(
final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
- boolean bypass = false;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance()).preBatchMutate(ctx, miniBatchOp);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preBatchMutate(ctx, miniBatchOp);
}
- }
-
- return bypass;
+ });
}
/**
@@ -1398,54 +884,25 @@ public class RegionCoprocessorHost
*/
public void postBatchMutate(
final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance()).postBatchMutate(ctx, miniBatchOp);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postBatchMutate(ctx, miniBatchOp);
}
- }
+ });
}
public void postBatchMutateIndispensably(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver) env.getInstance()).postBatchMutateIndispensably(ctx, miniBatchOp,
- success);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
}
- }
+ });
}
/**
@@ -1462,33 +919,16 @@ public class RegionCoprocessorHost
public Boolean preCheckAndPut(final byte [] row, final byte [] family,
final byte [] qualifier, final CompareOp compareOp,
final ByteArrayComparable comparator, final Put put)
- throws IOException {
- boolean bypass = false;
- boolean result = false;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- result = ((RegionObserver)env.getInstance()).preCheckAndPut(ctx, row, family, qualifier,
- compareOp, comparator, put, result);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ throws IOException {
+ return execOperationWithResult(true, false,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
+ compareOp, comparator, put, getResult()));
}
- }
- return bypass ? result : null;
+ });
}
/**
@@ -1505,32 +945,15 @@ public class RegionCoprocessorHost
public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
final Put put) throws IOException {
- boolean bypass = false;
- boolean result = false;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- result = ((RegionObserver) env.getInstance()).preCheckAndPutAfterRowLock(ctx, row,
- family, qualifier, compareOp, comparator, put, result);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(true, false,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
+ compareOp, comparator, put, getResult()));
}
- }
- return bypass ? result : null;
+ });
}
/**
@@ -1545,31 +968,16 @@ public class RegionCoprocessorHost
public boolean postCheckAndPut(final byte [] row, final byte [] family,
final byte [] qualifier, final CompareOp compareOp,
final ByteArrayComparable comparator, final Put put,
- boolean result)
- throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- result = ((RegionObserver)env.getInstance()).postCheckAndPut(ctx, row, family,
- qualifier, compareOp, comparator, put, result);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ boolean result) throws IOException {
+ return execOperationWithResult(result,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
+ compareOp, comparator, put, getResult()));
}
- }
- return result;
+ });
}
/**
@@ -1587,32 +995,15 @@ public class RegionCoprocessorHost
final byte [] qualifier, final CompareOp compareOp,
final ByteArrayComparable comparator, final Delete delete)
throws IOException {
- boolean bypass = false;
- boolean result = false;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- result = ((RegionObserver)env.getInstance()).preCheckAndDelete(ctx, row, family,
- qualifier, compareOp, comparator, delete, result);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(true, false,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preCheckAndDelete(ctx, row, family,
+ qualifier, compareOp, comparator, delete, getResult()));
}
- }
- return bypass ? result : null;
+ });
}
/**
@@ -1629,32 +1020,15 @@ public class RegionCoprocessorHost
public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
final Delete delete) throws IOException {
- boolean bypass = false;
- boolean result = false;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- result = ((RegionObserver) env.getInstance()).preCheckAndDeleteAfterRowLock(ctx, row,
- family, qualifier, compareOp, comparator, delete, result);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(true, false,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
+ family, qualifier, compareOp, comparator, delete, getResult()));
}
- }
- return bypass ? result : null;
+ });
}
/**
@@ -1670,29 +1044,15 @@ public class RegionCoprocessorHost
final byte [] qualifier, final CompareOp compareOp,
final ByteArrayComparable comparator, final Delete delete,
boolean result) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- result = ((RegionObserver)env.getInstance()).postCheckAndDelete(ctx, row, family,
- qualifier, compareOp, comparator, delete, result);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(result,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.postCheckAndDelete(ctx, row, family,
+ qualifier, compareOp, comparator, delete, getResult()));
}
- }
- return result;
+ });
}
/**
@@ -1702,31 +1062,14 @@ public class RegionCoprocessorHost
* @throws IOException if an error occurred on the coprocessor
*/
public Result preAppend(final Append append) throws IOException {
- boolean bypass = false;
- Result result = null;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- result = ((RegionObserver)env.getInstance()).preAppend(ctx, append);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(true, null,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preAppend(ctx, append));
}
- }
- return bypass ? result : null;
+ });
}
/**
@@ -1736,31 +1079,14 @@ public class RegionCoprocessorHost
* @throws IOException if an error occurred on the coprocessor
*/
public Result preAppendAfterRowLock(final Append append) throws IOException {
- boolean bypass = false;
- Result result = null;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- result = ((RegionObserver) env.getInstance()).preAppendAfterRowLock(ctx, append);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(true, null,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preAppendAfterRowLock(ctx, append));
}
- }
- return bypass ? result : null;
+ });
}
/**
@@ -1770,31 +1096,14 @@ public class RegionCoprocessorHost
* @throws IOException if an error occurred on the coprocessor
*/
public Result preIncrement(final Increment increment) throws IOException {
- boolean bypass = false;
- Result result = null;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- result = ((RegionObserver)env.getInstance()).preIncrement(ctx, increment);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(true, null,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preIncrement(ctx, increment));
}
- }
- return bypass ? result : null;
+ });
}
/**
@@ -1804,31 +1113,14 @@ public class RegionCoprocessorHost
* @throws IOException if an error occurred on the coprocessor
*/
public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
- boolean bypass = false;
- Result result = null;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- result = ((RegionObserver) env.getInstance()).preIncrementAfterRowLock(ctx, increment);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(true, null,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preIncrementAfterRowLock(ctx, increment));
}
- }
- return bypass ? result : null;
+ });
}
/**
@@ -1837,27 +1129,13 @@ public class RegionCoprocessorHost
* @throws IOException if an error occurred on the coprocessor
*/
public void postAppend(final Append append, final Result result) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).postAppend(ctx, append, result);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postAppend(ctx, append, result);
}
- }
+ });
}
/**
@@ -1866,28 +1144,14 @@ public class RegionCoprocessorHost
* @throws IOException if an error occurred on the coprocessor
*/
public Result postIncrement(final Increment increment, Result result) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- result = ((RegionObserver)env.getInstance()).postIncrement(ctx, increment, result);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(result,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.postIncrement(ctx, increment, getResult()));
}
- }
- return result;
+ });
}
/**
@@ -1897,31 +1161,14 @@ public class RegionCoprocessorHost
* @exception IOException Exception
*/
public RegionScanner preScannerOpen(final Scan scan) throws IOException {
- boolean bypass = false;
- RegionScanner s = null;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- s = ((RegionObserver)env.getInstance()).preScannerOpen(ctx, scan, s);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(true, null,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preScannerOpen(ctx, scan, getResult()));
}
- }
- return bypass ? s : null;
+ });
}
/**
@@ -1931,30 +1178,14 @@ public class RegionCoprocessorHost
*/
public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
final NavigableSet<byte[]> targetCols) throws IOException {
- KeyValueScanner s = null;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- s = ((RegionObserver) env.getInstance()).preStoreScannerOpen(ctx, store, scan,
- targetCols, s);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(null,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult()));
}
- }
- return s;
+ });
}
/**
@@ -1964,28 +1195,14 @@ public class RegionCoprocessorHost
* @exception IOException Exception
*/
public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- s = ((RegionObserver)env.getInstance()).postScannerOpen(ctx, scan, s);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(s,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.postScannerOpen(ctx, scan, getResult()));
}
- }
- return s;
+ });
}
/**
@@ -1998,32 +1215,14 @@ public class RegionCoprocessorHost
*/
public Boolean preScannerNext(final InternalScanner s,
final List<Result> results, final int limit) throws IOException {
- boolean bypass = false;
- boolean hasNext = false;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- hasNext = ((RegionObserver)env.getInstance()).preScannerNext(ctx, s, results, limit,
- hasNext);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(true, false,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
}
- }
-
- return bypass ? hasNext : null;
+ });
}
/**
@@ -2037,29 +1236,14 @@ public class RegionCoprocessorHost
public boolean postScannerNext(final InternalScanner s,
final List<Result> results, final int limit, boolean hasMore)
throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- hasMore = ((RegionObserver)env.getInstance()).postScannerNext(ctx, s, results, limit,
- hasMore);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(hasMore,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
}
- }
- return hasMore;
+ });
}
/**
@@ -2074,90 +1258,42 @@ public class RegionCoprocessorHost
*/
public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow,
final int offset, final short length) throws IOException {
- boolean hasMore = true; // By default assume more rows there.
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env : coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- hasMore = ((RegionObserver) env.getInstance()).postScannerFilterRow(ctx, s, currentRow,
- offset, length, hasMore);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperationWithResult(true,
+ coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ setResult(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, getResult()));
}
- }
- return hasMore;
+ });
}
-
+
/**
* @param s the scanner
* @return true if default behavior should be bypassed, false otherwise
* @exception IOException Exception
*/
public boolean preScannerClose(final InternalScanner s) throws IOException {
- boolean bypass = false;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).preScannerClose(ctx, s);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preScannerClose(ctx, s);
}
- }
-
- return bypass;
+ });
}
/**
* @exception IOException Exception
*/
public void postScannerClose(final InternalScanner s) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).postScannerClose(ctx, s);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postScannerClose(ctx, s);
}
- }
+ });
}
/**
@@ -2169,30 +1305,13 @@ public class RegionCoprocessorHost
*/
public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
final WALEdit logEdit) throws IOException {
- boolean bypass = false;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).preWALRestore(ctx, info, logKey, logEdit);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preWALRestore(ctx, info, logKey, logEdit);
}
- }
- return bypass;
+ });
}
/**
@@ -2203,27 +1322,13 @@ public class RegionCoprocessorHost
*/
public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).postWALRestore(ctx, info, logKey, logEdit);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.postWALRestore(ctx, info, logKey, logEdit);
}
- }
+ });
}
/**
@@ -2232,30 +1337,13 @@ public class RegionCoprocessorHost
* @throws IOException
*/
public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
- boolean bypass = false;
- ObserverContext<RegionCoprocessorEnvironment> ctx = null;
- for (RegionEnvironment env: coprocessors) {
- if (env.getInstance() instanceof RegionObserver) {
- long startTime = System.nanoTime();
- ctx = ObserverContext.createAndPrepare(env, ctx);
- Thread currentThread = Thread.currentThread();
- ClassLoader cl = currentThread.getContextClassLoader();
- try {
- currentThread.setContextClassLoader(env.getClassLoader());
- ((RegionObserver)env.getInstance()).preBulkLoadHFile(ctx, familyPaths);
- } catch (Throwable e) {
- handleCoprocessorThrowable(env, e);
- } finally {
- currentThread.setContextClassLoader(cl);
- }
- bypass |= ctx.shouldBypass();
- env.offerExecutionLatency(System.nanoTime() - startTime);
- if (ctx.shouldComplete()) {
- break;
- }
+ return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+ @Override
+ public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+ throws IOException {
+ oserver.preBulkLoadHFile(ctx, familyPaths);
}
- }
- return bypass;
+ });
}
/**
@@ -2266,77 +1354,34 @@ public class RegionCoprocessorHost
*/
public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
boolean hasLoaded) throws IOException {
- ObserverContext<RegionCoprocessorEnvironment> ctx =
<TRUNCATED>