You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/10/02 00:13:06 UTC
[5/5] hbase git commit: HBASE-14475 Region split requests are always
audited with hbase user rather than request user (Ted Yu)
HBASE-14475 Region split requests are always audited with hbase user rather than request user (Ted Yu)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ef7001c0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ef7001c0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ef7001c0
Branch: refs/heads/master
Commit: ef7001c0e87ff646f416c21227c7d5a0934820f7
Parents: 2e8575b
Author: Andrew Purtell <ap...@apache.org>
Authored: Thu Oct 1 12:08:49 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Thu Oct 1 15:12:32 2015 -0700
----------------------------------------------------------------------
.../hbase/regionserver/CompactSplitThread.java | 75 ++++++++++++++------
.../hbase/regionserver/CompactionRequestor.java | 8 ++-
.../hbase/regionserver/HRegionServer.java | 2 +-
.../hbase/regionserver/RSRpcServices.java | 7 +-
.../hadoop/hbase/regionserver/SplitRequest.java | 39 +++++++---
.../hbase/regionserver/TestCompaction.java | 4 +-
6 files changed, 97 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef7001c0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index 9474ac0..04adf25 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.StealJobQueue;
@@ -247,6 +249,13 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
}
public synchronized void requestSplit(final Region r, byte[] midKey) {
+ requestSplit(r, midKey, null);
+ }
+
+ /*
+ * The User parameter allows the split thread to assume the correct user identity
+ */
+ public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
if (midKey == null) {
LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
" not splittable because midkey=null");
@@ -256,7 +265,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
return;
}
try {
- this.splits.execute(new SplitRequest(r, midKey, this.server));
+ this.splits.execute(new SplitRequest(r, midKey, this.server, user));
if (LOG.isDebugEnabled()) {
LOG.debug("Split requested for " + r + ". " + this);
}
@@ -274,54 +283,55 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
@Override
public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
List<Pair<CompactionRequest, Store>> requests) throws IOException {
- return requestCompaction(r, why, Store.NO_PRIORITY, requests);
+ return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
}
@Override
public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
final String why, CompactionRequest request) throws IOException {
- return requestCompaction(r, s, why, Store.NO_PRIORITY, request);
+ return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
}
@Override
public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
- int p, List<Pair<CompactionRequest, Store>> requests) throws IOException {
- return requestCompactionInternal(r, why, p, requests, true);
+ int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
+ return requestCompactionInternal(r, why, p, requests, true, user);
}
private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
- int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow) throws IOException {
+ int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
+ throws IOException {
// not a special compaction request, so make our own list
List<CompactionRequest> ret = null;
if (requests == null) {
ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
for (Store s : r.getStores()) {
- CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
+ CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
if (selectNow) ret.add(cr);
}
} else {
Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
ret = new ArrayList<CompactionRequest>(requests.size());
for (Pair<CompactionRequest, Store> pair : requests) {
- ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
+ ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
}
}
return ret;
}
public CompactionRequest requestCompaction(final Region r, final Store s,
- final String why, int priority, CompactionRequest request) throws IOException {
- return requestCompactionInternal(r, s, why, priority, request, true);
+ final String why, int priority, CompactionRequest request, User user) throws IOException {
+ return requestCompactionInternal(r, s, why, priority, request, true, user);
}
public synchronized void requestSystemCompaction(
final Region r, final String why) throws IOException {
- requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
+ requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
}
public void requestSystemCompaction(
final Region r, final Store s, final String why) throws IOException {
- requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false);
+ requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
}
/**
@@ -333,7 +343,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
* compaction will be used.
*/
private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
- final String why, int priority, CompactionRequest request, boolean selectNow)
+ final String why, int priority, CompactionRequest request, boolean selectNow, User user)
throws IOException {
if (this.server.isStopped()
|| (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
@@ -350,7 +360,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
// pool; we will do selection there, and move to large pool if necessary.
ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
? longCompactions : shortCompactions;
- pool.execute(new CompactionRunner(s, r, compaction, pool));
+ pool.execute(new CompactionRunner(s, r, compaction, pool, user));
if (LOG.isDebugEnabled()) {
String type = (pool == shortCompactions) ? "Small " : "Large ";
LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
@@ -454,9 +464,10 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
private CompactionContext compaction;
private int queuedPriority;
private ThreadPoolExecutor parent;
+ private User user;
public CompactionRunner(Store store, Region region,
- CompactionContext compaction, ThreadPoolExecutor parent) {
+ CompactionContext compaction, ThreadPoolExecutor parent, User user) {
super();
this.store = store;
this.region = (HRegion)region;
@@ -464,6 +475,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
this.queuedPriority = (this.compaction == null)
? store.getCompactPriority() : compaction.getRequest().getPriority();
this.parent = parent;
+ this.user = user;
}
@Override
@@ -472,13 +484,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
: ("Store = " + store.toString() + ", pri = " + queuedPriority);
}
- @Override
- public void run() {
- Preconditions.checkNotNull(server);
- if (server.isStopped()
- || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
- return;
- }
+ private void doCompaction() {
// Common case - system compaction without a file selection. Select now.
if (this.compaction == null) {
int oldPriority = this.queuedPriority;
@@ -552,6 +558,31 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
this.compaction.getRequest().afterExecute();
}
+ @Override
+ public void run() {
+ Preconditions.checkNotNull(server);
+ if (server.isStopped()
+ || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
+ return;
+ }
+ if (this.user == null) doCompaction();
+ else {
+ try {
+ user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ doCompaction();
+ return null;
+ }
+ });
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } catch (IOException ioe) {
+ LOG.error("Encountered exception while compacting", ioe);
+ }
+ }
+ }
+
private String formatStackTrace(Exception ex) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef7001c0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
index 930baf0..a30b526 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
@InterfaceAudience.Private
@@ -72,12 +73,14 @@ public interface CompactionRequestor {
* @param requests custom compaction requests. Each compaction must specify the store on which it
* is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all
* stores for the region.
+ * @user the effective user
* @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
* compactions were started.
* @throws IOException
*/
List<CompactionRequest> requestCompaction(
- final Region r, final String why, int pri, List<Pair<CompactionRequest, Store>> requests
+ final Region r, final String why, int pri, List<Pair<CompactionRequest, Store>> requests,
+ User user
) throws IOException;
/**
@@ -87,10 +90,11 @@ public interface CompactionRequestor {
* @param pri Priority of this compaction. minHeap. <=0 is critical
* @param request custom compaction request to run. {@link Store} and {@link Region} for the
* request must match the region and store specified here.
+ * @param user
* @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction was started
* @throws IOException
*/
CompactionRequest requestCompaction(
- final Region r, final Store s, final String why, int pri, CompactionRequest request
+ final Region r, final Store s, final String why, int pri, CompactionRequest request, User user
) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef7001c0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 59d13fa..0ba7b94 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1546,7 +1546,7 @@ public class HRegionServer extends HasThread implements
} else {
this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ " requests major compaction; use configured priority",
- this.majorCompactPriority, null);
+ this.majorCompactPriority, null, null);
}
}
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef7001c0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 3c0f50a..d00c65e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1238,10 +1238,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
if(family != null) {
regionServer.compactSplitThread.requestCompaction(region, store, log,
- Store.PRIORITY_USER, null);
+ Store.PRIORITY_USER, null, RpcServer.getRequestUser());
} else {
regionServer.compactSplitThread.requestCompaction(region, log,
- Store.PRIORITY_USER, null);
+ Store.PRIORITY_USER, null, RpcServer.getRequestUser());
}
return CompactRegionResponse.newBuilder().build();
} catch (IOException ie) {
@@ -1850,7 +1850,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
splitPoint = request.getSplitPoint().toByteArray();
}
((HRegion)region).forceSplit(splitPoint);
- regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit());
+ regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(),
+ RpcServer.getRequestUser());
return SplitRegionResponse.newBuilder().build();
} catch (DroppedSnapshotException ex) {
regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef7001c0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
index c40f9b0..7e71727 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
@@ -19,12 +19,14 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
@@ -41,13 +43,15 @@ class SplitRequest implements Runnable {
private final HRegion parent;
private final byte[] midKey;
private final HRegionServer server;
+ private final User user;
private TableLock tableLock;
- SplitRequest(Region region, byte[] midKey, HRegionServer hrs) {
+ SplitRequest(Region region, byte[] midKey, HRegionServer hrs, User user) {
Preconditions.checkNotNull(hrs);
this.parent = (HRegion)region;
this.midKey = midKey;
this.server = hrs;
+ this.user = user;
}
@Override
@@ -55,13 +59,7 @@ class SplitRequest implements Runnable {
return "regionName=" + parent + ", midKey=" + Bytes.toStringBinary(midKey);
}
- @Override
- public void run() {
- if (this.server.isStopping() || this.server.isStopped()) {
- LOG.debug("Skipping split because server is stopping=" +
- this.server.isStopping() + " or stopped=" + this.server.isStopped());
- return;
- }
+ private void doSplitting() {
boolean success = false;
server.metricsRegionServer.incrSplitRequest();
long startTime = EnvironmentEdgeManager.currentTime();
@@ -148,6 +146,31 @@ class SplitRequest implements Runnable {
}
}
+ @Override
+ public void run() {
+ if (this.server.isStopping() || this.server.isStopped()) {
+ LOG.debug("Skipping split because server is stopping=" +
+ this.server.isStopping() + " or stopped=" + this.server.isStopped());
+ return;
+ }
+ if (this.user == null) doSplitting();
+ else {
+ try {
+ user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ doSplitting();
+ return null;
+ }
+ });
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } catch (IOException ioe) {
+ LOG.error("Encountered exception while splitting", ioe);
+ }
+ }
+ }
+
protected void releaseTableLock() {
if (this.tableLock != null) {
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ef7001c0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 122b7a5..a377325 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -291,7 +291,7 @@ public class TestCompaction {
CountDownLatch latch = new CountDownLatch(1);
TrackableCompactionRequest request = new TrackableCompactionRequest(latch);
- thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request);
+ thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request,null);
// wait for the latch to complete.
latch.await();
@@ -327,7 +327,7 @@ public class TestCompaction {
}
thread.requestCompaction(r, "test mulitple custom comapctions", Store.PRIORITY_USER,
- Collections.unmodifiableList(requests));
+ Collections.unmodifiableList(requests), null);
// wait for the latch to complete.
latch.await();