You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2017/11/08 09:43:21 UTC
phoenix git commit: PHOENIX-4340 Implements Observer interfaces
instead of extending base observers classes
Repository: phoenix
Updated Branches:
refs/heads/5.x-HBase-2.0 d36558f4a -> 645fc39e8
PHOENIX-4340 Implements Observer interfaces instead of extending base observers classes
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/645fc39e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/645fc39e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/645fc39e
Branch: refs/heads/5.x-HBase-2.0
Commit: 645fc39e823f503e5503ffd4b26f126f84449c2d
Parents: d36558f
Author: Ankit Singhal <an...@gmail.com>
Authored: Wed Nov 8 15:13:12 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Wed Nov 8 15:13:12 2017 +0530
----------------------------------------------------------------------
.../coprocessor/BaseScannerRegionObserver.java | 10 +-
.../coprocessor/DelegateRegionObserver.java | 421 ++-----------------
.../coprocessor/DelegateRegionScanner.java | 18 +-
.../coprocessor/MetaDataRegionObserver.java | 12 +-
.../coprocessor/SequenceRegionObserver.java | 6 +-
.../UngroupedAggregateRegionObserver.java | 42 +-
.../org/apache/phoenix/hbase/index/Indexer.java | 47 +--
.../hbase/index/master/IndexMasterObserver.java | 4 +-
.../index/PhoenixTransactionalIndexer.java | 14 +-
.../transaction/OmidTransactionContext.java | 5 +-
.../transaction/PhoenixTransactionContext.java | 12 +-
.../transaction/TephraTransactionContext.java | 20 +-
12 files changed, 123 insertions(+), 488 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index d3b257b..95379a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -21,13 +21,12 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.Region;
@@ -50,7 +49,7 @@ import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
-abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
+abstract public class BaseScannerRegionObserver implements RegionObserver {
public static final String AGGREGATORS = "_Aggs";
public static final String UNORDERED_GROUP_BY_EXPRESSIONS = "_UnorderedGroupByExpressions";
@@ -135,11 +134,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
protected QualifierEncodingScheme encodingScheme;
protected boolean useNewValueColumnQualifier;
- @Override
- public void start(CoprocessorEnvironment e) throws IOException {
- super.start(e);
- }
-
/**
* Used by logger to identify coprocessor
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
index 59b2271..34eee78 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
@@ -18,15 +18,11 @@
package org.apache.phoenix.coprocessor;
import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
import java.util.List;
-import java.util.NavigableSet;
+import java.util.Map;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -34,36 +30,23 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.filter.ByteArrayComparable;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
-import org.apache.hadoop.hbase.io.Reference;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.regionserver.DeleteTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
-import com.google.common.collect.ImmutableList;
-
public class DelegateRegionObserver implements RegionObserver {
protected final RegionObserver delegate;
@@ -72,15 +55,7 @@ public class DelegateRegionObserver implements RegionObserver {
this.delegate = delegate;
}
- @Override
- public void start(CoprocessorEnvironment env) throws IOException {
- delegate.start(env);
- }
-
- @Override
- public void stop(CoprocessorEnvironment env) throws IOException {
- delegate.stop(env);
- }
+
@Override
public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
@@ -97,11 +72,6 @@ public class DelegateRegionObserver implements RegionObserver {
delegate.postLogReplay(c);
}
- @Override
- public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
- Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
- return delegate.preFlushScannerOpen(c, store, memstoreScanner, s);
- }
@Override
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
@@ -125,236 +95,7 @@ public class DelegateRegionObserver implements RegionObserver {
delegate.postFlush(c, store, resultFile);
}
- // Compaction and split upcalls run with the effective user context of the requesting user.
- // This will lead to failure of cross cluster RPC if the effective user is not
- // the login user. Switch to the login user context to ensure we have the expected
- // security context.
-
- @Override
- public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
- final List<StoreFile> candidates, final CompactionRequest request) throws IOException {
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- delegate.preCompactSelection(c, store, candidates, request);
- return null;
- }
- });
- }
-
- @Override
- public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
- final List<StoreFile> candidates) throws IOException {
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- delegate.preCompactSelection(c, store, candidates);
- return null;
- }
- });
- }
-
- @Override
- public void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
- final ImmutableList<StoreFile> selected, final CompactionRequest request) {
- try {
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- delegate.postCompactSelection(c, store, selected, request);
- return null;
- }
- });
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
- final ImmutableList<StoreFile> selected) {
- try {
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- delegate.postCompactSelection(c, store, selected);
- return null;
- }
- });
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
- final InternalScanner scanner, final ScanType scanType, final CompactionRequest request)
- throws IOException {
- return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
- @Override
- public InternalScanner run() throws Exception {
- return delegate.preCompact(c, store, scanner, scanType, request);
- }
- });
- }
-
- @Override
- public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
- final InternalScanner scanner, final ScanType scanType) throws IOException {
- return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
- @Override
- public InternalScanner run() throws Exception {
- return delegate.preCompact(c, store, scanner, scanType);
- }
- });
- }
-
- @Override
- public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
- final Store store, final List<? extends KeyValueScanner> scanners, final ScanType scanType,
- final long earliestPutTs, final InternalScanner s, final CompactionRequest request) throws IOException {
- return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
- @Override
- public InternalScanner run() throws Exception {
- return delegate.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s,
- request);
- }
- });
- }
-
- @Override
- public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
- final Store store, final List<? extends KeyValueScanner> scanners, final ScanType scanType,
- final long earliestPutTs, final InternalScanner s) throws IOException {
- return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
- @Override
- public InternalScanner run() throws Exception {
- return delegate.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s);
- }
- });
- }
-
- @Override
- public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
- final StoreFile resultFile, final CompactionRequest request) throws IOException {
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- delegate.postCompact(c, store, resultFile, request);
- return null;
- }
- });
- }
-
- @Override
- public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
- final StoreFile resultFile) throws IOException {
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- delegate.postCompact(c, store, resultFile);
- return null;
- }
- });
- }
-
- @Override
- public void preSplit(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- delegate.preSplit(c);
- return null;
- }
- });
- }
-
- @Override
- public void preSplit(final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] splitRow)
- throws IOException {
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- delegate.preSplit(c, splitRow);
- return null;
- }
- });
- }
-
- @Override
- public void postSplit(final ObserverContext<RegionCoprocessorEnvironment> c, final Region l, final Region r)
- throws IOException {
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- delegate.postSplit(c, l, r);
- return null;
- }
- });
- }
-
- @Override
- public void preSplitBeforePONR(final ObserverContext<RegionCoprocessorEnvironment> ctx,
- final byte[] splitKey, final List<Mutation> metaEntries) throws IOException {
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- delegate.preSplitBeforePONR(ctx, splitKey, metaEntries);
- return null;
- }
- });
- }
-
- @Override
- public void preSplitAfterPONR(final ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- delegate.preSplitAfterPONR(ctx);
- return null;
- }
- });
- }
-
- @Override
- public void preRollBackSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- delegate.preRollBackSplit(ctx);
- return null;
- }
- });
- }
-
- @Override
- public void postRollBackSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- delegate.postRollBackSplit(ctx);
- return null;
- }
- });
- }
-
- @Override
- public void postCompleteSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx)
- throws IOException {
- // NOTE: This one is an exception and doesn't need a context change. Should
- // be infrequent and overhead is low, so let's ensure we have the right context
- // anyway to avoid potential surprise.
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- delegate.postCompleteSplit(ctx);
- return null;
- }
- });
- }
+
@Override
public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
@@ -366,19 +107,7 @@ public class DelegateRegionObserver implements RegionObserver {
public void postClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) {
delegate.postClose(c, abortRequested);
}
-
- @Override
- public void preGetClosestRowBefore(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
- byte[] family, Result result) throws IOException {
- delegate.preGetClosestRowBefore(c, row, family, result);
- }
-
- @Override
- public void postGetClosestRowBefore(ObserverContext<RegionCoprocessorEnvironment> c,
- byte[] row, byte[] family, Result result) throws IOException {
- delegate.postGetClosestRowBefore(c, row, family, result);
- }
-
+
@Override
public void
preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result)
@@ -466,69 +195,6 @@ public class DelegateRegionObserver implements RegionObserver {
}
@Override
- public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
- byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
- Put put, boolean result) throws IOException {
- return delegate.preCheckAndPut(c, row, family, qualifier, compareOp, comparator, put,
- result);
- }
-
- @Override
- public boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
- byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
- ByteArrayComparable comparator, Put put, boolean result) throws IOException {
- return delegate.preCheckAndPutAfterRowLock(c, row, family, qualifier, compareOp,
- comparator, put, result);
- }
-
- @Override
- public boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
- byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
- Put put, boolean result) throws IOException {
- return delegate.postCheckAndPut(c, row, family, qualifier, compareOp, comparator, put,
- result);
- }
-
- @Override
- public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
- byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
- Delete delete, boolean result) throws IOException {
- return delegate.preCheckAndDelete(c, row, family, qualifier, compareOp, comparator, delete,
- result);
- }
-
- @Override
- public boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
- byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
- ByteArrayComparable comparator, Delete delete, boolean result) throws IOException {
- return delegate.preCheckAndDeleteAfterRowLock(c, row, family, qualifier, compareOp,
- comparator, delete, result);
- }
-
- @Override
- public boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
- byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
- Delete delete, boolean result) throws IOException {
- return delegate.postCheckAndDelete(c, row, family, qualifier, compareOp, comparator,
- delete, result);
- }
-
- @Override
- public long preIncrementColumnValue(ObserverContext<RegionCoprocessorEnvironment> c,
- byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
- throws IOException {
- return delegate.preIncrementColumnValue(c, row, family, qualifier, amount, writeToWAL);
- }
-
- @Override
- public long postIncrementColumnValue(ObserverContext<RegionCoprocessorEnvironment> c,
- byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL,
- long result) throws IOException {
- return delegate.postIncrementColumnValue(c, row, family, qualifier, amount, writeToWAL,
- result);
- }
-
- @Override
public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
throws IOException {
return delegate.preAppend(c, append);
@@ -572,13 +238,6 @@ public class DelegateRegionObserver implements RegionObserver {
}
@Override
- public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
- Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s)
- throws IOException {
- return delegate.preStoreScannerOpen(c, store, scan, targetCols, s);
- }
-
- @Override
public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Scan scan, RegionScanner s) throws IOException {
return delegate.postScannerOpen(c, scan, s);
@@ -596,12 +255,7 @@ public class DelegateRegionObserver implements RegionObserver {
return delegate.postScannerNext(c, s, result, limit, hasNext);
}
- @Override
- public boolean postScannerFilterRow(ObserverContext<RegionCoprocessorEnvironment> c,
- InternalScanner s, byte[] currentRow, int offset, short length, boolean hasMore)
- throws IOException {
- return delegate.postScannerFilterRow(c, s, currentRow, offset, length, hasMore);
- }
+
@Override
public void preScannerClose(ObserverContext<RegionCoprocessorEnvironment> c, InternalScanner s)
@@ -617,28 +271,19 @@ public class DelegateRegionObserver implements RegionObserver {
}
@Override
- public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
- HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
- delegate.preWALRestore(ctx, info, logKey, logEdit);
- }
-
- @Override
- public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> ctx, HRegionInfo info,
- HLogKey logKey, WALEdit logEdit) throws IOException {
+ public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, RegionInfo info,
+ WALKey logKey, WALEdit logEdit) throws IOException {
delegate.preWALRestore(ctx, info, logKey, logEdit);
}
-
+
+
@Override
- public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
- HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+ public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, RegionInfo info,
+ WALKey logKey, WALEdit logEdit) throws IOException {
delegate.postWALRestore(ctx, info, logKey, logEdit);
}
- @Override
- public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> ctx, HRegionInfo info,
- HLogKey logKey, WALEdit logEdit) throws IOException {
- delegate.postWALRestore(ctx, info, logKey, logEdit);
- }
+
@Override
public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
@@ -646,37 +291,39 @@ public class DelegateRegionObserver implements RegionObserver {
delegate.preBulkLoadHFile(ctx, familyPaths);
}
+
@Override
- public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
- List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
- return delegate.postBulkLoadHFile(ctx, familyPaths, hasLoaded);
+ public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
+ MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
+ return delegate.postMutationBeforeWAL(ctx, opType, mutation, oldCell, newCell);
}
@Override
- public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
- FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
- Reference r, Reader reader) throws IOException {
- return delegate.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, reader);
+ public DeleteTracker postInstantiateDeleteTracker(
+ ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker)
+ throws IOException {
+ return delegate.postInstantiateDeleteTracker(ctx, delTracker);
}
@Override
- public Reader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
- FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
- Reference r, Reader reader) throws IOException {
- return delegate.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, reader);
+ public void preCommitStoreFile(ObserverContext<RegionCoprocessorEnvironment> ctx, byte[] family,
+ List<Pair<Path, Path>> pairs) throws IOException {
+ delegate.preCommitStoreFile(ctx, family, pairs);
+
}
@Override
- public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
- MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
- return delegate.postMutationBeforeWAL(ctx, opType, mutation, oldCell, newCell);
+ public void postCommitStoreFile(ObserverContext<RegionCoprocessorEnvironment> ctx, byte[] family, Path srcPath,
+ Path dstPath) throws IOException {
+ delegate.postCommitStoreFile(ctx, family, srcPath, dstPath);
+
}
@Override
- public DeleteTracker postInstantiateDeleteTracker(
- ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker)
+ public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+ List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths, boolean hasLoaded)
throws IOException {
- return delegate.postInstantiateDeleteTracker(ctx, delTracker);
+ return delegate.postBulkLoadHFile(ctx, stagingFamilyPaths, finalPaths, hasLoaded);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index 95d449a..21a8eef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -20,7 +20,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -33,11 +33,6 @@ public class DelegateRegionScanner implements RegionScanner {
}
@Override
- public HRegionInfo getRegionInfo() {
- return delegate.getRegionInfo();
- }
-
- @Override
public boolean isFilterDone() throws IOException {
return delegate.isFilterDone();
}
@@ -86,4 +81,15 @@ public class DelegateRegionScanner implements RegionScanner {
public int getBatch() {
return delegate.getBatch();
}
+
+ @Override
+ public void shipped() throws IOException {
+ delegate.shipped();
+
+ }
+
+ @Override
+ public RegionInfo getRegionInfo() {
+ return delegate.getRegionInfo();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index c816549..e11ff14 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.TimerTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -40,12 +41,12 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -95,7 +96,7 @@ import com.google.common.collect.Maps;
* to SYSTEM.TABLE.
*/
@SuppressWarnings("deprecation")
-public class MetaDataRegionObserver extends BaseRegionObserver {
+public class MetaDataRegionObserver implements RegionObserver,RegionCoprocessor {
public static final Log LOG = LogFactory.getLog(MetaDataRegionObserver.class);
public static final String REBUILD_INDEX_APPEND_TO_URL_STRING = "REBUILDINDEX";
private static final byte[] SYSTEM_CATALOG_KEY = SchemaUtil.getTableKey(
@@ -117,6 +118,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
executor.shutdownNow();
GlobalCache.getInstance(c.getEnvironment()).getMetaDataCache().invalidateAll();
}
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
@Override
public void start(CoprocessorEnvironment env) throws IOException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
index 6773f36..8ef5e80 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -34,9 +34,9 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
@@ -73,7 +73,7 @@ import com.google.common.collect.Lists;
*
* @since 3.0.0
*/
-public class SequenceRegionObserver extends BaseRegionObserver {
+public class SequenceRegionObserver implements RegionObserver {
public static final String OPERATION_ATTRIB = "SEQUENCE_OPERATION";
public static final String MAX_TIMERANGE_ATTRIB = "MAX_TIMERANGE";
public static final String CURRENT_VALUE_ATTRIB = "CURRENT_VALUE";
@@ -399,7 +399,7 @@ public class SequenceRegionObserver extends BaseRegionObserver {
Mutation m = null;
switch (op) {
case RETURN_SEQUENCE:
- KeyValue currentValueKV = result.raw()[0];
+ KeyValue currentValueKV = org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(result.rawCells()[0]);
long expectedValue = PLong.INSTANCE.getCodec().decodeLong(append.getAttribute(CURRENT_VALUE_ATTRIB), 0, SortOrder.getDefault());
long value = PLong.INSTANCE.getCodec().decodeLong(currentValueKV.getValueArray(),
currentValueKV.getValueOffset(), SortOrder.getDefault());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index c2f789a..a770aa0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -36,6 +36,7 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -53,22 +54,19 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.CoprocessorHConnection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -81,7 +79,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.ServerCacheClient;
-import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.exception.DataExceedsCapacityException;
import org.apache.phoenix.execute.TupleProjector;
@@ -98,7 +95,6 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
@@ -136,7 +132,6 @@ import org.apache.phoenix.util.ExpressionUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.LogUtil;
-import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
@@ -148,9 +143,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
@@ -162,7 +155,7 @@ import com.google.common.primitives.Ints;
*
* @since 0.1
*/
-public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver {
+public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver implements RegionCoprocessor {
// TODO: move all constants into a single class
public static final String UNGROUPED_AGG = "UngroupedAgg";
public static final String DELETE_AGG = "DeleteAgg";
@@ -206,8 +199,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
private Configuration compactionConfig;
@Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+
+ @Override
public void start(CoprocessorEnvironment e) throws IOException {
- super.start(e);
// Can't use ClientKeyValueBuilder on server-side because the memstore expects to
// be able to get a single backing buffer for a KeyValue.
this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
@@ -243,7 +241,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
Mutation[] mutationArray = new Mutation[mutations.size()];
// When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
// flush happen which decrease the memstore size and then writes allowed on the region.
- for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
+ for (int i = 0; region.getMemStoreSize() > blockingMemstoreSize && i < 30; i++) {
try {
checkForRegionClosing();
Thread.sleep(100);
@@ -712,7 +710,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
Delete delete = new Delete(results.get(0).getRowArray(),
results.get(0).getRowOffset(),
results.get(0).getRowLength());
- delete.deleteColumns(deleteCF, deleteCQ, ts);
+ delete.addColumn(deleteCF, deleteCQ, ts);
// force tephra to ignore this deletes
delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
mutations.add(delete);
@@ -902,8 +900,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
@Override
- public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
- final InternalScanner scanner, final ScanType scanType) throws IOException {
+ public InternalScanner preCompact(
+ org.apache.hadoop.hbase.coprocessor.ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ InternalScanner scanner, ScanType scanType,
+ org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker tracker,
+ CompactionRequest request) throws IOException {
// Compaction and split upcalls run with the effective user context of the requesting user.
// This will lead to failure of cross cluster RPC if the effective user is not
// the login user. Switch to the login user context to ensure we have the expected
@@ -918,7 +919,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis();
StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector(
c.getEnvironment(), table.getNameAsString(), clientTimeStamp,
- store.getFamily().getName());
+ store.getColumnFamilyDescriptor().getName());
internalScanner = stats.createCompactionScanner(c.getEnvironment(), store, scanner);
} catch (IOException e) {
// If we can't reach the stats table, don't interrupt the normal
@@ -934,8 +935,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
@Override
- public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
- final StoreFile resultFile, CompactionRequest request) throws IOException {
+ public void postCompact(org.apache.hadoop.hbase.coprocessor.ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, StoreFile resultFile,
+ org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker tracker,
+ CompactionRequest request) throws IOException {
+
// If we're compacting all files, then delete markers are removed
// and we must permanently disable an index that needs to be
// partially rebuild because we're potentially losing the information
http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 24eeab5..1c78fff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -24,7 +24,6 @@ import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER
import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -32,6 +31,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -50,23 +50,18 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
@@ -119,7 +114,7 @@ import com.google.common.collect.Multimap;
* Phoenix always does batch mutations.
* <p>
*/
-public class Indexer extends BaseRegionObserver {
+public class Indexer implements RegionObserver, RegionCoprocessor {
private static final Log LOG = LogFactory.getLog(Indexer.class);
private static final OperationStatus IGNORE = new OperationStatus(OperationStatusCode.SUCCESS);
@@ -199,6 +194,11 @@ public class Indexer extends BaseRegionObserver {
private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
@Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
public void start(CoprocessorEnvironment e) throws IOException {
try {
final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
@@ -266,7 +266,6 @@ public class Indexer extends BaseRegionObserver {
}
} catch (NoSuchMethodError ex) {
disabled = true;
- super.start(e);
LOG.error("Must be too early a version of HBase. Disabled coprocessor ", ex);
}
}
@@ -301,7 +300,6 @@ public class Indexer extends BaseRegionObserver {
return;
}
if (this.disabled) {
- super.stop(e);
return;
}
this.stopped = true;
@@ -362,7 +360,6 @@ public class Indexer extends BaseRegionObserver {
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
if (this.disabled) {
- super.preBatchMutate(c, miniBatchOp);
return;
}
long start = EnvironmentEdgeManager.currentTimeMillis();
@@ -578,7 +575,6 @@ public class Indexer extends BaseRegionObserver {
public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
if (this.disabled) {
- super.postBatchMutateIndispensably(c, miniBatchOp, success);
return;
}
long start = EnvironmentEdgeManager.currentTimeMillis();
@@ -755,29 +751,6 @@ public class Indexer extends BaseRegionObserver {
}
}
- /**
- * Create a custom {@link InternalScanner} for a compaction that tracks the versions of rows that
- * are removed so we can clean then up from the the index table(s).
- * <p>
- * This is not yet implemented - its not clear if we should even mess around with the Index table
- * for these rows as those points still existed. TODO: v2 of indexing
- */
- @Override
- public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
- final Store store, final List<? extends KeyValueScanner> scanners, final ScanType scanType,
- final long earliestPutTs, final InternalScanner s) throws IOException {
- // Compaction and split upcalls run with the effective user context of the requesting user.
- // This will lead to failure of cross cluster RPC if the effective user is not
- // the login user. Switch to the login user context to ensure we have the expected
- // security context.
- // NOTE: Not necessary here at this time but leave in place to document this critical detail.
- return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
- @Override
- public InternalScanner run() throws Exception {
- return Indexer.super.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s);
- }
- });
- }
/**
* Exposed for testing!
http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
index 2f83f8d..c2b177a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
@@ -17,12 +17,12 @@
*/
package org.apache.phoenix.hbase.index.master;
-import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
/**
* Defines of coprocessor hooks(to support secondary indexing) of operations on
* {@link org.apache.hadoop.hbase.master.HMaster} process.
*/
-public class IndexMasterObserver extends BaseMasterObserver {
+public class IndexMasterObserver implements MasterObserver {
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 3495267..f3c1dbd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -33,6 +33,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import org.apache.commons.logging.Log;
@@ -45,14 +46,14 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
@@ -101,7 +102,7 @@ import com.google.common.primitives.Longs;
* bit simpler than the non transactional case. For example, there's no need to muck with the WAL, as failure scenarios
* are handled by aborting the transaction.
*/
-public class PhoenixTransactionalIndexer extends BaseRegionObserver {
+public class PhoenixTransactionalIndexer implements RegionObserver, RegionCoprocessor {
private static final Log LOG = LogFactory.getLog(PhoenixTransactionalIndexer.class);
@@ -117,7 +118,12 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
private PhoenixIndexCodec codec;
private IndexWriter writer;
private boolean stopped;
-
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
@Override
public void start(CoprocessorEnvironment e) throws IOException {
final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index d4553ec..6505cff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -19,10 +19,9 @@ package org.apache.phoenix.transaction;
import java.io.IOException;
import java.sql.SQLException;
-import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -128,7 +127,7 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
}
@Override
- public BaseRegionObserver getCoProcessor() {
+ public RegionObserver getCoProcessor() {
// TODO Auto-generated method stub
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
index d335692..0e46ae9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -17,18 +17,18 @@
*/
package org.apache.phoenix.transaction;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.concurrent.TimeoutException;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.twill.zookeeper.ZKClientService;
import org.slf4j.Logger;
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.concurrent.TimeoutException;
-
public interface PhoenixTransactionContext {
/**
@@ -166,7 +166,7 @@ public interface PhoenixTransactionContext {
*
* @return the coprocessor
*/
- public BaseRegionObserver getCoProcessor();
+ public RegionObserver getCoProcessor();
/**
*
http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
index 7515a9c..ebd7d2d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -24,17 +24,19 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.tephra.Transaction;
+import org.apache.tephra.Transaction.VisibilityLevel;
import org.apache.tephra.TransactionAware;
import org.apache.tephra.TransactionCodec;
import org.apache.tephra.TransactionConflictException;
@@ -42,20 +44,19 @@ import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionSystemClient;
-import org.apache.tephra.Transaction.VisibilityLevel;
import org.apache.tephra.TxConstants;
import org.apache.tephra.distributed.PooledClientProvider;
+import org.apache.tephra.distributed.TransactionService;
import org.apache.tephra.distributed.TransactionServiceClient;
import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.apache.tephra.util.TxUtils;
import org.apache.tephra.visibility.FenceWait;
import org.apache.tephra.visibility.VisibilityFence;
import org.apache.tephra.zookeeper.TephraZKClientService;
-import org.apache.tephra.distributed.TransactionService;
-import org.apache.tephra.metrics.TxMetricsCollector;
-import org.apache.tephra.persist.HDFSTransactionStateStorage;
-import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.apache.twill.discovery.DiscoveryService;
import org.apache.twill.discovery.ZKDiscoveryService;
import org.apache.twill.internal.utils.Networks;
@@ -63,13 +64,12 @@ import org.apache.twill.zookeeper.RetryStrategies;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKClientServices;
import org.apache.twill.zookeeper.ZKClients;
+import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.inject.util.Providers;
-import org.slf4j.Logger;
-
public class TephraTransactionContext implements PhoenixTransactionContext {
private static final TransactionCodec CODEC = new TransactionCodec();
@@ -423,7 +423,7 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
}
@Override
- public BaseRegionObserver getCoProcessor() {
+ public RegionObserver getCoProcessor() {
return new TransactionProcessor();
}