You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ma...@apache.org on 2020/04/23 06:42:36 UTC

[ranger] branch master updated: RANGER-2754: upgrade presto plugin to support row-filtering and column-masking and for changes in 317

This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new a15e49f  RANGER-2754: upgrade presto plugin to support row-filtering and column-masking and for changes in 317
a15e49f is described below

commit a15e49fb53b9a0ea7ae273a26e412e4b065765a1
Author: Bolke de Bruin <bo...@xs4all.nl>
AuthorDate: Tue Apr 21 13:09:28 2020 +0200

    RANGER-2754: upgrade presto plugin to support row-filtering and column-masking and for changes in 317
    
    Signed-off-by: Madhan Neethiraj <ma...@apache.org>
---
 .../service-defs/ranger-servicedef-presto.json     |  236 ++++-
 distro/src/main/assembly/plugin-presto.xml         |    7 +-
 plugin-presto/pom.xml                              |   15 +
 .../authorizer/RangerSystemAccessControl.java      |  610 +++++++++---
 .../presto/authorizer/RangerAdminClientImpl.java   |   43 +
 .../authorizer/RangerSystemAccessControlTest.java  |  185 ++++
 plugin-presto/src/test/resources/log4j.properties  |   26 +
 .../src/test/resources/presto-policies.json        | 1040 ++++++++++++++++++++
 .../src/test/resources/ranger-presto-security.xml  |   52 +
 pom.xml                                            |    5 +-
 ranger-presto-plugin-shim/pom.xml                  |   12 +
 .../presto/authorizer/RangerConfig.java            |   22 +
 .../authorizer/RangerSystemAccessControl.java      |  387 +++++---
 13 files changed, 2347 insertions(+), 293 deletions(-)

diff --git a/agents-common/src/main/resources/service-defs/ranger-servicedef-presto.json b/agents-common/src/main/resources/service-defs/ranger-servicedef-presto.json
index 56a8f5a..4d5b795 100644
--- a/agents-common/src/main/resources/service-defs/ranger-servicedef-presto.json
+++ b/agents-common/src/main/resources/service-defs/ranger-servicedef-presto.json
@@ -93,6 +93,69 @@
       "uiHint": "",
       "label": "Presto Column",
       "description": "Presto Column"
+    },
+    {
+      "itemId": 5,
+      "name": "prestouser",
+      "type": "string",
+      "level": 10,
+      "parent": "",
+      "mandatory": true,
+      "lookupSupported": false,
+      "recursiveSupported": false,
+      "excludesSupported": false,
+      "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+      "matcherOptions": {
+        "wildCard": true,
+        "ignoreCase": true
+      },
+      "validationRegEx": "",
+      "validationMessage": "",
+      "uiHint": "",
+      "label": "Presto User",
+      "description": "Presto User"
+    },
+    {
+      "itemId": 6,
+      "name": "systemproperty",
+      "type": "string",
+      "level": 10,
+      "parent": "",
+      "mandatory": true,
+      "lookupSupported": false,
+      "recursiveSupported": false,
+      "excludesSupported": false,
+      "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+      "matcherOptions": {
+        "wildCard": true,
+        "ignoreCase": true
+      },
+      "validationRegEx": "",
+      "validationMessage": "",
+      "uiHint": "",
+      "label": "System Property",
+      "description": "Presto System Property"
+    },
+    {
+      "itemId": 7,
+      "name": "sessionproperty",
+      "type": "string",
+      "level": 20,
+      "parent": "catalog",
+      "mandatory": true,
+      "lookupSupported": false,
+      "recursiveSupported": false,
+      "excludesSupported": false,
+      "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+      "matcherOptions": {
+        "wildCard": true,
+        "ignoreCase": true
+      },
+      "validationRegEx": "",
+      "validationMessage": "",
+      "uiHint": "",
+      "label": "Catalog Session Property",
+      "description": "Presto Catalog Session Property"
     }
   ],
   "accessTypes": [
@@ -118,31 +181,55 @@
     },
     {
       "itemId": 5,
+      "name": "delete",
+      "label": "Delete"
+    },
+    {
+      "itemId": 6,
       "name": "use",
       "label": "Use"
     },
     {
-      "itemId": 6,
+      "itemId": 7,
       "name": "alter",
       "label": "Alter"
     },
     {
-      "itemId": 7,
-      "name": "admin",
-      "label": "Admin"
+      "itemId": 8,
+      "name": "grant",
+      "label": "Grant"
     },
     {
-      "itemId": 8,
+      "itemId": 9,
+      "name": "revoke",
+      "label": "Revoke"
+    },
+    {
+      "itemId": 10,
+      "name": "show",
+      "label": "Show"
+    },
+    {
+      "itemId": 11,
+      "name": "impersonate",
+      "label": "Impersonate"
+    },
+    {
+      "itemId": 12,
       "name": "all",
       "label": "All",
       "impliedGrants": [
         "select",
         "insert",
         "create",
+        "delete",
         "drop",
         "use",
         "alter",
-        "admin"
+        "grant",
+        "revoke",
+        "show",
+        "impersonate"
       ]
     }
   ],
@@ -194,5 +281,140 @@
   ],
   "policyConditions":
   [
-  ]
+  ],
+  "dataMaskDef": {
+    "accessTypes": [
+      {
+        "name": "select"
+      }
+    ],
+    "resources": [
+      {
+        "name": "catalog",
+        "matcherOptions": {
+          "wildCard": "true"
+        },
+        "lookupSupported": true,
+        "uiHint":"{ \"singleValue\":true }"
+      },
+      {
+        "name": "schema",
+        "matcherOptions": {
+          "wildCard": "true"
+        },
+        "lookupSupported": true,
+        "uiHint":"{ \"singleValue\":true }"
+      },
+      {
+        "name": "table",
+        "matcherOptions": {
+          "wildCard": "true"
+        },
+        "lookupSupported": true,
+        "uiHint":"{ \"singleValue\":true }"
+      },
+      {
+        "name": "column",
+        "matcherOptions": {
+          "wildCard": "true"
+        },
+        "lookupSupported": true,
+        "uiHint":"{ \"singleValue\":true }"
+      }
+    ],
+    "maskTypes": [
+      {
+        "itemId": 1,
+        "name": "MASK",
+        "label": "Redact",
+        "description": "Replace lowercase with 'x', uppercase with 'X', digits with '0'",
+        "transformer": "cast(regexp_replace(regexp_replace(regexp_replace({col},'([A-Z])', 'X'),'([a-z])','x'),'([0-9])','0') as {type})",
+        "dataMaskOptions": {
+        }
+      },
+      {
+        "itemId": 2,
+        "name": "MASK_SHOW_LAST_4",
+        "label": "Partial mask: show last 4",
+        "description": "Show last 4 characters; replace rest with 'X'",
+        "transformer": "cast(regexp_replace({col}, '(.*)(.{4}$)', x -> regexp_replace(x[1], '.', 'X') || x[2]) as {type})"
+      },
+      {
+        "itemId": 3,
+        "name": "MASK_SHOW_FIRST_4",
+        "label": "Partial mask: show first 4",
+        "description": "Show first 4 characters; replace rest with 'X'",
+        "transformer": "cast(regexp_replace({col}, '(^.{4})(.*)', x -> x[1] || regexp_replace(x[2], '.', 'X')) as {type})"
+      },
+      {
+        "itemId": 4,
+        "name": "MASK_HASH",
+        "label": "Hash",
+        "description": "Hash the value of a varchar with sha256",
+        "transformer": "cast(to_hex(sha256(to_utf8({col}))) as {type})"
+      },
+      {
+        "itemId": 5,
+        "name": "MASK_NULL",
+        "label": "Nullify",
+        "description": "Replace with NULL"
+      },
+      {
+        "itemId": 6,
+        "name": "MASK_NONE",
+        "label": "Unmasked (retain original value)",
+        "description": "No masking"
+      },
+      {
+        "itemId": 12,
+        "name": "MASK_DATE_SHOW_YEAR",
+        "label": "Date: show only year",
+        "description": "Date: show only year",
+        "transformer": "date_trunc('year', {col})"
+      },
+      {
+        "itemId": 13,
+        "name": "CUSTOM",
+        "label": "Custom",
+        "description": "Custom"
+      }
+    ]
+  },
+  "rowFilterDef": {
+    "accessTypes": [
+      {
+        "name": "select"
+      }
+    ],
+    "resources": [
+      {
+        "name": "catalog",
+        "matcherOptions": {
+          "wildCard": "true"
+        },
+        "lookupSupported": true,
+        "mandatory": true,
+        "uiHint": "{ \"singleValue\":true }"
+      },
+      {
+        "name": "schema",
+        "matcherOptions": {
+          "wildCard": "true"
+        },
+        "lookupSupported": true,
+        "mandatory": true,
+        "uiHint": "{ \"singleValue\":true }"
+      },
+      {
+        "name": "table",
+        "matcherOptions": {
+          "wildCard": "true"
+        },
+        "lookupSupported": true,
+        "mandatory": true,
+        "uiHint": "{ \"singleValue\":true }"
+      }
+    ]
+  }
+
 }
\ No newline at end of file
diff --git a/distro/src/main/assembly/plugin-presto.xml b/distro/src/main/assembly/plugin-presto.xml
index d2075bf..507944b 100644
--- a/distro/src/main/assembly/plugin-presto.xml
+++ b/distro/src/main/assembly/plugin-presto.xml
@@ -32,7 +32,7 @@
             </includes>
             <binaries>
                 <outputDirectory>lib</outputDirectory>
-                <includeDependencies>false</includeDependencies>
+                <includeDependencies>true</includeDependencies>
                 <unpack>false</unpack>
                 <directoryMode>755</directoryMode>
                 <fileMode>644</fileMode>
@@ -84,8 +84,13 @@
                     <include>commons-lang:commons-lang:jar:${commons.lang.version}</include>
                     <include>commons-logging:commons-logging:jar:${commons.logging.version}</include>
                     <include>com.google.guava:guava:jar:${google.guava.version}</include>
+                    <include>com.google.protobuf:protobuf-java:jar:${protobuf-java.version}</include>
+                    <include>com.google.re2j:re2j:jar:${presto.re2j.version}</include>
+                    <include>org.apache.commons:commons-configuration2:jar:${commons.configuration.version}</include>
                     <include>org.apache.hadoop:hadoop-common:jar:${hadoop.version}</include>
+                    <include>org.apache.hadoop:hadoop-common-plus:jar:${hadoop.version}</include>
                     <include>org.apache.hadoop:hadoop-auth:jar:${hadoop.version}</include>
+                    <include>org.apache.hadoop:hadoop-hdfs:jar:${hadoop.version}</include>
                     <include>org.apache.ranger:ranger-plugins-cred</include>
                     <include>org.apache.ranger:credentialbuilder</include>
                     <include>org.codehaus.woodstox:stax2-api</include>
diff --git a/plugin-presto/pom.xml b/plugin-presto/pom.xml
index b63f7de..61728b5 100644
--- a/plugin-presto/pom.xml
+++ b/plugin-presto/pom.xml
@@ -52,5 +52,20 @@
             <artifactId>ranger-plugins-audit</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
     </dependencies>
