You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by la...@apache.org on 2016/11/07 20:39:32 UTC

[06/13] phoenix git commit: PHOENIX-3164 Cache UGI instances for remote users in PQS

PHOENIX-3164 Cache UGI instances for remote users in PQS

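equals(Object) and hashCode() on UGI are implemented
via reference checks (rather than by comparing values).
Because PQS created a fresh proxy UGI for every request,
a new PhoenixConnection was opened each time, even for
the same user. Cache one proxy UGI per remote user name
so that repeated requests reuse the same instance.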


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d3c7c9b4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d3c7c9b4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d3c7c9b4

Branch: refs/heads/4.x-HBase-1.0
Commit: d3c7c9b48e02eef065c0c3af0c25ef3a279b8a3d
Parents: b64b08f
Author: Josh Elser <el...@apache.org>
Authored: Tue Aug 9 14:52:20 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Aug 15 18:42:06 2016 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/query/QueryServices.java |  3 +
 .../phoenix/query/QueryServicesOptions.java     |  4 +
 .../apache/phoenix/queryserver/server/Main.java | 61 ++++++++++++--
 .../server/PhoenixDoAsCallbackTest.java         | 89 ++++++++++++++++++++
 4 files changed, 149 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d3c7c9b4/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index e945021..42f954a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -206,6 +206,9 @@ public interface QueryServices extends SQLCloseable {
     public static final String QUERY_SERVER_DNS_NAMESERVER_ATTRIB = "phoenix.queryserver.dns.nameserver";
     public static final String QUERY_SERVER_DNS_INTERFACE_ATTRIB = "phoenix.queryserver.dns.interface";
     public static final String QUERY_SERVER_HBASE_SECURITY_CONF_ATTRIB = "hbase.security.authentication";
+    public static final String QUERY_SERVER_UGI_CACHE_MAX_SIZE = "phoenix.queryserver.ugi.cache.max.size";
+    public static final String QUERY_SERVER_UGI_CACHE_INITIAL_SIZE = "phoenix.queryserver.ugi.cache.initial.size";
+    public static final String QUERY_SERVER_UGI_CACHE_CONCURRENCY = "phoenix.queryserver.ugi.cache.concurrency";
     
     public static final String RENEW_LEASE_ENABLED = "phoenix.scanner.lease.renew.enabled";
     public static final String RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS = "phoenix.scanner.lease.renew.interval";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d3c7c9b4/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 9823182..70b85db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -242,6 +242,10 @@ public class QueryServicesOptions {
     // doesn't depend on phoenix-core.
     public static final String DEFAULT_QUERY_SERVER_SERIALIZATION = "PROTOBUF";
     public static final int DEFAULT_QUERY_SERVER_HTTP_PORT = 8765;
+    public static final long DEFAULT_QUERY_SERVER_UGI_CACHE_MAX_SIZE = 1000L;
+    public static final int DEFAULT_QUERY_SERVER_UGI_CACHE_INITIAL_SIZE = 100;
+    public static final int DEFAULT_QUERY_SERVER_UGI_CACHE_CONCURRENCY = 10;
+
     public static final boolean DEFAULT_RENEW_LEASE_ENABLED = true;
     public static final int DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS =
             DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD / 2;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d3c7c9b4/phoenix-queryserver/src/main/java/org/apache/phoenix/queryserver/server/Main.java
----------------------------------------------------------------------
diff --git a/phoenix-queryserver/src/main/java/org/apache/phoenix/queryserver/server/Main.java b/phoenix-queryserver/src/main/java/org/apache/phoenix/queryserver/server/Main.java
index fc2ee34..4b3ca7e 100644
--- a/phoenix-queryserver/src/main/java/org/apache/phoenix/queryserver/server/Main.java
+++ b/phoenix-queryserver/src/main/java/org/apache/phoenix/queryserver/server/Main.java
@@ -18,14 +18,15 @@
 package org.apache.phoenix.queryserver.server;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 import org.apache.calcite.avatica.Meta;
 import org.apache.calcite.avatica.remote.Driver;
 import org.apache.calcite.avatica.remote.LocalService;
 import org.apache.calcite.avatica.remote.Service;
-import org.apache.calcite.avatica.server.AvaticaHandler;
-import org.apache.calcite.avatica.server.AvaticaServerConfiguration;
 import org.apache.calcite.avatica.server.DoAsRemoteUserCallback;
-import org.apache.calcite.avatica.server.HandlerFactory;
 import org.apache.calcite.avatica.server.HttpServer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,7 +44,6 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 
 import java.io.File;
-import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 import java.security.PrivilegedExceptionAction;
@@ -54,6 +54,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -210,7 +211,7 @@ public final class Main extends Configured implements Tool, Runnable {
         // Enable SPNEGO and impersonation (through standard Hadoop configuration means)
         builder.withSpnego(ugi.getUserName())
             .withAutomaticLogin(keytab)
-            .withImpersonation(new PhoenixDoAsCallback(ugi));
+            .withImpersonation(new PhoenixDoAsCallback(ugi, getConf()));
       }
 
       // Build and start the HttpServer
@@ -261,15 +262,29 @@ public final class Main extends Configured implements Tool, Runnable {
    */
   static class PhoenixDoAsCallback implements DoAsRemoteUserCallback {
     private final UserGroupInformation serverUgi;
+    private final LoadingCache<String,UserGroupInformation> ugiCache;
 
-    public PhoenixDoAsCallback(UserGroupInformation serverUgi) {
+    public PhoenixDoAsCallback(UserGroupInformation serverUgi, Configuration conf) {
       this.serverUgi = Objects.requireNonNull(serverUgi);
+      this.ugiCache = CacheBuilder.newBuilder()
+          .initialCapacity(conf.getInt(QueryServices.QUERY_SERVER_UGI_CACHE_INITIAL_SIZE,
+                  QueryServicesOptions.DEFAULT_QUERY_SERVER_UGI_CACHE_INITIAL_SIZE))
+          .concurrencyLevel(conf.getInt(QueryServices.QUERY_SERVER_UGI_CACHE_CONCURRENCY,
+                  QueryServicesOptions.DEFAULT_QUERY_SERVER_UGI_CACHE_CONCURRENCY))
+          .maximumSize(conf.getLong(QueryServices.QUERY_SERVER_UGI_CACHE_MAX_SIZE,
+                  QueryServicesOptions.DEFAULT_QUERY_SERVER_UGI_CACHE_MAX_SIZE))
+          .build(new UgiCacheLoader(this.serverUgi));
     }
 
     @Override
     public <T> T doAsRemoteUser(String remoteUserName, String remoteAddress, final Callable<T> action) throws Exception {
-      // Proxy this user on top of the server's user (the real user)
-      UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(remoteUserName, serverUgi);
+      // We are guaranteed by Avatica that the `remoteUserName` is properly authenticated by the
+      // time this method is called. We don't have to verify the wire credentials, we can assume the
+      // user provided valid credentials for who it claimed it was.
+
+      // Proxy this user on top of the server's user (the real user). Get a cached instance, the
+      // LoadingCache will create a new instance for us if one isn't cached.
+      UserGroupInformation proxyUser = createProxyUser(remoteUserName);
 
       // Check if this user is allowed to be impersonated.
       // Will throw AuthorizationException if the impersonation as this user is not allowed
@@ -283,6 +298,36 @@ public final class Main extends Configured implements Tool, Runnable {
         }
       });
     }
+
+      @VisibleForTesting
+      UserGroupInformation createProxyUser(String remoteUserName) throws ExecutionException {
+          // PHOENIX-3164: UGI's hashCode and equals methods rely on reference checks, not
+          // value-based checks. Despite this method's name, hand back the cached UGI for a remote
+          // user rather than a fresh one; otherwise downstream code in Phoenix and HBase may not
+          // treat two calls from the same user as equivalent.
+          return ugiCache.get(remoteUserName); // the cache's loader builds the UGI if none is cached
+      }
+
+      @VisibleForTesting
+      LoadingCache<String,UserGroupInformation> getCache() {
+          return ugiCache;
+      }
+  }
+
+  /**
+   * CacheLoader which, when no entry is cached, creates the "proxy" UGI for the given user name.
+   */
+  static class UgiCacheLoader extends CacheLoader<String,UserGroupInformation> {
+      private final UserGroupInformation serverUgi; // the real user each proxy UGI is created on
+
+      public UgiCacheLoader(UserGroupInformation serverUgi) {
+          this.serverUgi = Objects.requireNonNull(serverUgi); // reject a null server UGI up front
+      }
+
+      @Override
+      public UserGroupInformation load(String remoteUserName) throws Exception {
+          return UserGroupInformation.createProxyUser(remoteUserName, serverUgi); // new instance per load
+      }
   }
 
   public static void main(String[] argv) throws Exception {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d3c7c9b4/phoenix-queryserver/src/test/java/org/apache/phoenix/queryserver/server/PhoenixDoAsCallbackTest.java
----------------------------------------------------------------------
diff --git a/phoenix-queryserver/src/test/java/org/apache/phoenix/queryserver/server/PhoenixDoAsCallbackTest.java b/phoenix-queryserver/src/test/java/org/apache/phoenix/queryserver/server/PhoenixDoAsCallbackTest.java
new file mode 100644
index 0000000..000baec
--- /dev/null
+++ b/phoenix-queryserver/src/test/java/org/apache/phoenix/queryserver/server/PhoenixDoAsCallbackTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.queryserver.server;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.phoenix.queryserver.server.Main.PhoenixDoAsCallback;
+import org.junit.Test;
+
+/**
+ * Tests that {@link PhoenixDoAsCallback}, the impersonation callback Phoenix gives Avatica, caches proxy UGIs.
+ */
+public class PhoenixDoAsCallbackTest {
+
+    @Test
+    public void ugiInstancesAreCached() throws Exception {
+        Configuration conf = new Configuration(false);
+        UserGroupInformation serverUgi = UserGroupInformation.createUserForTesting("server", new String[0]);
+        PhoenixDoAsCallback callback = new PhoenixDoAsCallback(serverUgi, conf);
+        // The first lookup for each name adds one cache entry whose real user is the server UGI.
+        UserGroupInformation ugi1 = callback.createProxyUser("user1");
+        assertEquals(1, callback.getCache().size());
+        assertTrue(ugi1.getRealUser() == serverUgi);
+        UserGroupInformation ugi2 = callback.createProxyUser("user2");
+        assertEquals(2, callback.getCache().size());
+        assertTrue(ugi2.getRealUser() == serverUgi);
+        // A repeated lookup must return the very same instance and leave the cache size unchanged.
+        UserGroupInformation ugi1Reference = callback.createProxyUser("user1");
+        assertTrue(ugi1 == ugi1Reference);
+        assertEquals(2, callback.getCache().size());
+    }
+
+    @Test
+    public void proxyingUsersAreCached() throws Exception {
+      Configuration conf = new Configuration(false);
+      // Allow the user "server" to impersonate any user, from any host
+      conf.set("hadoop.proxyuser.server.groups", "*");
+      conf.set("hadoop.proxyuser.server.hosts", "*");
+      // Make ProxyUsers pick up the configuration above before any impersonation is attempted
+      ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
+      UserGroupInformation serverUgi = UserGroupInformation.createUserForTesting("server", new String[0]);
+      PhoenixDoAsCallback callback = new PhoenixDoAsCallback(serverUgi, conf);
+
+      UserGroupInformation user1 = callback.doAsRemoteUser("user1", "localhost:1234", new Callable<UserGroupInformation>() {
+          public UserGroupInformation call() throws Exception {
+            return UserGroupInformation.getCurrentUser();
+          }
+      });
+
+      UserGroupInformation user2 = callback.doAsRemoteUser("user2", "localhost:1235", new Callable<UserGroupInformation>() {
+          public UserGroupInformation call() throws Exception {
+            return UserGroupInformation.getCurrentUser();
+          }
+      });
+
+      UserGroupInformation user1Reference = callback.doAsRemoteUser("user1", "localhost:1234", new Callable<UserGroupInformation>() {
+          public UserGroupInformation call() throws Exception {
+            return UserGroupInformation.getCurrentUser();
+          }
+      });
+
+      // UserGroupInformation.getCurrentUser() actually returns a new UGI instance on each call, but the
+      // internal subject is the same, so the checks below use hashCode()/equals() instead of ==.
+      assertNotEquals(user1.hashCode(), user2.hashCode());
+      assertEquals("These should be the same (cached) instance", user1.hashCode(), user1Reference.hashCode());
+      assertEquals("These should be the same (cached) instance", user1, user1Reference);
+    }
+}