You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2020/11/23 20:39:26 UTC
[hadoop] branch branch-3.3 updated: HADOOP-17346. Fair call queue
is defeated by abusive service principals. Contributed by Ahmed Hussein
(ahussein).
This is an automated email from the ASF dual-hosted git repository.
epayne pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 8459f1d HADOOP-17346. Fair call queue is defeated by abusive service principals. Contributed by Ahmed Hussein (ahussein).
8459f1d is described below
commit 8459f1d95506503316f1dd535f9ecb7f781e0b72
Author: Eric Payne <ep...@apache.org>
AuthorDate: Mon Nov 23 20:37:33 2020 +0000
HADOOP-17346. Fair call queue is defeated by abusive service principals. Contributed by Ahmed Hussein (ahussein).
---
.../org/apache/hadoop/ipc/CallQueueManager.java | 14 +++++
.../org/apache/hadoop/ipc/DecayRpcScheduler.java | 59 +++++++++++++++++++---
.../src/main/java/org/apache/hadoop/ipc/RPC.java | 14 ++++-
.../main/java/org/apache/hadoop/ipc/Server.java | 17 ++++++-
.../apache/hadoop/ipc/UserIdentityProvider.java | 2 +-
.../org/apache/hadoop/security/SecurityUtil.java | 20 +++++++-
.../authorize/ServiceAuthorizationManager.java | 25 ++++-----
.../apache/hadoop/ipc/TestDecayRpcScheduler.java | 3 +-
.../test/java/org/apache/hadoop/ipc/TestRPC.java | 37 ++++++++++++++
.../java/org/apache/hadoop/ipc/TestRpcBase.java | 5 +-
10 files changed, 167 insertions(+), 29 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 81b7d34..518c3cd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -208,6 +209,19 @@ public class CallQueueManager<E extends Schedulable>
return scheduler.getPriorityLevel(e);
}
+ int getPriorityLevel(UserGroupInformation user) {
+ if (scheduler instanceof DecayRpcScheduler) {
+ return ((DecayRpcScheduler)scheduler).getPriorityLevel(user);
+ }
+ return 0;
+ }
+
+ void setPriorityLevel(UserGroupInformation user, int priority) {
+ if (scheduler instanceof DecayRpcScheduler) {
+ ((DecayRpcScheduler)scheduler).setPriorityLevel(user, priority);
+ }
+ }
+
void setClientBackoffEnabled(boolean value) {
clientBackOffEnabled = value;
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index 45cbd4e..0ef481b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
@@ -39,6 +40,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AtomicDoubleArray;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -179,6 +181,7 @@ public class DecayRpcScheduler implements RpcScheduler,
private MetricsProxy metricsProxy;
private final CostProvider costProvider;
+ private final Map<String, Integer> staticPriorities = new HashMap<>();
/**
* This TimerTask will call decayCurrentCosts until
* the scheduler has been garbage collected.
@@ -486,7 +489,7 @@ public class DecayRpcScheduler implements RpcScheduler,
AtomicLong value = entry.getValue().get(0);
long snapshot = value.get();
- int computedLevel = computePriorityLevel(snapshot);
+ int computedLevel = computePriorityLevel(snapshot, id);
nextCache.put(id, computedLevel);
}
@@ -536,7 +539,11 @@ public class DecayRpcScheduler implements RpcScheduler,
* @param cost the cost for an identity
* @return scheduling decision from 0 to numLevels - 1
*/
- private int computePriorityLevel(long cost) {
+ private int computePriorityLevel(long cost, Object identity) {
+ Integer staticPriority = staticPriorities.get(identity);
+ if (staticPriority != null) {
+ return staticPriority.intValue();
+ }
long totalCallSnapshot = totalDecayedCallCost.get();
double proportion = 0;
@@ -576,11 +583,20 @@ public class DecayRpcScheduler implements RpcScheduler,
// Cache was no good, compute it
List<AtomicLong> costList = callCosts.get(identity);
long currentCost = costList == null ? 0 : costList.get(0).get();
- int priority = computePriorityLevel(currentCost);
+ int priority = computePriorityLevel(currentCost, identity);
LOG.debug("compute priority for {} priority {}", identity, priority);
return priority;
}
+ private String getIdentity(Schedulable obj) {
+ String identity = this.identityProvider.makeIdentity(obj);
+ if (identity == null) {
+ // Identity provider did not handle this
+ identity = DECAYSCHEDULER_UNKNOWN_IDENTITY;
+ }
+ return identity;
+ }
+
/**
* Compute the appropriate priority for a schedulable based on past requests.
* @param obj the schedulable obj to query and remember
@@ -589,15 +605,42 @@ public class DecayRpcScheduler implements RpcScheduler,
@Override
public int getPriorityLevel(Schedulable obj) {
// First get the identity
- String identity = this.identityProvider.makeIdentity(obj);
- if (identity == null) {
- // Identity provider did not handle this
- identity = DECAYSCHEDULER_UNKNOWN_IDENTITY;
- }
+ String identity = getIdentity(obj);
+ // highest priority users may have a negative priority but their
+ // calls will be priority 0.
+ return Math.max(0, cachedOrComputedPriorityLevel(identity));
+ }
+ @VisibleForTesting
+ int getPriorityLevel(UserGroupInformation ugi) {
+ String identity = getIdentity(newSchedulable(ugi));
+ // returns the user's actual priority, which may be negative (not clamped to 0).
return cachedOrComputedPriorityLevel(identity);
}
+ @VisibleForTesting
+ void setPriorityLevel(UserGroupInformation ugi, int priority) {
+ String identity = getIdentity(newSchedulable(ugi));
+ priority = Math.min(numLevels - 1, priority);
+ LOG.info("Setting priority for user:" + identity + "=" + priority);
+ staticPriorities.put(identity, priority);
+ }
+
+ // dummy instance to conform to identity provider api.
+ private static Schedulable newSchedulable(UserGroupInformation ugi) {
+ return new Schedulable() {
+ @Override
+ public UserGroupInformation getUserGroupInformation() {
+ return ugi;
+ }
+
+ @Override
+ public int getPriorityLevel() {
+ return 0;
+ }
+ };
+ }
+
@Override
public boolean shouldBackOff(Schedulable obj) {
Boolean backOff = false;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index ad3628d01..6169fef 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.Rpc
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -980,7 +981,18 @@ public class RPC {
" ProtocolImpl=" + protocolImpl.getClass().getName() +
" protocolClass=" + protocolClass.getName());
}
- }
+ String client = SecurityUtil.getClientPrincipal(protocolClass, getConf());
+ if (client != null) {
+ // notify the server's rpc scheduler that the protocol user has
+ // highest priority. the scheduler should exempt the user from
+ // priority calculations.
+ try {
+ setPriorityLevel(UserGroupInformation.createRemoteUser(client), -1);
+ } catch (Exception ex) {
+ LOG.warn("Failed to set scheduling priority for " + client, ex);
+ }
+ }
+ }
static class VerProtocolImpl {
final long version;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index f83cfb7..d522bce 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -643,7 +643,22 @@ public abstract class Server {
address.getPort(), e);
}
}
-
+
+ @VisibleForTesting
+ int getPriorityLevel(Schedulable e) {
+ return callQueue.getPriorityLevel(e);
+ }
+
+ @VisibleForTesting
+ int getPriorityLevel(UserGroupInformation ugi) {
+ return callQueue.getPriorityLevel(ugi);
+ }
+
+ @VisibleForTesting
+ void setPriorityLevel(UserGroupInformation ugi, int priority) {
+ callQueue.setPriorityLevel(ugi, priority);
+ }
+
/**
* Returns a handle to the rpcMetrics (required in tests)
* @return rpc metrics
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java
index 763605e..91ec1a2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java
@@ -31,6 +31,6 @@ public class UserIdentityProvider implements IdentityProvider {
return null;
}
- return ugi.getUserName();
+ return ugi.getShortUserName();
}
}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
index aa12b93..03c5b58 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
@@ -380,7 +380,25 @@ public final class SecurityUtil {
}
return null;
}
-
+
+ /**
+ * Look up the client principal for a given protocol. It searches all known
+ * SecurityInfo providers.
+ * @param protocol the protocol class to get the information for
+ * @param conf configuration object
+ * @return client principal or null if it has no client principal defined.
+ */
+ public static String getClientPrincipal(Class<?> protocol,
+ Configuration conf) {
+ String user = null;
+ KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
+ if (krbInfo != null) {
+ String key = krbInfo.clientPrincipal();
+ user = (key != null && !key.isEmpty()) ? conf.get(key) : null;
+ }
+ return user;
+ }
+
/**
* Look up the TokenInfo for a given protocol. It searches all known
* SecurityInfo providers.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
index a264eb4..b17e02b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.MachineList;
@@ -101,21 +100,19 @@ public class ServiceAuthorizationManager {
String clientPrincipal = null;
if (UserGroupInformation.isSecurityEnabled()) {
// get client principal key to verify (if available)
- KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
- if (krbInfo != null) {
- String clientKey = krbInfo.clientPrincipal();
- if (clientKey != null && !clientKey.isEmpty()) {
- try {
- clientPrincipal = SecurityUtil.getServerPrincipal(
- conf.get(clientKey), addr);
- } catch (IOException e) {
- throw (AuthorizationException) new AuthorizationException(
- "Can't figure out Kerberos principal name for connection from "
- + addr + " for user=" + user + " protocol=" + protocol)
- .initCause(e);
- }
+ clientPrincipal = SecurityUtil.getClientPrincipal(protocol, conf);
+ try {
+ if (clientPrincipal != null) {
+ clientPrincipal =
+ SecurityUtil.getServerPrincipal(clientPrincipal, addr);
}
+ } catch (IOException e) {
+ throw (AuthorizationException) new AuthorizationException(
+ "Can't figure out Kerberos principal name for connection from "
+ + addr + " for user=" + user + " protocol=" + protocol)
+ .initCause(e);
}
+
}
if((clientPrincipal != null && !clientPrincipal.equals(user.getUserName())) ||
acls.length != 2 || !acls[0].isUserAllowed(user) || acls[1].isUserAllowed(user)) {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
index 7172332..3b8c58c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
@@ -42,9 +42,8 @@ import java.util.concurrent.TimeUnit;
public class TestDecayRpcScheduler {
private Schedulable mockCall(String id) {
Schedulable mockCall = mock(Schedulable.class);
- UserGroupInformation ugi = mock(UserGroupInformation.class);
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(id);
- when(ugi.getUserName()).thenReturn(id);
when(mockCall.getUserGroupInformation()).thenReturn(ugi);
return mockCall;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 628c044..9fbb865 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -1294,6 +1294,43 @@ public class TestRPC extends TestRpcBase {
}
}
+ @Test (timeout=30000)
+ public void testProtocolUserPriority() throws Exception {
+ final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
+ conf.set(CLIENT_PRINCIPAL_KEY, "clientForProtocol");
+ Server server = null;
+ try {
+ server = setupDecayRpcSchedulerandTestServer(ns + ".");
+
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user");
+ // normal users start with priority 0.
+ Assert.assertEquals(0, server.getPriorityLevel(ugi));
+ // calls for a normal user will also have priority of 0.
+ Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi)));
+
+ // protocol defined client will have top priority of -1.
+ ugi = UserGroupInformation.createRemoteUser("clientForProtocol");
+ Assert.assertEquals(-1, server.getPriorityLevel(ugi));
+ // calls for a protocol defined client will have priority of 0.
+ Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi)));
+ } finally {
+ stop(server, null);
+ }
+ }
+
+ private static Schedulable newSchedulable(UserGroupInformation ugi) {
+ return new Schedulable(){
+ @Override
+ public UserGroupInformation getUserGroupInformation() {
+ return ugi;
+ }
+ @Override
+ public int getPriorityLevel() {
+ return 0; // doesn't matter.
+ }
+ };
+ }
+
private Server setupDecayRpcSchedulerandTestServer(String ns)
throws Exception {
final int queueSizePerHandler = 3;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
index 010935b..0962b50 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
@@ -62,6 +62,8 @@ public class TestRpcBase {
protected final static String SERVER_PRINCIPAL_KEY =
"test.ipc.server.principal";
+ protected final static String CLIENT_PRINCIPAL_KEY =
+ "test.ipc.client.principal";
protected final static String ADDRESS = "0.0.0.0";
protected final static int PORT = 0;
protected static InetSocketAddress addr;
@@ -271,7 +273,8 @@ public class TestRpcBase {
}
}
- @KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY)
+ @KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY,
+ clientPrincipal = CLIENT_PRINCIPAL_KEY)
@TokenInfo(TestTokenSelector.class)
@ProtocolInfo(protocolName = "org.apache.hadoop.ipc.TestRpcBase$TestRpcService",
protocolVersion = 1)
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org