+    <build>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+                <includes>
+                    <include>**/*</include>
+                </includes>
+                <filtering>true</filtering>
+            </testResource>
+        </testResources>
+    </build>
 </project>
\ No newline at end of file
diff --git a/plugin-presto/src/main/java/org/apache/ranger/authorization/presto/authorizer/RangerSystemAccessControl.java b/plugin-presto/src/main/java/org/apache/ranger/authorization/presto/authorizer/RangerSystemAccessControl.java
index 3ab63f5..d4521a3 100644
--- a/plugin-presto/src/main/java/org/apache/ranger/authorization/presto/authorizer/RangerSystemAccessControl.java
+++ b/plugin-presto/src/main/java/org/apache/ranger/authorization/presto/authorizer/RangerSystemAccessControl.java
@@ -20,28 +20,34 @@ package org.apache.ranger.authorization.presto.authorizer;
 
 import io.prestosql.spi.connector.CatalogSchemaName;
 import io.prestosql.spi.connector.CatalogSchemaTableName;
+import io.prestosql.spi.connector.ColumnMetadata;
 import io.prestosql.spi.connector.SchemaTableName;
 import io.prestosql.spi.security.AccessDeniedException;
-import io.prestosql.spi.security.Identity;
 import io.prestosql.spi.security.PrestoPrincipal;
 import io.prestosql.spi.security.Privilege;
 import io.prestosql.spi.security.SystemAccessControl;
+import io.prestosql.spi.security.SystemSecurityContext;
+import io.prestosql.spi.security.ViewExpression;
+import io.prestosql.spi.type.Type;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.apache.ranger.plugin.model.RangerServiceDef;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResult;
-import org.apache.ranger.plugin.policyengine.RangerPolicyEngine;
 import org.apache.ranger.plugin.service.RangerBasePlugin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.URL;
 import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -52,20 +58,42 @@ import static java.util.Locale.ENGLISH;
 
 public class RangerSystemAccessControl
   implements SystemAccessControl {
+  private static Logger LOG = LoggerFactory.getLogger(RangerSystemAccessControl.class);
 
-  public static String RANGER_CONFIG_KEYTAB = "ranger.keytab";
-  public static String RANGER_CONFIG_PRINCIPAL = "ranger.principal";
-
-  public static String RANGER_PRESTO_SERVICETYPE = "presto";
-  public static String RANGER_PRESTO_APPID = "presto";
+  final public static String RANGER_CONFIG_KEYTAB = "ranger.keytab";
+  final public static String RANGER_CONFIG_PRINCIPAL = "ranger.principal";
+  final public static String RANGER_CONFIG_USE_UGI = "ranger.use_ugi";
+  final public static String RANGER_CONFIG_HADOOP_CONFIG = "ranger.hadoop_config";
+  final public static String RANGER_PRESTO_DEFAULT_HADOOP_CONF = "presto-ranger-site.xml";
+  final public static String RANGER_PRESTO_SERVICETYPE = "presto";
+  final public static String RANGER_PRESTO_APPID = "presto";
 
-  private static Logger LOG = LoggerFactory.getLogger(RangerSystemAccessControl.class);
+  final private RangerBasePlugin rangerPlugin;
 
-  private RangerBasePlugin rangerPlugin;
+  private boolean useUgi = false;
 
   public RangerSystemAccessControl(Map<String, String> config) {
     super();
 
+    Configuration hadoopConf = new Configuration();
+    if (config.get(RANGER_CONFIG_HADOOP_CONFIG) != null) {
+      URL url =  hadoopConf.getResource(config.get(RANGER_CONFIG_HADOOP_CONFIG));
+      if (url == null) {
+        LOG.warn("Hadoop config " + config.get(RANGER_CONFIG_HADOOP_CONFIG) + " not found");
+      } else {
+        hadoopConf.addResource(url);
+      }
+    } else {
+      URL url = hadoopConf.getResource(RANGER_PRESTO_DEFAULT_HADOOP_CONF);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to load Hadoop config from " + url + " (can be null)");
+      }
+      if (url != null) {
+        hadoopConf.addResource(url);
+      }
+    }
+    UserGroupInformation.setConfiguration(hadoopConf);
+
     if (config.get(RANGER_CONFIG_KEYTAB) != null && config.get(RANGER_CONFIG_PRINCIPAL) != null) {
       String keytab = config.get(RANGER_CONFIG_KEYTAB);
       String principal = config.get(RANGER_CONFIG_PRINCIPAL);
@@ -73,267 +101,562 @@ public class RangerSystemAccessControl
       LOG.info("Performing kerberos login with principal " + principal + " and keytab " + keytab);
 
       try {
-        UserGroupInformation.setConfiguration(new Configuration());
         UserGroupInformation.loginUserFromKeytab(principal, keytab);
       } catch (IOException ioe) {
         LOG.error("Kerberos login failed", ioe);
         throw new RuntimeException(ioe);
       }
     }
+
+    if (config.getOrDefault(RANGER_CONFIG_USE_UGI, "false").equalsIgnoreCase("true")) {
+      useUgi = true;
+    }
+
     rangerPlugin = new RangerBasePlugin(RANGER_PRESTO_SERVICETYPE, RANGER_PRESTO_APPID);
     rangerPlugin.init();
     rangerPlugin.setResultProcessor(new RangerDefaultAuditHandler());
   }
 
-  private boolean checkPermission(RangerPrestoResource resource, Identity identity, PrestoAccessType accessType) {
-    boolean ret = false;
 
-    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(identity.getUser());
+  /** FILTERING AND DATA MASKING **/
 
-    String[] groups = ugi != null ? ugi.getGroupNames() : null;
+  private RangerAccessResult getDataMaskResult(RangerPrestoAccessRequest request) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("==> getDataMaskResult(request=" + request + ")");
+    }
 
-    Set<String> userGroups = null;
-    if (groups != null && groups.length > 0) {
-      userGroups = new HashSet<>(Arrays.asList(groups));
+    RangerAccessResult ret = rangerPlugin.evalDataMaskPolicies(request, null);
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("<== getDataMaskResult(request=" + request + "): ret=" + ret);
     }
 
-    RangerPrestoAccessRequest request = new RangerPrestoAccessRequest(
-      resource,
-      identity.getUser(),
-      userGroups,
-      accessType
-    );
+    return ret;
+  }
 
-    RangerAccessResult result = rangerPlugin.isAccessAllowed(request);
-    if (result != null && result.getIsAllowed()) {
-      ret = true;
+  private RangerAccessResult getRowFilterResult(RangerPrestoAccessRequest request) {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("==> getRowFilterResult(request=" + request + ")");
+    }
+
+    RangerAccessResult ret = rangerPlugin.evalRowFilterPolicies(request, null);
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("<== getRowFilterResult(request=" + request + "): ret=" + ret);
     }
 
     return ret;
   }
 
+  private boolean isDataMaskEnabled(RangerAccessResult result) {
+    return result != null && result.isMaskEnabled();
+  }
+
+  private boolean isRowFilterEnabled(RangerAccessResult result) {
+    return result != null && result.isRowFilterEnabled();
+  }
+
   @Override
-  public void checkCanSetUser(Optional<Principal> principal, String userName) {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("==> RangerSystemAccessControl.checkCanSetUser(" + userName + ")");
+  public Optional<ViewExpression> getRowFilter(SystemSecurityContext context, CatalogSchemaTableName tableName) {
+    RangerPrestoAccessRequest request = createAccessRequest(createResource(tableName), context, PrestoAccessType.SELECT);
+    RangerAccessResult result = getRowFilterResult(request);
+
+    ViewExpression viewExpression = null;
+    if (isRowFilterEnabled(result)) {
+      String filter = result.getFilterExpr();
+      viewExpression = new ViewExpression(
+        context.getIdentity().getUser(),
+        Optional.of(tableName.getCatalogName()),
+        Optional.of(tableName.getSchemaTableName().getSchemaName()),
+        filter
+      );
     }
+    return Optional.ofNullable(viewExpression);
+  }
+
+  @Override
+  public Optional<ViewExpression> getColumnMask(SystemSecurityContext context, CatalogSchemaTableName tableName, String columnName, Type type) {
+    RangerPrestoAccessRequest request = createAccessRequest(
+      createResource(tableName.getCatalogName(), tableName.getSchemaTableName().getSchemaName(),
+        tableName.getSchemaTableName().getTableName(), Optional.of(columnName)),
+      context, PrestoAccessType.SELECT);
+    RangerAccessResult result = getDataMaskResult(request);
+
+    ViewExpression viewExpression = null;
+    if (isDataMaskEnabled(result)) {
+      String                maskType    = result.getMaskType();
+      RangerServiceDef.RangerDataMaskTypeDef maskTypeDef = result.getMaskTypeDef();
+      String transformer	= null;
+
+      if (maskTypeDef != null) {
+        transformer = maskTypeDef.getTransformer();
+      }
+
+      if(StringUtils.equalsIgnoreCase(maskType, RangerPolicy.MASK_TYPE_NULL)) {
+        transformer = "NULL";
+      } else if(StringUtils.equalsIgnoreCase(maskType, RangerPolicy.MASK_TYPE_CUSTOM)) {
+        String maskedValue = result.getMaskedValue();
+
+        if(maskedValue == null) {
+          transformer = "NULL";
+        } else {
+          transformer = maskedValue;
+        }
+      }
 
-    /*
-    if (!principal.isPresent()) {
-      //AccessDeniedException.denySetUser(principal, userName);
-    }*/
+      if(StringUtils.isNotEmpty(transformer)) {
+        transformer = transformer.replace("{col}", columnName).replace("{type}", type.getDisplayName());
+      }
 
-    //AccessDeniedException.denySetUser(principal, userName);
+      viewExpression = new ViewExpression(
+        context.getIdentity().getUser(),
+        Optional.of(tableName.getCatalogName()),
+        Optional.of(tableName.getSchemaTableName().getSchemaName()),
+        transformer
+      );
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("getColumnMask: user: {}, catalog: {}, schema: {}, transformer: {}", context.getIdentity().getUser(), tableName.getCatalogName(), tableName.getSchemaTableName().getSchemaName(), transformer);
+      }
+
+    }
+
+    return Optional.ofNullable(viewExpression);
+  }
+
+  @Override
+  public Set<String> filterCatalogs(SystemSecurityContext context, Set<String> catalogs) {
+    LOG.debug("==> RangerSystemAccessControl.filterCatalogs("+ catalogs + ")");
+    Set<String> filteredCatalogs = new HashSet<>(catalogs.size());
+    for (String catalog: catalogs) {
+      if (hasPermission(createResource(catalog), context, PrestoAccessType.SELECT)) {
+        filteredCatalogs.add(catalog);
+      }
+    }
+    return filteredCatalogs;
+  }
+
+  @Override
+  public Set<String> filterSchemas(SystemSecurityContext context, String catalogName, Set<String> schemaNames) {
+    LOG.debug("==> RangerSystemAccessControl.filterSchemas(" + catalogName + ")");
+    Set<String> filteredSchemaNames = new HashSet<>(schemaNames.size());
+    for (String schemaName: schemaNames) {
+      if (hasPermission(createResource(catalogName, schemaName), context, PrestoAccessType.SELECT)) {
+        filteredSchemaNames.add(schemaName);
+      }
+    }
+    return filteredSchemaNames;
+  }
+
+  @Override
+  public Set<SchemaTableName> filterTables(SystemSecurityContext context, String catalogName, Set<SchemaTableName> tableNames) {
+    LOG.debug("==> RangerSystemAccessControl.filterTables(" + catalogName + ")");
+    Set<SchemaTableName> filteredTableNames = new HashSet<>(tableNames.size());
+    for (SchemaTableName tableName : tableNames) {
+      RangerPrestoResource res = createResource(catalogName, tableName.getSchemaName(), tableName.getTableName());
+      if (hasPermission(res, context, PrestoAccessType.SELECT)) {
+        filteredTableNames.add(tableName);
+      }
+    }
+    return filteredTableNames;
   }
 
+  /** PERMISSION CHECKS ORDERED BY SYSTEM, CATALOG, SCHEMA, TABLE, VIEW, COLUMN, QUERY **/
+
+  /** SYSTEM **/
+
   @Override
