Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2020/11/23 20:39:26 UTC

[hadoop] branch branch-3.3 updated: HADOOP-17346. Fair call queue is defeated by abusive service principals. Contributed by Ahmed Hussein (ahussein).

This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 8459f1d  HADOOP-17346. Fair call queue is defeated by abusive service principals. Contributed by Ahmed Hussein (ahussein).
8459f1d is described below

commit 8459f1d95506503316f1dd535f9ecb7f781e0b72
Author: Eric Payne <ep...@apache.org>
AuthorDate: Mon Nov 23 20:37:33 2020 +0000

    HADOOP-17346. Fair call queue is defeated by abusive service principals. Contributed by Ahmed Hussein (ahussein).
---
 .../org/apache/hadoop/ipc/CallQueueManager.java    | 14 +++++
 .../org/apache/hadoop/ipc/DecayRpcScheduler.java   | 59 +++++++++++++++++++---
 .../src/main/java/org/apache/hadoop/ipc/RPC.java   | 14 ++++-
 .../main/java/org/apache/hadoop/ipc/Server.java    | 17 ++++++-
 .../apache/hadoop/ipc/UserIdentityProvider.java    |  2 +-
 .../org/apache/hadoop/security/SecurityUtil.java   | 20 +++++++-
 .../authorize/ServiceAuthorizationManager.java     | 25 ++++-----
 .../apache/hadoop/ipc/TestDecayRpcScheduler.java   |  3 +-
 .../test/java/org/apache/hadoop/ipc/TestRPC.java   | 37 ++++++++++++++
 .../java/org/apache/hadoop/ipc/TestRpcBase.java    |  5 +-
 10 files changed, 167 insertions(+), 29 deletions(-)
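
For context (not part of the commit): the change lets an RPC server pin a fixed
scheduler priority for the client principal that a protocol declares through its
@KerberosInfo annotation, so an abusive or very busy service principal no longer
dominates the fair call queue's cost-based decay. A hedged sketch of how a
protocol would opt in (the interface and key names are illustrative, patterned
on the TestRpcBase change below):

import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;

// RPC.Server reads the clientPrincipal key when the protocol is registered and
// pins the configured principal to priority -1 in DecayRpcScheduler.
@KerberosInfo(
    serverPrincipal = "my.service.kerberos.principal",
    clientPrincipal = "my.service.client.kerberos.principal")
@ProtocolInfo(protocolName = "example.MyServiceProtocol", protocolVersion = 1)
public interface MyServiceProtocol {
  void heartbeat() throws java.io.IOException;
}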

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 81b7d34..518c3cd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -208,6 +209,19 @@ public class CallQueueManager<E extends Schedulable>
     return scheduler.getPriorityLevel(e);
   }
 
+  int getPriorityLevel(UserGroupInformation user) {
+    if (scheduler instanceof DecayRpcScheduler) {
+      return ((DecayRpcScheduler)scheduler).getPriorityLevel(user);
+    }
+    return 0;
+  }
+
+  void setPriorityLevel(UserGroupInformation user, int priority) {
+    if (scheduler instanceof DecayRpcScheduler) {
+      ((DecayRpcScheduler)scheduler).setPriorityLevel(user, priority);
+    }
+  }
+
   void setClientBackoffEnabled(boolean value) {
     clientBackOffEnabled = value;
   }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index 45cbd4e..0ef481b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
@@ -39,6 +40,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AtomicDoubleArray;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -179,6 +181,7 @@ public class DecayRpcScheduler implements RpcScheduler,
   private MetricsProxy metricsProxy;
   private final CostProvider costProvider;
 
+  private final Map<String, Integer> staticPriorities = new HashMap<>();
   /**
    * This TimerTask will call decayCurrentCosts until
    * the scheduler has been garbage collected.
@@ -486,7 +489,7 @@ public class DecayRpcScheduler implements RpcScheduler,
       AtomicLong value = entry.getValue().get(0);
 
       long snapshot = value.get();
-      int computedLevel = computePriorityLevel(snapshot);
+      int computedLevel = computePriorityLevel(snapshot, id);
 
       nextCache.put(id, computedLevel);
     }
@@ -536,7 +539,11 @@ public class DecayRpcScheduler implements RpcScheduler,
    * @param cost the cost for an identity
    * @return scheduling decision from 0 to numLevels - 1
    */
-  private int computePriorityLevel(long cost) {
+  private int computePriorityLevel(long cost, Object identity) {
+    Integer staticPriority = staticPriorities.get(identity);
+    if (staticPriority != null) {
+      return staticPriority.intValue();
+    }
     long totalCallSnapshot = totalDecayedCallCost.get();
 
     double proportion = 0;
@@ -576,11 +583,20 @@ public class DecayRpcScheduler implements RpcScheduler,
     // Cache was no good, compute it
     List<AtomicLong> costList = callCosts.get(identity);
     long currentCost = costList == null ? 0 : costList.get(0).get();
-    int priority = computePriorityLevel(currentCost);
+    int priority = computePriorityLevel(currentCost, identity);
     LOG.debug("compute priority for {} priority {}", identity, priority);
     return priority;
   }
 
+  private String getIdentity(Schedulable obj) {
+    String identity = this.identityProvider.makeIdentity(obj);
+    if (identity == null) {
+      // Identity provider did not handle this
+      identity = DECAYSCHEDULER_UNKNOWN_IDENTITY;
+    }
+    return identity;
+  }
+
   /**
    * Compute the appropriate priority for a schedulable based on past requests.
    * @param obj the schedulable obj to query and remember
@@ -589,15 +605,42 @@ public class DecayRpcScheduler implements RpcScheduler,
   @Override
   public int getPriorityLevel(Schedulable obj) {
     // First get the identity
-    String identity = this.identityProvider.makeIdentity(obj);
-    if (identity == null) {
-      // Identity provider did not handle this
-      identity = DECAYSCHEDULER_UNKNOWN_IDENTITY;
-    }
+    String identity = getIdentity(obj);
+    // highest priority users may have a negative priority but their
+    // calls will be priority 0.
+    return Math.max(0, cachedOrComputedPriorityLevel(identity));
+  }
 
+  @VisibleForTesting
+  int getPriorityLevel(UserGroupInformation ugi) {
+    String identity = getIdentity(newSchedulable(ugi));
+    // returns true priority of the user.
     return cachedOrComputedPriorityLevel(identity);
   }
 
+  @VisibleForTesting
+  void setPriorityLevel(UserGroupInformation ugi, int priority) {
+    String identity = getIdentity(newSchedulable(ugi));
+    priority = Math.min(numLevels - 1, priority);
+    LOG.info("Setting priority for user:" + identity + "=" + priority);
+    staticPriorities.put(identity, priority);
+  }
+
+  // dummy instance to conform to identity provider api.
+  private static Schedulable newSchedulable(UserGroupInformation ugi) {
+    return new Schedulable() {
+      @Override
+      public UserGroupInformation getUserGroupInformation() {
+        return ugi;
+      }
+
+      @Override
+      public int getPriorityLevel() {
+        return 0;
+      }
+    };
+  }
+
   @Override
   public boolean shouldBackOff(Schedulable obj) {
     Boolean backOff = false;
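
In the DecayRpcScheduler hunks above, staticPriorities takes precedence over the
decayed-cost computation, and the public getPriorityLevel(Schedulable) clamps the
result at 0, so a principal pinned to -1 is exempt from the cost-based computation
while its calls still land in the highest-priority queue. A hedged usage sketch
mirroring the new TestRPC assertions, assuming a DecayRpcScheduler instance named
scheduler and a hypothetical asSchedulable() wrapper like the private
newSchedulable() helper above:

UserGroupInformation svc = UserGroupInformation.createRemoteUser("service");
scheduler.setPriorityLevel(svc, -1);           // pin the principal below level 0
int pinned = scheduler.getPriorityLevel(svc);  // -1: the true, pinned priority
int served = scheduler.getPriorityLevel(asSchedulable(svc));  // 0: calls clamp to the top queue
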
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index ad3628d01..6169fef 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.Rpc
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -980,7 +981,18 @@ public class RPC {
            " ProtocolImpl=" + protocolImpl.getClass().getName() +
            " protocolClass=" + protocolClass.getName());
      }
-   }
+      String client = SecurityUtil.getClientPrincipal(protocolClass, getConf());
+      if (client != null) {
+        // notify the server's rpc scheduler that the protocol user has
+        // highest priority.  the scheduler should exempt the user from
+        // priority calculations.
+        try {
+          setPriorityLevel(UserGroupInformation.createRemoteUser(client), -1);
+        } catch (Exception ex) {
+          LOG.warn("Failed to set scheduling priority for " + client, ex);
+        }
+      }
+    }
    
    static class VerProtocolImpl {
      final long version;
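
The RPC.Server hunk above applies the exemption at protocol registration time:
SecurityUtil.getClientPrincipal resolves the protocol's clientPrincipal key
against the server's Configuration, and the resulting user is pinned at priority
-1. A hedged sketch of the server-side configuration that path reads (key and
principal values are illustrative):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// The key named by @KerberosInfo(clientPrincipal = ...) on the protocol
// interface; its value is the principal that should bypass fair call queue decay.
conf.set("my.service.client.kerberos.principal", "svc/host.example.com@EXAMPLE.COM");
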
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index f83cfb7..d522bce 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -643,7 +643,22 @@ public abstract class Server {
           address.getPort(), e);
     }
   }
-  
+
+  @VisibleForTesting
+  int getPriorityLevel(Schedulable e) {
+    return callQueue.getPriorityLevel(e);
+  }
+
+  @VisibleForTesting
+  int getPriorityLevel(UserGroupInformation ugi) {
+    return callQueue.getPriorityLevel(ugi);
+  }
+
+  @VisibleForTesting
+  void setPriorityLevel(UserGroupInformation ugi, int priority) {
+    callQueue.setPriorityLevel(ugi, priority);
+  }
+
   /**
    * Returns a handle to the rpcMetrics (required in tests)
    * @return rpc metrics
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java
index 763605e..91ec1a2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java
@@ -31,6 +31,6 @@ public class UserIdentityProvider implements IdentityProvider {
       return null;
     }
 
-    return ugi.getUserName();
+    return ugi.getShortUserName();
   }
 }
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
index aa12b93..03c5b58 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
@@ -380,7 +380,25 @@ public final class SecurityUtil {
     }
     return null;
   }
- 
+
+  /**
+   * Look up the client principal for a given protocol. It searches all known
+   * SecurityInfo providers.
+   * @param protocol the protocol class to get the information for
+   * @param conf configuration object
+   * @return client principal or null if it has no client principal defined.
+   */
+  public static String getClientPrincipal(Class<?> protocol,
+      Configuration conf) {
+    String user = null;
+    KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
+    if (krbInfo != null) {
+      String key = krbInfo.clientPrincipal();
+      user = (key != null && !key.isEmpty()) ? conf.get(key) : null;
+    }
+    return user;
+  }
+
   /**
    * Look up the TokenInfo for a given protocol. It searches all known
    * SecurityInfo providers.
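
getClientPrincipal above is a small lookup helper: it reads the protocol's
@KerberosInfo annotation and, when a clientPrincipal key is declared, returns the
principal configured under that key. A hedged usage sketch (the protocol class
and value are illustrative):

String client = SecurityUtil.getClientPrincipal(MyServiceProtocol.class, conf);
// client is the value configured under the protocol's clientPrincipal key,
// e.g. "svc/_HOST@EXAMPLE.COM", or null when the protocol declares none.
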
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
index a264eb4..b17e02b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.MachineList;
@@ -101,21 +100,19 @@ public class ServiceAuthorizationManager {
     String clientPrincipal = null;
     if (UserGroupInformation.isSecurityEnabled()) {
       // get client principal key to verify (if available)
-      KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
-      if (krbInfo != null) {
-        String clientKey = krbInfo.clientPrincipal();
-        if (clientKey != null && !clientKey.isEmpty()) {
-          try {
-            clientPrincipal = SecurityUtil.getServerPrincipal(
-                conf.get(clientKey), addr);
-          } catch (IOException e) {
-            throw (AuthorizationException) new AuthorizationException(
-                "Can't figure out Kerberos principal name for connection from "
-                + addr + " for user=" + user + " protocol=" + protocol)
-                .initCause(e);
-          }
+      clientPrincipal = SecurityUtil.getClientPrincipal(protocol, conf);
+      try {
+        if (clientPrincipal != null) {
+          clientPrincipal =
+              SecurityUtil.getServerPrincipal(clientPrincipal, addr);
         }
+      } catch (IOException e) {
+        throw (AuthorizationException) new AuthorizationException(
+            "Can't figure out Kerberos principal name for connection from "
+                + addr + " for user=" + user + " protocol=" + protocol)
+            .initCause(e);
       }
+
     }
     if((clientPrincipal != null && !clientPrincipal.equals(user.getUserName())) || 
        acls.length != 2  || !acls[0].isUserAllowed(user) || acls[1].isUserAllowed(user)) {
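
The authorize() refactor above keeps the same semantics while reusing the new
helper: the expected client principal is looked up once, _HOST-style patterns are
resolved through getServerPrincipal, and a caller whose user name does not match
is still rejected. A condensed, hedged sketch of that check (method name and
exception text are illustrative, surrounding ACL logic elided):

static void checkClientPrincipal(Class<?> protocol, Configuration conf,
    InetAddress addr, UserGroupInformation user) throws IOException {
  String expected = SecurityUtil.getClientPrincipal(protocol, conf);
  if (expected != null) {
    expected = SecurityUtil.getServerPrincipal(expected, addr);  // resolve _HOST etc.
  }
  if (expected != null && !expected.equals(user.getUserName())) {
    throw new AuthorizationException(
        "User " + user + " is not authorized for protocol " + protocol);
  }
}
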
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
index 7172332..3b8c58c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
@@ -42,9 +42,8 @@ import java.util.concurrent.TimeUnit;
 public class TestDecayRpcScheduler {
   private Schedulable mockCall(String id) {
     Schedulable mockCall = mock(Schedulable.class);
-    UserGroupInformation ugi = mock(UserGroupInformation.class);
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(id);
 
-    when(ugi.getUserName()).thenReturn(id);
     when(mockCall.getUserGroupInformation()).thenReturn(ugi);
 
     return mockCall;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 628c044..9fbb865 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -1294,6 +1294,43 @@ public class TestRPC extends TestRpcBase {
     }
   }
 
+  @Test (timeout=30000)
+  public void testProtocolUserPriority() throws Exception {
+    final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
+    conf.set(CLIENT_PRINCIPAL_KEY, "clientForProtocol");
+    Server server = null;
+    try {
+      server = setupDecayRpcSchedulerandTestServer(ns + ".");
+
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user");
+      // normal users start with priority 0.
+      Assert.assertEquals(0, server.getPriorityLevel(ugi));
+      // calls for a protocol defined client will have priority of 0.
+      Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi)));
+
+      // protocol defined client will have top priority of -1.
+      ugi = UserGroupInformation.createRemoteUser("clientForProtocol");
+      Assert.assertEquals(-1, server.getPriorityLevel(ugi));
+      // calls for a protocol defined client will have priority of 0.
+      Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi)));
+    } finally {
+      stop(server, null);
+    }
+  }
+
+  private static Schedulable newSchedulable(UserGroupInformation ugi) {
+    return new Schedulable(){
+      @Override
+      public UserGroupInformation getUserGroupInformation() {
+        return ugi;
+      }
+      @Override
+      public int getPriorityLevel() {
+        return 0; // doesn't matter.
+      }
+    };
+  }
+
   private Server setupDecayRpcSchedulerandTestServer(String ns)
       throws Exception {
     final int queueSizePerHandler = 3;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
index 010935b..0962b50 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
@@ -62,6 +62,8 @@ public class TestRpcBase {
 
   protected final static String SERVER_PRINCIPAL_KEY =
       "test.ipc.server.principal";
+  protected final static String CLIENT_PRINCIPAL_KEY =
+      "test.ipc.client.principal";
   protected final static String ADDRESS = "0.0.0.0";
   protected final static int PORT = 0;
   protected static InetSocketAddress addr;
@@ -271,7 +273,8 @@ public class TestRpcBase {
     }
   }
 
-  @KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY)
+  @KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY,
+                clientPrincipal = CLIENT_PRINCIPAL_KEY)
   @TokenInfo(TestTokenSelector.class)
   @ProtocolInfo(protocolName = "org.apache.hadoop.ipc.TestRpcBase$TestRpcService",
       protocolVersion = 1)