-  public void checkCanSetSystemSessionProperty(Identity identity, String propertyName) {
-    if (!checkPermission(new RangerPrestoResource(), identity, PrestoAccessType.ADMIN)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanSetSystemSessionProperty denied");
+  public void checkCanSetSystemSessionProperty(SystemSecurityContext context, String propertyName) {
+    if (!hasPermission(createSystemPropertyResource(propertyName), context, PrestoAccessType.ALTER)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanSetSystemSessionProperty denied");
       AccessDeniedException.denySetSystemSessionProperty(propertyName);
     }
   }
 
   @Override
-  public void checkCanAccessCatalog(Identity identity, String catalogName) {
-    if (!checkPermission(createResource(catalogName), identity, PrestoAccessType.SELECT)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanAccessCatalog(" + catalogName + ") denied");
+  public void checkCanImpersonateUser(SystemSecurityContext context, String userName) {
+    if (!hasPermission(createUserResource(userName), context, PrestoAccessType.IMPERSONATE)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanImpersonateUser(" + userName + ") denied");
+      AccessDeniedException.denyImpersonateUser(context.getIdentity().getUser(), userName);
+    }
+  }
+
+  @Override
+  public void checkCanSetUser(Optional<Principal> principal, String userName) {
+    // pass as it is deprecated
+  }
+
+  /** CATALOG **/
+  @Override
+  public void checkCanSetCatalogSessionProperty(SystemSecurityContext context, String catalogName, String propertyName) {
+    if (!hasPermission(createCatalogSessionResource(catalogName, propertyName), context, PrestoAccessType.ALTER)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanSetCatalogSessionProperty(" + catalogName + ") denied");
+      AccessDeniedException.denySetCatalogSessionProperty(catalogName, propertyName);
+    }
+  }
+
+  @Override
+  public void checkCanShowRoles(SystemSecurityContext context, String catalogName) {
+    if (!hasPermission(createResource(catalogName), context, PrestoAccessType.SHOW)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanShowRoles(" + catalogName + ") denied");
+      AccessDeniedException.denyShowRoles(catalogName);
+    }
+  }
+
+
+  @Override
+  public void checkCanAccessCatalog(SystemSecurityContext context, String catalogName) {
+    if (!hasPermission(createResource(catalogName), context, PrestoAccessType.USE)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanAccessCatalog(" + catalogName + ") denied");
       AccessDeniedException.denyCatalogAccess(catalogName);
     }
   }
 
   @Override
-  public Set<String> filterCatalogs(Identity identity, Set<String> catalogs) {
-    return catalogs;
+  public void checkCanShowSchemas(SystemSecurityContext context, String catalogName) {
+    if (!hasPermission(createResource(catalogName), context, PrestoAccessType.SHOW)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanShowSchemas(" + catalogName + ") denied");
+      AccessDeniedException.denyShowSchemas(catalogName);
+    }
   }
 
+  /** SCHEMA **/
+
+  /**
+   * Create schema is evaluated on the level of the Catalog. This means that it is assumed you have permission
+   * to create a schema when you have create rights on the catalog level
+   */
   @Override
-  public void checkCanCreateSchema(Identity identity, CatalogSchemaName schema) {
-    if (!checkPermission(createResource(schema.getCatalogName(), schema.getSchemaName()), identity, PrestoAccessType.CREATE)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanCreateSchema(" + schema.getSchemaName() + ") denied");
+  public void checkCanCreateSchema(SystemSecurityContext context, CatalogSchemaName schema) {
+    if (!hasPermission(createResource(schema.getCatalogName()), context, PrestoAccessType.CREATE)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanCreateSchema(" + schema.getSchemaName() + ") denied");
       AccessDeniedException.denyCreateSchema(schema.getSchemaName());
     }
   }
 
+  /**
+   * This is evaluated against the schema name as ownership information is not available
+   */
   @Override
-  public void checkCanDropSchema(Identity identity, CatalogSchemaName schema) {
-    if (!checkPermission(createResource(schema.getCatalogName(), schema.getSchemaName()), identity, PrestoAccessType.DROP)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanDropSchema(" + schema.getSchemaName() + ") denied");
+  public void checkCanDropSchema(SystemSecurityContext context, CatalogSchemaName schema) {
+    if (!hasPermission(createResource(schema.getCatalogName(), schema.getSchemaName()), context, PrestoAccessType.DROP)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanDropSchema(" + schema.getSchemaName() + ") denied");
       AccessDeniedException.denyDropSchema(schema.getSchemaName());
     }
   }
 
+  /**
+   * This is evaluated against the schema name as ownership information is not available
+   */
   @Override
-  public void checkCanRenameSchema(Identity identity, CatalogSchemaName schema, String newSchemaName) {
+  public void checkCanRenameSchema(SystemSecurityContext context, CatalogSchemaName schema, String newSchemaName) {
     RangerPrestoResource res = createResource(schema.getCatalogName(), schema.getSchemaName());
-    if (!checkPermission(res, identity, PrestoAccessType.ALTER)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanRenameSchema(" + schema.getSchemaName() + ") denied");
+    if (!hasPermission(res, context, PrestoAccessType.ALTER)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanRenameSchema(" + schema.getSchemaName() + ") denied");
       AccessDeniedException.denyRenameSchema(schema.getSchemaName(), newSchemaName);
     }
   }
 
+  /** TABLE **/
+
   @Override
-  public void checkCanShowSchemas(Identity identity, String catalogName) {
-    if (!checkPermission(createResource(catalogName), identity, PrestoAccessType.SELECT)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanShowSchemas(" + catalogName + ") denied");
-      AccessDeniedException.denyShowSchemas(catalogName);
+  public void checkCanShowTables(SystemSecurityContext context, CatalogSchemaName schema) {
+    if (!hasPermission(createResource(schema), context, PrestoAccessType.SHOW)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanShowTables(" + schema.toString() + ") denied");
+      AccessDeniedException.denyShowTables(schema.toString());
     }
   }
 
+
   @Override
-  public Set<String> filterSchemas(Identity identity, String catalogName, Set<String> schemaNames) {
-    LOG.debug("==> RangerSystemAccessControl.filterSchemas(" + catalogName + ")");
-    return schemaNames;
+  public void checkCanShowCreateTable(SystemSecurityContext context, CatalogSchemaTableName table) {
+    if (!hasPermission(createResource(table), context, PrestoAccessType.SHOW)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanShowCreateTable(" + table.toString() + ") denied");
+      AccessDeniedException.denyShowCreateTable(table.toString());
+    }
   }
 
+  /**
+   * Create table is verified on schema level
+   */
   @Override
-  public void checkCanCreateTable(Identity identity, CatalogSchemaTableName table) {
-    if (!checkPermission(createResource(table), identity, PrestoAccessType.CREATE)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanCreateTable(" + table.getSchemaTableName().getTableName() + ") denied");
+  public void checkCanCreateTable(SystemSecurityContext context, CatalogSchemaTableName table) {
+    if (!hasPermission(createResource(table.getCatalogName(), table.getSchemaTableName().getSchemaName()), context, PrestoAccessType.CREATE)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanCreateTable(" + table.getSchemaTableName().getTableName() + ") denied");
       AccessDeniedException.denyCreateTable(table.getSchemaTableName().getTableName());
     }
   }
 
+  /**
+   * This is evaluated against the table name as ownership information is not available
+   */
   @Override
-  public void checkCanDropTable(Identity identity, CatalogSchemaTableName table) {
-    if (!checkPermission(createResource(table), identity, PrestoAccessType.DROP)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanDropTable(" + table.getSchemaTableName().getTableName() + ") denied");
+  public void checkCanDropTable(SystemSecurityContext context, CatalogSchemaTableName table) {
+    if (!hasPermission(createResource(table), context, PrestoAccessType.DROP)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanDropTable(" + table.getSchemaTableName().getTableName() + ") denied");
       AccessDeniedException.denyDropTable(table.getSchemaTableName().getTableName());
     }
   }
 
+  /**
+   * This is evaluated against the table name as ownership information is not available
+   */
   @Override
-  public void checkCanRenameTable(Identity identity, CatalogSchemaTableName table, CatalogSchemaTableName newTable) {
+  public void checkCanRenameTable(SystemSecurityContext context, CatalogSchemaTableName table, CatalogSchemaTableName newTable) {
     RangerPrestoResource res = createResource(table);
-    if (!checkPermission(res, identity, PrestoAccessType.ALTER)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanRenameTable(" + table.getSchemaTableName().getTableName() + ") denied");
+    if (!hasPermission(res, context, PrestoAccessType.ALTER)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanRenameTable(" + table.getSchemaTableName().getTableName() + ") denied");
       AccessDeniedException.denyRenameTable(table.getSchemaTableName().getTableName(), newTable.getSchemaTableName().getTableName());
     }
   }
 
   @Override
-  public void checkCanShowTablesMetadata(Identity identity, CatalogSchemaName schema) {
-    if (!checkPermission(createResource(schema.getCatalogName(), schema.getSchemaName()), identity, PrestoAccessType.SELECT)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanShowTablesMetadata(" + schema.getSchemaName() + ") denied");
-      AccessDeniedException.denyShowTablesMetadata(schema.getSchemaName());
+  public void checkCanInsertIntoTable(SystemSecurityContext context, CatalogSchemaTableName table) {
+    RangerPrestoResource res = createResource(table);
+    if (!hasPermission(res, context, PrestoAccessType.INSERT)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanInsertIntoTable(" + table.getSchemaTableName().getTableName() + ") denied");
+      AccessDeniedException.denyInsertTable(table.getSchemaTableName().getTableName());
     }
   }
 
   @Override
-  public Set<SchemaTableName> filterTables(Identity identity, String catalogName, Set<SchemaTableName> tableNames) {
-    LOG.debug("==> RangerSystemAccessControl.filterTables(" + catalogName + ")");
-    return tableNames;
+  public void checkCanDeleteFromTable(SystemSecurityContext context, CatalogSchemaTableName table) {
+    if (!hasPermission(createResource(table), context, PrestoAccessType.DELETE)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanDeleteFromTable(" + table.getSchemaTableName().getTableName() + ") denied");
+      AccessDeniedException.denyDeleteTable(table.getSchemaTableName().getTableName());
+    }
   }
 
   @Override
-  public void checkCanAddColumn(Identity identity, CatalogSchemaTableName table) {
-    RangerPrestoResource res = createResource(table);
-    if (!checkPermission(res, identity, PrestoAccessType.ALTER)) {
-      AccessDeniedException.denyAddColumn(table.getSchemaTableName().getTableName());
+  public void checkCanGrantTablePrivilege(SystemSecurityContext context, Privilege privilege, CatalogSchemaTableName table, PrestoPrincipal grantee, boolean withGrantOption) {
+    if (!hasPermission(createResource(table), context, PrestoAccessType.GRANT)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanGrantTablePrivilege(" + table + ") denied");
+      AccessDeniedException.denyGrantTablePrivilege(privilege.toString(), table.toString());
     }
   }
 
   @Override
-  public void checkCanDropColumn(Identity identity, CatalogSchemaTableName table) {
-    RangerPrestoResource res = createResource(table);
-    if (!checkPermission(res, identity, PrestoAccessType.ALTER)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanDropColumn(" + table.getSchemaTableName().getTableName() + ") denied");
-      AccessDeniedException.denyDropColumn(table.getSchemaTableName().getTableName());
+  public void checkCanRevokeTablePrivilege(SystemSecurityContext context, Privilege privilege, CatalogSchemaTableName table, PrestoPrincipal revokee, boolean grantOptionFor) {
+    if (!hasPermission(createResource(table), context, PrestoAccessType.REVOKE)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanRevokeTablePrivilege(" + table + ") denied");
+      AccessDeniedException.denyRevokeTablePrivilege(privilege.toString(), table.toString());
     }
   }
 
   @Override
-  public void checkCanRenameColumn(Identity identity, CatalogSchemaTableName table) {
-    RangerPrestoResource res = createResource(table);
-    if (!checkPermission(res, identity, PrestoAccessType.ALTER)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanRenameColumn(" + table.getSchemaTableName().getTableName() + ") denied");
-      AccessDeniedException.denyRenameColumn(table.getSchemaTableName().getTableName());
+  public void checkCanSetTableComment(SystemSecurityContext context, CatalogSchemaTableName table) {
+    if (!hasPermission(createResource(table), context, PrestoAccessType.ALTER)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanSetTableComment(" + table.toString() + ") denied");
+      AccessDeniedException.denyCommentTable(table.toString());
     }
   }
 
+  /**
+   * Create view is verified on schema level
+   */
   @Override
-  public void checkCanSelectFromColumns(Identity identity, CatalogSchemaTableName table, Set<String> columns) {
-    for (RangerPrestoResource res : createResource(table, columns)) {
-      if (!checkPermission(res, identity, PrestoAccessType.SELECT)) {
-        LOG.info("==> RangerSystemAccessControl.checkCanSelectFromColumns(" + table.getSchemaTableName().getTableName() + ") denied");
-        AccessDeniedException.denySelectColumns(table.getSchemaTableName().getTableName(), columns);
-      }
+  public void checkCanCreateView(SystemSecurityContext context, CatalogSchemaTableName view) {
+    if (!hasPermission(createResource(view.getCatalogName(), view.getSchemaTableName().getSchemaName()), context, PrestoAccessType.CREATE)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanCreateView(" + view.getSchemaTableName().getTableName() + ") denied");
+      AccessDeniedException.denyCreateView(view.getSchemaTableName().getTableName());
+    }
+  }
+
+  /**
+   * Drop view is evaluated against the view name as ownership information is not available
+   */
+  @Override
+  public void checkCanDropView(SystemSecurityContext context, CatalogSchemaTableName view) {
+    if (!hasPermission(createResource(view), context, PrestoAccessType.DROP)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanDropView(" + view.getSchemaTableName().getTableName() + ") denied");
+      AccessDeniedException.denyDropView(view.getSchemaTableName().getTableName());
     }
   }
 
+  /**
+   * This check equals the check for checkCanCreateView
+   */
   @Override
-  public void checkCanInsertIntoTable(Identity identity, CatalogSchemaTableName table) {
+  public void checkCanCreateViewWithSelectFromColumns(SystemSecurityContext context, CatalogSchemaTableName table, Set<String> columns) {
+    try {
+      checkCanCreateView(context, table);
+    } catch (AccessDeniedException ade) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanCreateViewWithSelectFromColumns(" + table.getSchemaTableName().getTableName() + ") denied");
+      AccessDeniedException.denyCreateViewWithSelect(table.getSchemaTableName().getTableName(), context.getIdentity());
+    }
+  }
+
+  /**
+   * This is evaluated against the table name as ownership information is not available
+   */
+  @Override
+  public void checkCanRenameView(SystemSecurityContext context, CatalogSchemaTableName view, CatalogSchemaTableName newView) {
+    if (!hasPermission(createResource(view), context, PrestoAccessType.ALTER)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanRenameView(" + view.toString() + ") denied");
+      AccessDeniedException.denyRenameView(view.toString(), newView.toString());
+    }
+  }
+
+  /** COLUMN **/
+
+  /**
+   * This is evaluated on table level
+   */
+  @Override
+  public void checkCanAddColumn(SystemSecurityContext context, CatalogSchemaTableName table) {
     RangerPrestoResource res = createResource(table);
-    if (!checkPermission(res, identity, PrestoAccessType.INSERT)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanInsertIntoTable(" + table.getSchemaTableName().getTableName() + ") denied");
-      AccessDeniedException.denyInsertTable(table.getSchemaTableName().getTableName());
+    if (!hasPermission(res, context, PrestoAccessType.ALTER)) {
+      AccessDeniedException.denyAddColumn(table.getSchemaTableName().getTableName());
     }
   }
 
+  /**
+   * This is evaluated on table level
+   */
   @Override
-  public void checkCanDeleteFromTable(Identity identity, CatalogSchemaTableName table) {
-    if (!checkPermission(createResource(table), identity, PrestoAccessType.DELETE)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanDeleteFromTable(" + table.getSchemaTableName().getTableName() + ") denied");
-      AccessDeniedException.denyDeleteTable(table.getSchemaTableName().getTableName());
+  public void checkCanDropColumn(SystemSecurityContext context, CatalogSchemaTableName table) {
+    RangerPrestoResource res = createResource(table);
+    if (!hasPermission(res, context, PrestoAccessType.DROP)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanDropColumn(" + table.getSchemaTableName().getTableName() + ") denied");
+      AccessDeniedException.denyDropColumn(table.getSchemaTableName().getTableName());
     }
   }
 
+  /**
+   * This is evaluated on table level
+   */
   @Override
-  public void checkCanCreateView(Identity identity, CatalogSchemaTableName view) {
-    if (!checkPermission(createResource(view), identity, PrestoAccessType.CREATE)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanCreateView(" + view.getSchemaTableName().getTableName() + ") denied");
-      AccessDeniedException.denyCreateView(view.getSchemaTableName().getTableName());
+  public void checkCanRenameColumn(SystemSecurityContext context, CatalogSchemaTableName table) {
+    RangerPrestoResource res = createResource(table);
+    if (!hasPermission(res, context, PrestoAccessType.ALTER)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanRenameColumn(" + table.getSchemaTableName().getTableName() + ") denied");
+      AccessDeniedException.denyRenameColumn(table.getSchemaTableName().getTableName());
     }
   }
 
+  /**
+   * This is evaluated on table level
+   */
   @Override
-  public void checkCanDropView(Identity identity, CatalogSchemaTableName view) {
-    if (!checkPermission(createResource(view), identity, PrestoAccessType.DROP)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanDropView(" + view.getSchemaTableName().getTableName() + ") denied");
-      AccessDeniedException.denyCreateView(view.getSchemaTableName().getTableName());
+  public void checkCanShowColumns(SystemSecurityContext context, CatalogSchemaTableName table) {
+    if (!hasPermission(createResource(table), context, PrestoAccessType.SHOW)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanShowColumns(" + table.toString() + ") denied");
+      AccessDeniedException.denyShowColumns(table.toString());
     }
   }
 
   @Override
-  public void checkCanCreateViewWithSelectFromColumns(Identity identity, CatalogSchemaTableName table, Set<String> columns) {
+  public void checkCanSelectFromColumns(SystemSecurityContext context, CatalogSchemaTableName table, Set<String> columns) {
     for (RangerPrestoResource res : createResource(table, columns)) {
-      if (!checkPermission(res, identity, PrestoAccessType.CREATE)) {
-        LOG.info("==> RangerSystemAccessControl.checkCanDropView(" + table.getSchemaTableName().getTableName() + ") denied");
-        AccessDeniedException.denyCreateViewWithSelect(table.getSchemaTableName().getTableName(), identity);
+      if (!hasPermission(res, context, PrestoAccessType.SELECT)) {
+        LOG.debug("==> RangerSystemAccessControl.checkCanSelectFromColumns(" + table.getSchemaTableName().getTableName() + ") denied");
+        AccessDeniedException.denySelectColumns(table.getSchemaTableName().getTableName(), columns);
       }
     }
   }
 
+  /**
+   * This is a NOOP, no filtering is applied
+   */
   @Override
-  public void checkCanSetCatalogSessionProperty(Identity identity, String catalogName, String propertyName) {
-    if (!checkPermission(createResource(catalogName), identity, PrestoAccessType.ADMIN)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanSetSystemSessionProperty(" + catalogName + ") denied");
-      AccessDeniedException.denySetCatalogSessionProperty(catalogName, propertyName);
-    }
+  public List<ColumnMetadata> filterColumns(SystemSecurityContext context, CatalogSchemaTableName table, List<ColumnMetadata> columns) {
+    return columns;
   }
 
+  /** QUERY **/
+
+  /**
+   * This is a NOOP. Everyone can execute a query
+   * @param context
+   */
   @Override
-  public void checkCanGrantTablePrivilege(Identity identity, Privilege privilege, CatalogSchemaTableName table, PrestoPrincipal grantee, boolean withGrantOption) {
-    if (!checkPermission(createResource(table), identity, PrestoAccessType.ADMIN)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanGrantTablePrivilege(" + table + ") denied");
-      AccessDeniedException.denyGrantTablePrivilege(privilege.toString(), table.toString());
-    }
+  public void checkCanExecuteQuery(SystemSecurityContext context) {
   }
 
   @Override
-  public void checkCanRevokeTablePrivilege(Identity identity, Privilege privilege, CatalogSchemaTableName table, PrestoPrincipal revokee, boolean grantOptionFor) {
-    if (!checkPermission(createResource(table), identity, PrestoAccessType.ADMIN)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanRevokeTablePrivilege(" + table + ") denied");
-      AccessDeniedException.denyRevokeTablePrivilege(privilege.toString(), table.toString());
+  public void checkCanViewQueryOwnedBy(SystemSecurityContext context, String queryOwner) {
+    if (!hasPermission(createUserResource(queryOwner), context, PrestoAccessType.IMPERSONATE)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanViewQueryOwnedBy(" + queryOwner + ") denied");
+      AccessDeniedException.denyImpersonateUser(context.getIdentity().getUser(), queryOwner);
     }
   }
 
+  /**
+   * This is a NOOP, no filtering is applied
+   */
   @Override
-  public void checkCanShowRoles(Identity identity, String catalogName) {
-    if (!checkPermission(createResource(catalogName), identity, PrestoAccessType.ADMIN)) {
-      LOG.info("==> RangerSystemAccessControl.checkCanShowRoles(" + catalogName + ") denied");
-      AccessDeniedException.denyShowRoles(catalogName);
+  public Set<String> filterViewQueryOwnedBy(SystemSecurityContext context, Set<String> queryOwners) {
+    return queryOwners;
+  }
+
+  @Override
+  public void checkCanKillQueryOwnedBy(SystemSecurityContext context, String queryOwner) {
+    if (!hasPermission(createUserResource(queryOwner), context, PrestoAccessType.IMPERSONATE)) {
+      LOG.debug("==> RangerSystemAccessControl.checkCanKillQueryOwnedBy(" + queryOwner + ") denied");
+      AccessDeniedException.denyImpersonateUser(context.getIdentity().getUser(), queryOwner);
+    }
+  }
+
+
+  /** HELPER FUNCTIONS **/
+
+  private RangerPrestoAccessRequest createAccessRequest(RangerPrestoResource resource, SystemSecurityContext context, PrestoAccessType accessType) {
+    Set<String> userGroups = null;
+
+    if (useUgi) {
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser(context.getIdentity().getUser());
+
+      String[] groups = ugi != null ? ugi.getGroupNames() : null;
+
+      if (groups != null && groups.length > 0) {
+        userGroups = new HashSet<>(Arrays.asList(groups));
+      }
+    } else {
+      userGroups = context.getIdentity().getGroups();
     }
+
+    RangerPrestoAccessRequest request = new RangerPrestoAccessRequest(
+      resource,
+      context.getIdentity().getUser(),
+      userGroups,
+      accessType
+    );
+
+    return request;
+  }
+
+  private boolean hasPermission(RangerPrestoResource resource, SystemSecurityContext context, PrestoAccessType accessType) {
+    boolean ret = false;
+
+    RangerPrestoAccessRequest request = createAccessRequest(resource, context, accessType);
+
+    RangerAccessResult result = rangerPlugin.isAccessAllowed(request);
+    if (result != null && result.getIsAllowed()) {
+      ret = true;
+    }
+
+    return ret;
+  }
+
+  private static RangerPrestoResource createUserResource(String userName) {
+    RangerPrestoResource res = new RangerPrestoResource();
+    res.setValue(RangerPrestoResource.KEY_USER, userName);
+
+    return res;
+  }
+
+  private static RangerPrestoResource createCatalogSessionResource(String catalogName, String propertyName) {
+    RangerPrestoResource res = new RangerPrestoResource();
+    res.setValue(RangerPrestoResource.KEY_CATALOG, catalogName);
+    res.setValue(RangerPrestoResource.KEY_SESSION_PROPERTY, propertyName);
+
+    return res;
+  }
+
+  private static RangerPrestoResource createSystemPropertyResource(String property) {
+    RangerPrestoResource res = new RangerPrestoResource();
+    res.setValue(RangerPrestoResource.KEY_SYSTEM_PROPERTY, property);
+
+    return res;
   }
 
   private static RangerPrestoResource createResource(CatalogSchemaName catalogSchemaName) {
@@ -389,8 +712,12 @@ class RangerPrestoResource
   public static final String KEY_SCHEMA = "schema";
   public static final String KEY_TABLE = "table";
   public static final String KEY_COLUMN = "column";
+  public static final String KEY_USER = "prestouser";
+  public static final String KEY_SYSTEM_PROPERTY = "systemproperty";
+  public static final String KEY_SESSION_PROPERTY = "sessionproperty";
 
-  public RangerPrestoResource() {}
+  public RangerPrestoResource() {
+  }
 
   public RangerPrestoResource(String catalogName, Optional<String> schema, Optional<String> table) {
     setValue(KEY_CATALOG, catalogName);
@@ -427,7 +754,9 @@ class RangerPrestoResource
     return (String) getValue(KEY_CATALOG);
   }
 
-  public String getSchema() { return (String) getValue(KEY_SCHEMA); }
+  public String getSchema() {
+    return (String) getValue(KEY_SCHEMA);
+  }
 
   public Optional<SchemaTableName> getSchemaTable() {
     final String schema = getSchema();
@@ -443,17 +772,12 @@ class RangerPrestoAccessRequest
   public RangerPrestoAccessRequest(RangerPrestoResource resource,
                                    String user,
                                    Set<String> userGroups,
-                                   PrestoAccessType prestoAccessType)
-
-  {
-    super(resource,
-      prestoAccessType == PrestoAccessType.USE ? RangerPolicyEngine.ANY_ACCESS :
-        prestoAccessType == PrestoAccessType.ADMIN ? RangerPolicyEngine.ADMIN_ACCESS :
-          prestoAccessType.name().toLowerCase(ENGLISH), user,
-      userGroups);
+                                   PrestoAccessType prestoAccessType) {
+    super(resource, prestoAccessType.name().toLowerCase(ENGLISH), user, userGroups);
+    setAccessTime(new Date());
   }
 }
 
 enum PrestoAccessType {
-  CREATE, DROP, SELECT, INSERT, DELETE, USE, ALTER, ALL, ADMIN;
+  CREATE, DROP, SELECT, INSERT, DELETE, USE, ALTER, ALL, GRANT, REVOKE, SHOW, IMPERSONATE;
 }
\ No newline at end of file
diff --git a/plugin-presto/src/test/java/org/apache/ranger/authorization/presto/authorizer/RangerAdminClientImpl.java b/plugin-presto/src/test/java/org/apache/ranger/authorization/presto/authorizer/RangerAdminClientImpl.java
new file mode 100644
index 0000000..bcfaef0
--- /dev/null
+++ b/plugin-presto/src/test/java/org/apache/ranger/authorization/presto/authorizer/RangerAdminClientImpl.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.authorization.presto.authorizer;
+
+import org.apache.ranger.admin.client.AbstractRangerAdminClient;
+import org.apache.ranger.plugin.util.ServicePolicies;
+
+import java.io.File;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+
+public class RangerAdminClientImpl extends AbstractRangerAdminClient {
+  private final static String cacheFilename = "presto-policies.json";
+
+  public ServicePolicies getServicePoliciesIfUpdated(long lastKnownVersion, long lastActivationTimeInMillis) throws Exception {
+
+    String basedir = System.getProperty("basedir");
+    if (basedir == null) {
+      basedir = new File(".").getCanonicalPath();
+    }
+
+    java.nio.file.Path cachePath = FileSystems.getDefault().getPath(basedir, "/src/test/resources/" + cacheFilename);
+    byte[] cacheBytes = Files.readAllBytes(cachePath);
+
+    return gson.fromJson(new String(cacheBytes, java.nio.charset.StandardCharsets.UTF_8), ServicePolicies.class); // decode explicitly; do not depend on the platform default charset
+  }
+
+}
\ No newline at end of file
diff --git a/plugin-presto/src/test/java/org/apache/ranger/authorization/presto/authorizer/RangerSystemAccessControlTest.java b/plugin-presto/src/test/java/org/apache/ranger/authorization/presto/authorizer/RangerSystemAccessControlTest.java
new file mode 100644
index 0000000..c00d519
--- /dev/null
+++ b/plugin-presto/src/test/java/org/apache/ranger/authorization/presto/authorizer/RangerSystemAccessControlTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.authorization.presto.authorizer;
+
+import com.google.common.collect.ImmutableSet;
+import io.prestosql.spi.connector.CatalogSchemaName;
+import io.prestosql.spi.connector.CatalogSchemaTableName;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.security.AccessDeniedException;
+import io.prestosql.spi.security.Identity;
+import io.prestosql.spi.security.PrestoPrincipal;
+import io.prestosql.spi.security.SystemSecurityContext;
+
+import static io.prestosql.spi.security.PrincipalType.USER;
+import static io.prestosql.spi.security.Privilege.SELECT;
+import static org.junit.Assert.*;
+
+import io.prestosql.spi.security.ViewExpression;
+import io.prestosql.spi.type.VarcharType;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.security.auth.kerberos.KerberosPrincipal;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class RangerSystemAccessControlTest {
+  static RangerSystemAccessControl accessControlManager = null;
+
+  private static final Identity alice = Identity.ofUser("alice");
+  private static final Identity admin = Identity.ofUser("admin");
+  //private static final Identity aliceWithGroups = Identity.from(alice).withGroups(new HashSet(Arrays.asList("users", "friends"))).build();
+  //private static final Identity kerberosValidAlice = Identity.from(alice).withPrincipal(new KerberosPrincipal("alice/example.com@EXAMPLE.COM")).build();
+  //private static final Identity kerberosValidNonAsciiUser = Identity.forUser("\u0194\u0194\u0194").withPrincipal(new KerberosPrincipal("\u0194\u0194\u0194/example.com@EXAMPLE.COM")).build();
+  private static final Identity kerberosInvalidAlice = Identity.from(alice).withPrincipal(new KerberosPrincipal("mallory/example.com@EXAMPLE.COM")).build();
+  private static final Identity bob = Identity.ofUser("bob");
+  //private static final Identity nonAsciiUser = Identity.ofUser("\u0194\u0194\u0194");
+
+  private static final Set<String> allCatalogs = ImmutableSet.of("open-to-all", "all-allowed", "alice-catalog");
+  private static final Set<String> queryOwners = ImmutableSet.of("bob", "alice", "frank");
+  private static final String aliceCatalog = "alice-catalog";
+  private static final CatalogSchemaName aliceSchema = new CatalogSchemaName("alice-catalog", "schema");
+  private static final CatalogSchemaTableName aliceTable = new CatalogSchemaTableName("alice-catalog", "schema","table");
+  private static final CatalogSchemaTableName aliceView = new CatalogSchemaTableName("alice-catalog", "schema","view");
+
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Map<String, String> config = new HashMap<>();
+    accessControlManager = new RangerSystemAccessControl(config);
+  }
+
+  @Test
+  @SuppressWarnings("PMD")
+  public void testCanSetUserOperations() {
+    try {
+      accessControlManager.checkCanImpersonateUser(context(alice), bob.getUser());
+      throw new AssertionError("expected AccessDeniedException");
+    }
+    catch (AccessDeniedException expected) {
+    }
+
+    accessControlManager.checkCanImpersonateUser(context(admin), bob.getUser());
+
+    try {
+      accessControlManager.checkCanImpersonateUser(context(kerberosInvalidAlice), bob.getUser());
+      throw new AssertionError("expected AccessDeniedException");
+    }
+    catch (AccessDeniedException expected) {
+    }
+
+  }
+
+  @Test
+  public void testCatalogOperations()
+  {
+    assertEquals(allCatalogs, accessControlManager.filterCatalogs(context(alice), allCatalogs));
+    Set<String> bobCatalogs = ImmutableSet.of("open-to-all", "all-allowed");
+    assertEquals(bobCatalogs, accessControlManager.filterCatalogs(context(bob), allCatalogs));
+    //Set<String> nonAsciiUserCatalogs = ImmutableSet.of("open-to-all", "all-allowed", "\u0200\u0200\u0200");
+    //assertEquals(accessControlManager.filterCatalogs(context(nonAsciiUser), allCatalogs), nonAsciiUserCatalogs);
+  }
+
+  @Test
+  @SuppressWarnings("PMD")
+  public void testSchemaOperations()
+  {
+
+    Set<String> aliceSchemas = ImmutableSet.of("schema");
+    assertEquals(aliceSchemas, accessControlManager.filterSchemas(context(alice), aliceCatalog, aliceSchemas));
+    assertEquals(ImmutableSet.of(), accessControlManager.filterSchemas(context(bob), "alice-catalog", aliceSchemas));
+
+    accessControlManager.checkCanCreateSchema(context(alice), aliceSchema);
+    accessControlManager.checkCanDropSchema(context(alice), aliceSchema);
+    accessControlManager.checkCanRenameSchema(context(alice), aliceSchema, "new-schema");
+    accessControlManager.checkCanShowSchemas(context(alice), aliceCatalog);
+
+    try {
+      accessControlManager.checkCanCreateSchema(context(bob), aliceSchema);
+    } catch (AccessDeniedException expected) {
+    }
+  }
+
+  @Test
+  @SuppressWarnings("PMD")
+  public void testTableOperations()
+  {
+    Set<SchemaTableName> aliceTables = ImmutableSet.of(new SchemaTableName("schema", "table"));
+    assertEquals(aliceTables, accessControlManager.filterTables(context(alice), aliceCatalog, aliceTables));
+    assertEquals(ImmutableSet.of(), accessControlManager.filterTables(context(bob), "alice-catalog", aliceTables));
+
+    accessControlManager.checkCanCreateTable(context(alice), aliceTable);
+    accessControlManager.checkCanDropTable(context(alice), aliceTable);
+    accessControlManager.checkCanSelectFromColumns(context(alice), aliceTable, ImmutableSet.of());
+    accessControlManager.checkCanInsertIntoTable(context(alice), aliceTable);
+    accessControlManager.checkCanDeleteFromTable(context(alice), aliceTable);
+    accessControlManager.checkCanRenameColumn(context(alice), aliceTable);
+
+
+    try {
+      accessControlManager.checkCanCreateTable(context(bob), aliceTable);
+    } catch (AccessDeniedException expected) {
+    }
+  }
+
+  @Test
+  @SuppressWarnings("PMD")
+  public void testViewOperations()
+  {
+    accessControlManager.checkCanCreateView(context(alice), aliceView);
+    accessControlManager.checkCanDropView(context(alice), aliceView);
+    accessControlManager.checkCanSelectFromColumns(context(alice), aliceView, ImmutableSet.of());
+    accessControlManager.checkCanCreateViewWithSelectFromColumns(context(alice), aliceTable, ImmutableSet.of());
+    accessControlManager.checkCanCreateViewWithSelectFromColumns(context(alice), aliceView, ImmutableSet.of());
+    accessControlManager.checkCanSetCatalogSessionProperty(context(alice), aliceCatalog, "property");
+    accessControlManager.checkCanGrantTablePrivilege(context(alice), SELECT, aliceTable, new PrestoPrincipal(USER, "grantee"), true);
+    accessControlManager.checkCanRevokeTablePrivilege(context(alice), SELECT, aliceTable, new PrestoPrincipal(USER, "revokee"), true);
+
+    try {
+      accessControlManager.checkCanCreateView(context(bob), aliceView);
+    } catch (AccessDeniedException expected) {
+    }
+  }
+
+  @Test
+  @SuppressWarnings("PMD")
+  public void testMisc()
+  {
+    assertEquals(queryOwners, accessControlManager.filterViewQueryOwnedBy(context(alice), queryOwners));
+
+    // check {type} / {col} replacement
+    final VarcharType varcharType = VarcharType.createVarcharType(20);
+
+    Optional<ViewExpression> ret = accessControlManager.getColumnMask(context(alice), aliceTable, "cast_me", varcharType);
+    assertTrue(ret.isPresent());
+    assertEquals("cast cast_me as varchar(20)", ret.get().getExpression());
+
+    ret = accessControlManager.getColumnMask(context(alice), aliceTable,"do-not-cast-me", varcharType);
+    assertFalse(ret.isPresent());
+
+    ret = accessControlManager.getRowFilter(context(alice), aliceTable);
+    assertFalse(ret.isPresent());
+  }
+
+  private SystemSecurityContext context(Identity id) {
+    return new SystemSecurityContext(id);
+  }
+}
\ No newline at end of file
diff --git a/plugin-presto/src/test/resources/log4j.properties b/plugin-presto/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fd5ca2a
--- /dev/null
+++ b/plugin-presto/src/test/resources/log4j.properties
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Root logger option
+log4j.rootLogger=INFO, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{10}:%L - %m%n
diff --git a/plugin-presto/src/test/resources/presto-policies.json b/plugin-presto/src/test/resources/presto-policies.json
new file mode 100644
index 0000000..28eabf2
--- /dev/null
+++ b/plugin-presto/src/test/resources/presto-policies.json
@@ -0,0 +1,1040 @@
+{
+  "serviceName": "cl1_presto",
+  "serviceId": 16,
+  "policyUpdateTime": "20180304-09:49:38.000-+0000",
+  "policies": [
+    {
+      "service": "cl1_presto",
+      "name": "checkCanImpersonateUser",
+      "policyType": 0,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "prestouser": {
+          "values": [
+            "bob"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {
+              "type": "impersonate",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "admin"
+          ],
+          "groups": [],
+          "roles": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [],
+      "rowFilterPolicyItems": [],
+      "serviceType": "presto",
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [],
+      "zoneName": "",
+      "isDenyAllElse": false,
+      "id": 51,
+      "guid": "7ab96b62-6fd3-4193-bf49-af462c25784d",
+      "isEnabled": true,
+      "version": 1
+    },
+    {
+      "service": "cl1_presto",
+      "name": "alice-schema",
+      "policyType": 0,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "schema": {
+          "values": [
+            "schema"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "catalog": {
+          "values": [
+            "alice-catalog"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": true
+            },
+            {
+              "type": "insert",
+              "isAllowed": true
+            },
+            {
+              "type": "create",
+              "isAllowed": true
+            },
+            {
+              "type": "drop",
+              "isAllowed": true
+            },
+            {
+              "type": "alter",
+              "isAllowed": true
+            },
+            {
+              "type": "show",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "alice"
+          ],
+          "groups": [],
+          "roles": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [],
+      "rowFilterPolicyItems": [],
+      "serviceType": "presto",
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [],
+      "zoneName": "",
+      "isDenyAllElse": false,
+      "id": 52,
+      "guid": "11b10138-34c3-4e11-8beb-56a10334a375",
+      "isEnabled": true,
+      "version": 1
+    },
+    {
+      "service": "cl1_presto",
+      "name": "alice-catalog",
+      "policyType": 0,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "catalog": {
+          "values": [
+            "alice-catalog"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": true
+            },
+            {
+              "type": "insert",
+              "isAllowed": true
+            },
+            {
+              "type": "create",
+              "isAllowed": true
+            },
+            {
+              "type": "drop",
+              "isAllowed": true
+            },
+            {
+              "type": "use",
+              "isAllowed": true
+            },
+            {
+              "type": "alter",
+              "isAllowed": true
+            },
+            {
+              "type": "show",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "alice"
+          ],
+          "groups": [],
+          "roles": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [],
+      "rowFilterPolicyItems": [],
+      "serviceType": "presto",
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [],
+      "zoneName": "",
+      "isDenyAllElse": false,
+      "id": 53,
+      "guid": "60207d91-7fd7-424e-8e6f-88d803297b6a",
+      "isEnabled": true,
+      "version": 2
+    },
+    {
+      "service": "cl1_presto",
+      "name": "alice-table",
+      "policyType": 0,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "schema": {
+          "values": [
+            "schema"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "catalog": {
+          "values": [
+            "alice-catalog"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "table": {
+          "values": [
+            "table",
+            "alice"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": true
+            },
+            {
+              "type": "insert",
+              "isAllowed": true
+            },
+            {
+              "type": "drop",
+              "isAllowed": true
+            },
+            {
+              "type": "delete",
+              "isAllowed": true
+            },
+            {
+              "type": "alter",
+              "isAllowed": true
+            },
+            {
+              "type": "grant",
+              "isAllowed": true
+            },
+            {
+              "type": "revoke",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "alice"
+          ],
+          "groups": [],
+          "roles": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [],
+      "rowFilterPolicyItems": [],
+      "serviceType": "presto",
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [],
+      "zoneName": "",
+      "isDenyAllElse": false,
+      "id": 55,
+      "guid": "b47e1c19-a05f-41f8-94ef-f86c14076ad9",
+      "isEnabled": true,
+      "version": 2
+    },
+    {
+      "service": "cl1_presto",
+      "name": "alice-view",
+      "policyType": 0,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "schema": {
+          "values": [
+            "schema"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "catalog": {
+          "values": [
+            "alice-catalog"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "table": {
+          "values": [
+            "view"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": true
+            },
+            {
+              "type": "drop",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "alice"
+          ],
+          "groups": [],
+          "roles": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [],
+      "rowFilterPolicyItems": [],
+      "serviceType": "presto",
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [],
+      "zoneName": "",
+      "isDenyAllElse": false,
+      "id": 56,
+      "guid": "91335d40-0bcf-4515-89ed-74531df970c7",
+      "isEnabled": true,
+      "version": 1
+    },
+    {
+      "service": "cl1_presto",
+      "name": "alice-session-property",
+      "policyType": 0,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "sessionproperty": {
+          "values": [
+            "property"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "catalog": {
+          "values": [
+            "alice-catalog"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {
+              "type": "show",
+              "isAllowed": true
+            },
+            {
+              "type": "alter",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "alice"
+          ],
+          "groups": [],
+          "roles": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [],
+      "rowFilterPolicyItems": [],
+      "serviceType": "presto",
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [],
+      "zoneName": "",
+      "isDenyAllElse": false,
+      "id": 57,
+      "guid": "b5a30229-89b2-456a-83d4-7d64e8b8b6bf",
+      "isEnabled": true,
+      "version": 1
+    },
+    {
+      "service": "cl1_presto",
+      "name": "open-to-all",
+      "policyType": 0,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "catalog": {
+          "values": [
+            "open-to-all",
+            "all-allowed"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "{USER}"
+          ],
+          "groups": [],
+          "roles": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [],
+      "rowFilterPolicyItems": [],
+      "serviceType": "presto",
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [],
+      "zoneName": "",
+      "isDenyAllElse": false,
+      "id": 67,
+      "guid": "370d3e54-0428-4fcb-b0b5-ad1f5dfdd7db",
+      "isEnabled": true,
+      "version": 1
+    },
+    {
+      "service": "cl1_presto",
+      "name": "test-mask",
+      "policyType": 1,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "schema": {
+          "values": [
+            "schema"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "catalog": {
+          "values": [
+            "alice-catalog"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "column": {
+          "values": [
+            "only_first_4"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "table": {
+          "values": [
+            "table"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [
+        {
+          "dataMaskInfo": {
+            "dataMaskType": "MASK_SHOW_FIRST_4"
+          },
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "{USER}"
+          ],
+          "groups": [],
+          "roles": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "rowFilterPolicyItems": [],
+      "serviceType": "presto",
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [],
+      "zoneName": "",
+      "isDenyAllElse": false,
+      "id": 68,
+      "guid": "6ec12d33-4d5d-46f0-9a05-b31d78281b02",
+      "isEnabled": true,
+      "version": 1
+    },
+    {
+      "service": "cl1_presto",
+      "name": "test-mask-cast",
+      "policyType": 1,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "schema": {
+          "values": [
+            "schema"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "catalog": {
+          "values": [
+            "alice-catalog"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "column": {
+          "values": [
+            "cast_me"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "table": {
+          "values": [
+            "table"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [
+        {
+          "dataMaskInfo": {
+            "dataMaskType": "CUSTOM",
+            "valueExpr": "cast {col} as {type}"
+          },
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "{USER}"
+          ],
+          "groups": [],
+          "roles": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "rowFilterPolicyItems": [],
+      "serviceType": "presto",
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [],
+      "zoneName": "",
+      "isDenyAllElse": false,
+      "id": 69,
+      "guid": "50e855fb-8dc2-42cd-99d3-16e8df8de774",
+      "isEnabled": true,
+      "version": 1
+    }
+  ],
+  "startIndex": 0,
+  "pageSize": 0,
+  "totalCount": 0,
+  "resultSize": 0,
+  "queryTimeMS": 1585212824007,
+  "serviceDef": {
+    "id": 17,
+    "name": "presto",
+    "displayName": "presto",
+    "implClass": "org.apache.ranger.services.presto.RangerServicePresto",
+    "label": "Presto",
+    "description": "Presto",
+    "guid": "379a9fe5-1b6e-4091-a584-4890e245e6c1",
+    "resources": [
+      {
+        "itemId": 1,
+        "name": "catalog",
+        "type": "string",
+        "level": 10,
+        "parent": "",
+        "mandatory": true,
+        "isValidLeaf": true,
+        "lookupSupported": true,
+        "recursiveSupported": false,
+        "excludesSupported": true,
+        "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions": {
+          "wildCard": true,
+          "ignoreCase": true
+        },
+        "validationRegEx": "",
+        "validationMessage": "",
+        "uiHint": "",
+        "label": "Presto Catalog",
+        "description": "Presto Catalog"
+      },
+      {
+        "itemId": 2,
+        "name": "schema",
+        "type": "string",
+        "level": 20,
+        "parent": "catalog",
+        "mandatory": true,
+        "isValidLeaf": true,
+        "lookupSupported": true,
+        "recursiveSupported": false,
+        "excludesSupported": true,
+        "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions": {
+          "wildCard": true,
+          "ignoreCase": true
+        },
+        "validationRegEx": "",
+        "validationMessage": "",
+        "uiHint": "",
+        "label": "Presto Schema",
+        "description": "Presto Schema"
+      },
+      {
+        "itemId": 3,
+        "name": "table",
+        "type": "string",
+        "level": 30,
+        "parent": "schema",
+        "mandatory": true,
+        "isValidLeaf": true,
+        "lookupSupported": true,
+        "recursiveSupported": false,
+        "excludesSupported": true,
+        "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions": {
+          "wildCard": true,
+          "ignoreCase": true
+        },
+        "validationRegEx": "",
+        "validationMessage": "",
+        "uiHint": "",
+        "label": "Presto Table",
+        "description": "Presto Table"
+      },
+      {
+        "itemId": 4,
+        "name": "column",
+        "type": "string",
+        "level": 40,
+        "parent": "table",
+        "mandatory": true,
+        "lookupSupported": true,
+        "recursiveSupported": false,
+        "excludesSupported": true,
+        "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions": {
+          "wildCard": true,
+          "ignoreCase": true
+        },
+        "validationRegEx": "",
+        "validationMessage": "",
+        "uiHint": "",
+        "label": "Presto Column",
+        "description": "Presto Column"
+      },
+      {
+        "itemId": 5,
+        "name": "prestouser",
+        "type": "string",
+        "level": 10,
+        "parent": "",
+        "mandatory": true,
+        "lookupSupported": false,
+        "recursiveSupported": false,
+        "excludesSupported": false,
+        "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions": {
+          "wildCard": true,
+          "ignoreCase": true
+        },
+        "validationRegEx": "",
+        "validationMessage": "",
+        "uiHint": "",
+        "label": "Presto User",
+        "description": "Presto User"
+      },
+      {
+        "itemId": 6,
+        "name": "systemproperty",
+        "type": "string",
+        "level": 10,
+        "parent": "",
+        "mandatory": true,
+        "lookupSupported": false,
+        "recursiveSupported": false,
+        "excludesSupported": false,
+        "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions": {
+          "wildCard": true,
+          "ignoreCase": true
+        },
+        "validationRegEx": "",
+        "validationMessage": "",
+        "uiHint": "",
+        "label": "System Property",
+        "description": "Presto System Property"
+      },
+      {
+        "itemId": 7,
+        "name": "sessionproperty",
+        "type": "string",
+        "level": 20,
+        "parent": "catalog",
+        "mandatory": true,
+        "lookupSupported": false,
+        "recursiveSupported": false,
+        "excludesSupported": false,
+        "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions": {
+          "wildCard": true,
+          "ignoreCase": true
+        },
+        "validationRegEx": "",
+        "validationMessage": "",
+        "uiHint": "",
+        "label": "Catalog Session Property",
+        "description": "Presto Catalog Session Property"
+      }
+    ],
+    "accessTypes": [
+      {
+        "itemId": 1,
+        "name": "select",
+        "label": "Select"
+      },
+      {
+        "itemId": 2,
+        "name": "insert",
+        "label": "Insert"
+      },
+      {
+        "itemId": 3,
+        "name": "create",
+        "label": "Create"
+      },
+      {
+        "itemId": 4,
+        "name": "drop",
+        "label": "Drop"
+      },
+      {
+        "itemId": 5,
+        "name": "delete",
+        "label": "Delete"
+      },
+      {
+        "itemId": 6,
+        "name": "use",
+        "label": "Use"
+      },
+      {
+        "itemId": 7,
+        "name": "alter",
+        "label": "Alter"
+      },
+      {
+        "itemId": 8,
+        "name": "grant",
+        "label": "Grant"
+      },
+      {
+        "itemId": 9,
+        "name": "revoke",
+        "label": "Revoke"
+      },
+      {
+        "itemId": 10,
+        "name": "show",
+        "label": "Show"
+      },
+      {
+        "itemId": 11,
+        "name": "impersonate",
+        "label": "Impersonate"
+      },
+      {
+        "itemId": 12,
+        "name": "all",
+        "label": "All",
+        "impliedGrants": [
+          "select",
+          "insert",
+          "create",
+          "delete",
+          "drop",
+          "use",
+          "alter",
+          "grant",
+          "revoke",
+          "show",
+          "impersonate"
+        ]
+      }
+    ],
+    "configs": [
+      {
+        "itemId": 1,
+        "name": "username",
+        "type": "string",
+        "mandatory": true,
+        "validationRegEx": "",
+        "validationMessage": "",
+        "uiHint": "",
+        "label": "Username"
+      },
+      {
+        "itemId": 2,
+        "name": "password",
+        "type": "password",
+        "mandatory": false,
+        "validationRegEx": "",
+        "validationMessage": "",
+        "uiHint": "",
+        "label": "Password"
+      },
+      {
+        "itemId": 3,
+        "name": "jdbc.driverClassName",
+        "type": "string",
+        "mandatory": true,
+        "validationRegEx": "",
+        "validationMessage": "",
+        "uiHint": "",
+        "defaultValue": "io.prestosql.jdbc.PrestoDriver"
+      },
+      {
+        "itemId": 4,
+        "name": "jdbc.url",
+        "type": "string",
+        "mandatory": true,
+        "defaultValue": "",
+        "validationRegEx": "",
+        "validationMessage": "",
+        "uiHint": ""
+      }
+    ],
+    "enums": [
+    ],
+    "contextEnrichers": [
+    ],
+    "policyConditions": [
+    ],
+    "dataMaskDef": {
+      "accessTypes": [
+        {
+          "name": "select"
+        }
+      ],
+      "resources": [
+        {
+          "name": "catalog",
+          "matcherOptions": {
+            "wildCard": "false"
+          },
+          "lookupSupported": true,
+          "uiHint": "{ \"singleValue\":true }"
+        },
+        {
+          "name": "schema",
+          "matcherOptions": {
+            "wildCard": "false"
+          },
+          "lookupSupported": true,
+          "uiHint": "{ \"singleValue\":true }"
+        },
+        {
+          "name": "table",
+          "matcherOptions": {
+            "wildCard": "false"
+          },
+          "lookupSupported": true,
+          "uiHint": "{ \"singleValue\":true }"
+        },
+        {
+          "name": "column",
+          "matcherOptions": {
+            "wildCard": "false"
+          },
+          "lookupSupported": true,
+          "uiHint": "{ \"singleValue\":true }"
+        }
+      ],
+      "maskTypes": [
+        {
+          "itemId": 1,
+          "name": "MASK",
+          "label": "Redact",
+          "description": "Replace lowercase with 'x', uppercase with 'X', digits with '0'",
+          "transformer": "cast(regexp_replace(regexp_replace(regexp_replace({col},'([A-Z])', 'X'),'([a-z])','x'),'([0-9])','0') as {type})",
+          "dataMaskOptions": {
+          }
+        },
+        {
+          "itemId": 2,
+          "name": "MASK_SHOW_LAST_4",
+          "label": "Partial mask: show last 4",
+          "description": "Show last 4 characters; replace rest with 'X'",
+          "transformer": "cast(regexp_replace({col}, '(.*)(.{4}$)', x -> regexp_replace(x[1], '.', 'X') || x[2]) as {type})"
+        },
+        {
+          "itemId": 3,
+          "name": "MASK_SHOW_FIRST_4",
+          "label": "Partial mask: show first 4",
+          "description": "Show first 4 characters; replace rest with 'X'",
+          "transformer": "cast(regexp_replace({col}, '(^.{4})(.*)', x -> x[1] || regexp_replace(x[2], '.', 'X')) as {type})"
+        },
+        {
+          "itemId": 4,
+          "name": "MASK_HASH",
+          "label": "Hash",
+          "description": "Hash the value of a varchar with sha256",
+          "transformer": "cast(to_hex(sha256(to_utf8({col}))) as {type})"
+        },
+        {
+          "itemId": 5,
+          "name": "MASK_NULL",
+          "label": "Nullify",
+          "description": "Replace with NULL"
+        },
+        {
+          "itemId": 6,
+          "name": "MASK_NONE",
+          "label": "Unmasked (retain original value)",
+          "description": "No masking"
+        },
+        {
+          "itemId": 12,
+          "name": "MASK_DATE_SHOW_YEAR",
+          "label": "Date: show only year",
+          "description": "Date: show only year",
+          "transformer": "date_trunc('year', {col})"
+        },
+        {
+          "itemId": 13,
+          "name": "CUSTOM",
+          "label": "Custom",
+          "description": "Custom"
+        }
+      ]
+    },
+    "rowFilterDef": {
+      "accessTypes": [
+        {
+          "name": "select"
+        }
+      ],
+      "resources": [
+        {
+          "name": "catalog",
+          "matcherOptions": {
+            "wildCard": "false"
+          },
+          "lookupSupported": true,
+          "mandatory": true,
+          "uiHint": "{ \"singleValue\":true }"
+        },
+        {
+          "name": "schema",
+          "matcherOptions": {
+            "wildCard": "false"
+          },
+          "lookupSupported": true,
+          "mandatory": true,
+          "uiHint": "{ \"singleValue\":true }"
+        },
+        {
+          "name": "table",
+          "matcherOptions": {
+            "wildCard": "false"
+          },
+          "lookupSupported": true,
+          "mandatory": true,
+          "uiHint": "{ \"singleValue\":true }"
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/plugin-presto/src/test/resources/ranger-presto-security.xml b/plugin-presto/src/test/resources/ranger-presto-security.xml
new file mode 100644
index 0000000..8a1923d
--- /dev/null
+++ b/plugin-presto/src/test/resources/ranger-presto-security.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
+    <property>
+        <name>ranger.plugin.presto.service.name</name>
+        <value>cl1_presto</value>
+        <description>
+            Name of the Ranger service containing policies for this Presto instance
+        </description>
+    </property>
+
+    <property>
+        <name>ranger.plugin.presto.policy.source.impl</name>
+        <value>org.apache.ranger.authorization.presto.authorizer.RangerAdminClientImpl</value>
+        <description>
+            Policy source.
+        </description>
+    </property>
+
+    <property>
+        <name>ranger.plugin.presto.policy.pollIntervalMs</name>
+        <value>30000</value>
+        <description>
+            How often to poll for changes in policies?
+        </description>
+    </property>
+
+    <property>
+        <name>ranger.plugin.presto.policy.cache.dir</name>
+        <value>${project.build.directory}</value>
+        <description>
+            Directory where Ranger policies are cached after successful retrieval from the source
+        </description>
+    </property>
+
+</configuration>
diff --git a/pom.xml b/pom.xml
index 992c365..c2931e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -168,7 +168,7 @@
         <noggit.version>0.8</noggit.version>
         <owasp-java-html-sanitizer.version>r239</owasp-java-html-sanitizer.version>
         <paranamer.version>2.3</paranamer.version>
-        <presto.version>310</presto.version>
+        <presto.version>331</presto.version>
         <poi.version>3.17</poi.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <protobuf-java.version>2.5.0</protobuf-java.version>
@@ -197,11 +197,12 @@
         <jna.version>5.2.0</jna.version>
         <jna-platform.version>5.2.0</jna-platform.version>
         <!-- presto plugin deps -->
-        <presto.airlift.version>0.178</presto.airlift.version>
+        <presto.airlift.version>0.192</presto.airlift.version>
         <presto.bval-jsr.version>2.0.0</presto.bval-jsr.version>
         <presto.guice.version>4.2.2</presto.guice.version>
         <presto.guava.version>26.0-jre</presto.guava.version>
         <presto.validation-api.version>2.0.1.Final</presto.validation-api.version>
+        <presto.re2j.version>1.1</presto.re2j.version>
 
     <!-- Azure Key Vault dependencies -->
         <com.microsoft.azure.version>1.22.0</com.microsoft.azure.version>
diff --git a/ranger-presto-plugin-shim/pom.xml b/ranger-presto-plugin-shim/pom.xml
index d8ff88d..5c0f897 100644
--- a/ranger-presto-plugin-shim/pom.xml
+++ b/ranger-presto-plugin-shim/pom.xml
@@ -90,6 +90,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpcore</artifactId>
             <version>${httpcomponents.httpcore.version}</version>
@@ -125,5 +131,11 @@
             <version>${kstruct.gethostname4j.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf-java.version}</version>
+        </dependency>
+
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/ranger-presto-plugin-shim/src/main/java/org/apache/ranger/authorization/presto/authorizer/RangerConfig.java b/ranger-presto-plugin-shim/src/main/java/org/apache/ranger/authorization/presto/authorizer/RangerConfig.java
index 67b0d24..e0ab0f3 100644
--- a/ranger-presto-plugin-shim/src/main/java/org/apache/ranger/authorization/presto/authorizer/RangerConfig.java
+++ b/ranger-presto-plugin-shim/src/main/java/org/apache/ranger/authorization/presto/authorizer/RangerConfig.java
@@ -24,6 +24,8 @@ import io.airlift.configuration.ConfigDescription;
 public class RangerConfig {
   private String keytab;
   private String principal;
+  private boolean useUgi = false;
+  private String hadoopConfigPath;
 
   public String getKeytab() { return keytab; }
 
@@ -44,4 +46,24 @@ public class RangerConfig {
     this.principal = principal;
     return this;
   }
+
+  public boolean isUseUgi() { return useUgi; }
+
+  @Config("ranger.use_ugi")
+  @ConfigDescription("Use Hadoop User Group Information instead of Presto groups")
+  @SuppressWarnings("unused")
+  public RangerConfig setUseUgi(boolean useUgi) {
+    this.useUgi = useUgi;
+    return this;
+  }
+
+  @Config("ranger.hadoop_config")
+  @ConfigDescription("Path to hadoop configuration. Defaults to presto-ranger-site.xml in classpath")
+  @SuppressWarnings("unused")
+  public RangerConfig setHadoopConfigPath(String hadoopConfigPath) {
+    this.hadoopConfigPath = hadoopConfigPath;
+    return this;
+  }
+
+  public String getHadoopConfigPath() { return hadoopConfigPath; }
 }
diff --git a/ranger-presto-plugin-shim/src/main/java/org/apache/ranger/authorization/presto/authorizer/RangerSystemAccessControl.java b/ranger-presto-plugin-shim/src/main/java/org/apache/ranger/authorization/presto/authorizer/RangerSystemAccessControl.java
index e89f646..bfb3a59 100644
--- a/ranger-presto-plugin-shim/src/main/java/org/apache/ranger/authorization/presto/authorizer/RangerSystemAccessControl.java
+++ b/ranger-presto-plugin-shim/src/main/java/org/apache/ranger/authorization/presto/authorizer/RangerSystemAccessControl.java
@@ -15,15 +15,20 @@ package org.apache.ranger.authorization.presto.authorizer;
 
 import io.prestosql.spi.connector.CatalogSchemaName;
 import io.prestosql.spi.connector.CatalogSchemaTableName;
+import io.prestosql.spi.connector.ColumnMetadata;
 import io.prestosql.spi.connector.SchemaTableName;
-import io.prestosql.spi.security.AccessDeniedException;
-import io.prestosql.spi.security.Identity;
+import io.prestosql.spi.security.PrestoPrincipal;
+import io.prestosql.spi.security.Privilege;
 import io.prestosql.spi.security.SystemAccessControl;
+import io.prestosql.spi.security.SystemSecurityContext;
+import io.prestosql.spi.security.ViewExpression;
+import io.prestosql.spi.type.Type;
 import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
 
 import javax.inject.Inject;
 import java.security.Principal;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -51,6 +56,13 @@ public class RangerSystemAccessControl
         configMap.put("ranger.keytab", config.getKeytab());
         configMap.put("ranger.principal", config.getPrincipal());
       }
+
+      configMap.put("ranger.use_ugi", Boolean.toString(config.isUseUgi()));
+
+      if (config.getHadoopConfigPath() != null) {
+        configMap.put("ranger.hadoop_config", config.getHadoopConfigPath());
+      }
+
       systemAccessControlImpl = cls.getDeclaredConstructor(Map.class).newInstance(configMap);
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -60,311 +72,406 @@ public class RangerSystemAccessControl
   }
 
   @Override
-  public void checkCanSetUser(Optional<Principal> principal, String userName) {
+  public void checkCanSetSystemSessionProperty(SystemSecurityContext context, String propertyName) {
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanSetUser(principal, userName);
-    } catch (AccessDeniedException e) {
-      deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+      systemAccessControlImpl.checkCanSetSystemSessionProperty(context, propertyName);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denySetUser(principal, userName);
     }
   }
 
   @Override
-  public void checkCanSetSystemSessionProperty(Identity identity, String propertyName) {
+  public void checkCanAccessCatalog(SystemSecurityContext context, String catalogName) {
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanSetSystemSessionProperty(identity, propertyName);
-    } catch (AccessDeniedException e) {
-      deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+      systemAccessControlImpl.checkCanAccessCatalog(context, catalogName);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denySetSystemSessionProperty(propertyName);
     }
   }
 
   @Override
-  public void checkCanAccessCatalog(Identity identity, String catalogName) {
+  public Set<String> filterCatalogs(SystemSecurityContext context, Set<String> catalogs) {
+    Set<String> filteredCatalogs;
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanAccessCatalog(identity, catalogName);
-    } catch (AccessDeniedException e) {
-      deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+      filteredCatalogs = systemAccessControlImpl.filterCatalogs(context, catalogs);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyCatalogAccess(catalogName);
     }
+    return filteredCatalogs;
   }
 
   @Override
-  public Set<String> filterCatalogs(Identity identity, Set<String> catalogs) {
-    return catalogs;
+  public void checkCanCreateSchema(SystemSecurityContext context, CatalogSchemaName schema) {
+    try {
+      activatePluginClassLoader();
+      systemAccessControlImpl.checkCanCreateSchema(context, schema);
+    } finally {
+      deactivatePluginClassLoader();
+    }
   }
 
   @Override
-  public void checkCanCreateSchema(Identity identity, CatalogSchemaName schema) {
+  public void checkCanDropSchema(SystemSecurityContext context, CatalogSchemaName schema) {
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanCreateSchema(identity, schema);
-    } catch (AccessDeniedException e) {
-      deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+      systemAccessControlImpl.checkCanDropSchema(context, schema);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyCreateSchema(schema.getSchemaName());
     }
   }
 
   @Override
-  public void checkCanDropSchema(Identity identity, CatalogSchemaName schema) {
+  public void checkCanRenameSchema(SystemSecurityContext context, CatalogSchemaName schema, String newSchemaName) {
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanDropSchema(identity, schema);
-    } catch (AccessDeniedException e) {
-      deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+      systemAccessControlImpl.checkCanRenameSchema(context, schema, newSchemaName);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyDropSchema(schema.getSchemaName());
     }
   }
 
   @Override
-  public void checkCanRenameSchema(Identity identity, CatalogSchemaName schema, String newSchemaName) {
+  public void checkCanShowSchemas(SystemSecurityContext context, String catalogName) {
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanRenameSchema(identity, schema, newSchemaName);
-    } catch (AccessDeniedException e) {
-      deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+      systemAccessControlImpl.checkCanShowSchemas(context, catalogName);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyRenameSchema(schema.getSchemaName(), newSchemaName);
     }
   }
 
   @Override
-  public void checkCanShowSchemas(Identity identity, String catalogName) {
+  public Set<String> filterSchemas(SystemSecurityContext context, String catalogName, Set<String> schemaNames) {
+    Set<String> filteredSchemas;
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanShowSchemas(identity, catalogName);
-    } catch (AccessDeniedException e) {
+      filteredSchemas = systemAccessControlImpl.filterSchemas(context, catalogName, schemaNames);
+    } finally {
       deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+    }
+    return filteredSchemas;
+  }
+
+  @Override
+  public void checkCanCreateTable(SystemSecurityContext context, CatalogSchemaTableName table) {
+    try {
+      activatePluginClassLoader();
+      systemAccessControlImpl.checkCanCreateTable(context, table);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyShowSchemas();
     }
   }
 
   @Override
-  public Set<String> filterSchemas(Identity identity, String catalogName, Set<String> schemaNames) {
-    return schemaNames;
+  public void checkCanDropTable(SystemSecurityContext context, CatalogSchemaTableName table) {
+    try {
+      activatePluginClassLoader();
+      systemAccessControlImpl.checkCanDropTable(context, table);
+    } finally {
+      deactivatePluginClassLoader();
+    }
   }
 
   @Override
-  public void checkCanCreateTable(Identity identity, CatalogSchemaTableName table) {
+  public void checkCanRenameTable(SystemSecurityContext context, CatalogSchemaTableName table, CatalogSchemaTableName newTable) {
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanCreateTable(identity, table);
-    } catch (AccessDeniedException e) {
+      systemAccessControlImpl.checkCanRenameTable(context, table, newTable);
+    } finally {
       deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+    }
+  }
+
+  @Override
+  public Set<SchemaTableName> filterTables(SystemSecurityContext context, String catalogName, Set<SchemaTableName> tableNames) {
+    Set<SchemaTableName> filteredTableNames;
+    try {
+      activatePluginClassLoader();
+      filteredTableNames = systemAccessControlImpl.filterTables(context, catalogName, tableNames);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyCreateTable(table.getSchemaTableName().getTableName());
     }
+    return filteredTableNames;
   }
 
   @Override
-  public void checkCanDropTable(Identity identity, CatalogSchemaTableName table) {
+  public void checkCanAddColumn(SystemSecurityContext context, CatalogSchemaTableName table) {
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanDropTable(identity, table);
-    } catch (AccessDeniedException e) {
+      systemAccessControlImpl.checkCanAddColumn(context, table);
+    } finally {
       deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+    }
+  }
+
+  @Override
+  public void checkCanDropColumn(SystemSecurityContext context, CatalogSchemaTableName table) {
+    try {
+      activatePluginClassLoader();
+      systemAccessControlImpl.checkCanDropColumn(context, table);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyDropTable(table.getSchemaTableName().getTableName());
     }
   }
 
   @Override
-  public void checkCanRenameTable(Identity identity, CatalogSchemaTableName table, CatalogSchemaTableName newTable) {
+  public void checkCanRenameColumn(SystemSecurityContext context, CatalogSchemaTableName table) {
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanRenameTable(identity, table, newTable);
-    } catch (AccessDeniedException e) {
+      systemAccessControlImpl.checkCanRenameColumn(context, table);
+    } finally {
       deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+    }
+  }
+
+  @Override
+  public void checkCanSelectFromColumns(SystemSecurityContext context, CatalogSchemaTableName table, Set<String> columns) {
+    try {
+      activatePluginClassLoader();
+      systemAccessControlImpl.checkCanSelectFromColumns(context, table, columns);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyRenameTable(table.getSchemaTableName().getTableName(), newTable.getSchemaTableName().getTableName());
     }
   }
 
   @Override
-  public void checkCanShowTablesMetadata(Identity identity, CatalogSchemaName schema) {
+  public void checkCanInsertIntoTable(SystemSecurityContext context, CatalogSchemaTableName table) {
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanShowTablesMetadata(identity, schema);
-    } catch (AccessDeniedException e) {
+      systemAccessControlImpl.checkCanInsertIntoTable(context, table);
+    } finally {
       deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+    }
+  }
+
+  @Override
+  public void checkCanDeleteFromTable(SystemSecurityContext context, CatalogSchemaTableName table) {
+    try {
+      activatePluginClassLoader();
+      systemAccessControlImpl.checkCanDeleteFromTable(context, table);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyShowTablesMetadata(schema.getSchemaName());
     }
   }
 
   @Override
-  public Set<SchemaTableName> filterTables(Identity identity, String catalogName, Set<SchemaTableName> tableNames) {
-    return tableNames;
+  public void checkCanCreateView(SystemSecurityContext context, CatalogSchemaTableName view) {
+    try {
+      activatePluginClassLoader();
+      systemAccessControlImpl.checkCanCreateView(context, view);
+    } finally {
+      deactivatePluginClassLoader();
+    }
   }
 
   @Override
-  public void checkCanAddColumn(Identity identity, CatalogSchemaTableName table) {
+  public void checkCanDropView(SystemSecurityContext context, CatalogSchemaTableName view) {
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanAddColumn(identity, table);
-    } catch (AccessDeniedException e) {
+      systemAccessControlImpl.checkCanDropView(context, view);
+    } finally {
       deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+    }
+  }
+
+  @Override
+  public void checkCanCreateViewWithSelectFromColumns(SystemSecurityContext context, CatalogSchemaTableName table, Set<String> columns) {
+    try {
+      activatePluginClassLoader();
+      systemAccessControlImpl.checkCanCreateViewWithSelectFromColumns(context, table, columns);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyAddColumn(table.getSchemaTableName().getTableName());
     }
   }
 
   @Override
-  public void checkCanDropColumn(Identity identity, CatalogSchemaTableName table) {
+  public void checkCanSetCatalogSessionProperty(SystemSecurityContext context, String catalogName, String propertyName) {
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanDropColumn(identity, table);
-    } catch (AccessDeniedException e) {
+      systemAccessControlImpl.checkCanSetCatalogSessionProperty(context, catalogName, propertyName);
+    } finally {
       deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+    }
+  }
+
+  @Override
+  public void checkCanImpersonateUser(SystemSecurityContext context, String userName) {
+    try {
+      activatePluginClassLoader();
+      systemAccessControlImpl.checkCanImpersonateUser(context, userName);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyDropColumn(table.getSchemaTableName().getTableName());
     }
   }
 
   @Override
-  public void checkCanRenameColumn(Identity identity, CatalogSchemaTableName table) {
+  public void checkCanExecuteQuery(SystemSecurityContext context) {
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanRenameColumn(identity, table);
-    } catch (AccessDeniedException e) {
+      systemAccessControlImpl.checkCanExecuteQuery(context);
+    } finally {
       deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+    }
+  }
+
+  @Override
+  public void checkCanViewQueryOwnedBy(SystemSecurityContext context, String queryOwner) {
+    try {
+      activatePluginClassLoader();
+      systemAccessControlImpl.checkCanViewQueryOwnedBy(context, queryOwner);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyRenameColumn(table.getSchemaTableName().getTableName());
     }
   }
 
   @Override
-  public void checkCanSelectFromColumns(Identity identity, CatalogSchemaTableName table, Set<String> columns) {
+  public Set<String> filterViewQueryOwnedBy(SystemSecurityContext context, Set<String> queryOwners) {
+    Set<String> filteredQueryOwners;
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanSelectFromColumns(identity, table, columns);
-    } catch (AccessDeniedException e) {
+      filteredQueryOwners = systemAccessControlImpl.filterViewQueryOwnedBy(context, queryOwners);
+    } finally {
       deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+    }
+    return filteredQueryOwners;
+  }
+
+  @Override
+  public void checkCanKillQueryOwnedBy(SystemSecurityContext context, String queryOwner) {
+    try {
+      activatePluginClassLoader();
+      systemAccessControlImpl.checkCanKillQueryOwnedBy(context, queryOwner);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denySelectColumns(table.getSchemaTableName().getTableName(), columns);
     }
   }
 
   @Override
-  public void checkCanInsertIntoTable(Identity identity, CatalogSchemaTableName table) {
+  public void checkCanShowCreateTable(SystemSecurityContext context, CatalogSchemaTableName table) {
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanInsertIntoTable(identity, table);
-    } catch (AccessDeniedException e) {
+      systemAccessControlImpl.checkCanShowCreateTable(context, table);
+    } finally {
       deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+    }
+  }
+
+  @Override
+  public void checkCanSetTableComment(SystemSecurityContext context, CatalogSchemaTableName table) {
+    try {
+      activatePluginClassLoader();
+      systemAccessControlImpl.checkCanSetTableComment(context, table);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyInsertTable(table.getSchemaTableName().getTableName());
     }
   }
 
   @Override
-  public void checkCanDeleteFromTable(Identity identity, CatalogSchemaTableName table) {
+  public void checkCanShowTables(SystemSecurityContext context, CatalogSchemaName schema) {
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanDeleteFromTable(identity, table);
-    } catch (AccessDeniedException e) {
+      systemAccessControlImpl.checkCanShowTables(context, schema);
+    } finally {
       deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+    }
+  }
+
+  @Override
+  public void checkCanShowColumns(SystemSecurityContext context, CatalogSchemaTableName table) {
+    try {
+      activatePluginClassLoader();
+      systemAccessControlImpl.checkCanShowColumns(context, table);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyDeleteTable(table.getSchemaTableName().getTableName());
     }
   }
 
   @Override
-  public void checkCanCreateView(Identity identity, CatalogSchemaTableName view) {
+  public List<ColumnMetadata> filterColumns(SystemSecurityContext context, CatalogSchemaTableName table, List<ColumnMetadata> columns) {
+    List<ColumnMetadata> filteredColumns;
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanCreateView(identity, view);
-    } catch (AccessDeniedException e) {
+      filteredColumns = systemAccessControlImpl.filterColumns(context, table, columns);
+    } finally {
       deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+    }
+    return filteredColumns;
+  }
+
+  @Override
+  public void checkCanRenameView(SystemSecurityContext context, CatalogSchemaTableName view, CatalogSchemaTableName newView) {
+    try {
+      activatePluginClassLoader();
+      systemAccessControlImpl.checkCanRenameView(context, view, newView);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyCreateView(view.getSchemaTableName().getTableName());
     }
   }
 
   @Override
-  public void checkCanDropView(Identity identity, CatalogSchemaTableName view) {
+  public void checkCanGrantTablePrivilege(SystemSecurityContext context, Privilege privilege, CatalogSchemaTableName table, PrestoPrincipal grantee, boolean withGrantOption) {
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanDropView(identity, view);
-    } catch (AccessDeniedException e) {
+      systemAccessControlImpl.checkCanGrantTablePrivilege(context, privilege, table, grantee, withGrantOption);
+    } finally {
       deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+    }
+  }
+
+  @Override
+  public void checkCanRevokeTablePrivilege(SystemSecurityContext context, Privilege privilege, CatalogSchemaTableName table, PrestoPrincipal revokee, boolean grantOptionFor) {
+    try {
+      activatePluginClassLoader();
+      systemAccessControlImpl.checkCanRevokeTablePrivilege(context, privilege, table, revokee, grantOptionFor);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyDropView(view.getSchemaTableName().getTableName());
     }
   }
 
   @Override
-  public void checkCanCreateViewWithSelectFromColumns(Identity identity, CatalogSchemaTableName table, Set<String> columns) {
+  public void checkCanShowRoles(SystemSecurityContext context, String catalogName) {
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanCreateViewWithSelectFromColumns(identity, table, columns);
-    } catch (AccessDeniedException e) {
+      systemAccessControlImpl.checkCanShowRoles(context, catalogName);
+    } finally {
       deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+    }
+  }
+
+  @Override
+  public Optional<ViewExpression> getRowFilter(SystemSecurityContext context, CatalogSchemaTableName tableName) {
+    Optional<ViewExpression> viewExpression;
+    try {
+      activatePluginClassLoader();
+      viewExpression = systemAccessControlImpl.getRowFilter(context, tableName);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denyCreateViewWithSelect(table.getSchemaTableName().getTableName(), identity);
     }
+    return viewExpression;
   }
 
   @Override
-  public void checkCanSetCatalogSessionProperty(Identity identity, String catalogName, String propertyName) {
+  public Optional<ViewExpression> getColumnMask(SystemSecurityContext context, CatalogSchemaTableName tableName, String columnName, Type type) {
+    Optional<ViewExpression> viewExpression;
     try {
       activatePluginClassLoader();
-      systemAccessControlImpl.checkCanSetCatalogSessionProperty(identity, catalogName, propertyName);
-    } catch (AccessDeniedException e) {
+      viewExpression = systemAccessControlImpl.getColumnMask(context, tableName, columnName, type);
+    } finally {
       deactivatePluginClassLoader();
-      throw e;
-    } catch (Exception e) {
+    }
+    return viewExpression;
+  }
+
+  @Override
+  public void checkCanSetUser(Optional<Principal> principal, String userName) {
+    try {
+      activatePluginClassLoader();
+      systemAccessControlImpl.checkCanSetUser(principal, userName);
+    } finally {
       deactivatePluginClassLoader();
-      AccessDeniedException.denySetCatalogSessionProperty(catalogName, propertyName);
     }
   }