You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bh...@apache.org on 2019/09/12 04:31:57 UTC

[impala] branch master updated: IMPALA-8228: Ownership support for Ranger authz

This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new ced6e98  IMPALA-8228: Ownership support for Ranger authz
ced6e98 is described below

commit ced6e98fb4c361efa4bcc7e5441ccdb8debba8e9
Author: Bharath Vissapragada <bh...@cloudera.com>
AuthorDate: Mon Aug 19 19:54:01 2019 -0700

    IMPALA-8228: Ownership support for Ranger authz
    
    Without this patch, explicit privileges are needed even
    for owners of databases/tables to perform actions on them.
    
    Example: 'user' is the owner of database 'foo'. To create
    a table 't' under 'foo', 'user' needs to be granted a CREATE
    privilege on 'foo'
    
    That is unintuitive from a user POV since users expect owners
    to have ALL privileges on the objects they own. This patch extends
    that support to Impala's ranger authorization plugin.
    
    Ranger natively supports the concept of ownership by letting the
    callers pass the ownership context to RangerAccessResourceImpl.
    This patch plumbs the owner information for the authorizables
    (currently only supported for Tables / Databases) which is then
    evaulated during authorization.
    
    For the ownership based authorization to work, ranger-admin side
    policy on {OWNER} user needs to be defined.
    
    Testing: Added some unit-tests and e-e tests that cover scenarios
    where ownership is used for authorization.
    
    Caveat: Ownership is a part of HMS thrift object. Since we do not
    aggressively load HMS schemas during start-up, coordinators with
    cold caches can result in weird table listings due to lack of
    metadata needed for verifying ownership. This should be fixed
    separately to make the behavior more consistent and user friendly.
    (Added comments in the code wherever necessary along with a test
    to simulate this).
    
    Change-Id: I737b7164a3e7afb9996b3402e6872effd663f7b4
    Reviewed-on: http://gerrit.cloudera.org:8080/14106
    Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
    Tested-by: Bharath Vissapragada <bh...@cloudera.com>
---
 .../java/org/apache/impala/analysis/Analyzer.java  |  80 +++++++---
 .../apache/impala/analysis/CollectionTableRef.java |   4 +-
 .../apache/impala/analysis/CopyTestCaseStmt.java   |   4 +-
 .../apache/impala/analysis/DescribeTableStmt.java  |   6 +-
 .../org/apache/impala/analysis/DropDbStmt.java     |   8 +-
 .../impala/analysis/DropTableOrViewStmt.java       |  12 +-
 .../org/apache/impala/analysis/InsertStmt.java     |   4 +-
 .../apache/impala/analysis/ResetMetadataStmt.java  |  21 ++-
 .../org/apache/impala/analysis/SelectStmt.java     |   2 +-
 .../apache/impala/authorization/Authorizable.java  |  18 ++-
 .../impala/authorization/AuthorizableColumn.java   |  12 +-
 .../impala/authorization/AuthorizableDb.java       |  10 +-
 .../impala/authorization/AuthorizableFactory.java  |  25 +--
 .../impala/authorization/AuthorizableTable.java    |  10 +-
 .../authorization/DefaultAuthorizableFactory.java  |  22 +--
 .../authorization/PrivilegeRequestBuilder.java     |  60 ++++++--
 .../ranger/RangerAuthorizationChecker.java         |   5 +-
 .../ranger/RangerImpalaResourceBuilder.java        |   5 +
 .../sentry/SentryAuthorizableFactory.java          |  17 +-
 .../java/org/apache/impala/catalog/BuiltinsDb.java |   3 +
 fe/src/main/java/org/apache/impala/catalog/Db.java |   6 +
 .../main/java/org/apache/impala/catalog/FeDb.java  |   5 +
 .../java/org/apache/impala/catalog/FeTable.java    |   6 +
 .../main/java/org/apache/impala/catalog/Table.java |   6 +
 .../org/apache/impala/catalog/local/LocalDb.java   |   6 +
 .../apache/impala/catalog/local/LocalTable.java    |   6 +
 .../java/org/apache/impala/service/Frontend.java   |  35 +++--
 .../authorization/AuthorizationStmtTest.java       | 171 +++++++++++++++++++++
 .../authorization/AuthorizationTestBase.java       |   5 +-
 tests/authorization/test_ranger.py                 |  97 ++++++++++++
 30 files changed, 570 insertions(+), 101 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 4f23fa0..1ef542b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -719,7 +719,8 @@ public class Analyzer {
       // table/database if the user is not authorized.
       if (rawPath.size() > 1) {
         registerPrivReq(builder -> {
-          builder.onTable(rawPath.get(0), rawPath.get(1)).allOf(tableRef.getPrivilege());
+          builder.onTableUnknownOwner(
+              rawPath.get(0), rawPath.get(1)).allOf(tableRef.getPrivilege());
           if (tableRef.requireGrantOption()) {
             builder.grantOption();
           }
@@ -728,7 +729,8 @@ public class Analyzer {
       }
 
       registerPrivReq(builder -> {
-        builder.onTable(getDefaultDb(), rawPath.get(0)).allOf(tableRef.getPrivilege());
+        builder.onTableUnknownOwner(
+            getDefaultDb(), rawPath.get(0)).allOf(tableRef.getPrivilege());
         if (tableRef.requireGrantOption()) {
           builder.grantOption();
         }
@@ -1124,7 +1126,7 @@ public class Analyzer {
         registerPrivReq(builder -> builder
             .allOf(Privilege.SELECT)
             .onColumn(tupleDesc.getTableName().getDb(), tupleDesc.getTableName().getTbl(),
-                column.getName()).build());
+                column.getName(), tupleDesc.getTable().getOwnerUser()).build());
       }
     }
   }
@@ -2651,6 +2653,16 @@ public class Analyzer {
   }
 
   /**
+   * Returns the table by looking it up in the local Catalog. Returns null if the db/table
+   * does not exist. Does *not* force-load the table.
+   */
+  public FeTable getTableNoThrow(String dbName, String tableName) {
+    FeDb db = getCatalog().getDb(dbName);
+    if (db == null) return null;
+    return db.getTableIfCached(tableName);
+  }
+
+  /**
    * Checks if a table exists without registering privileges.
    */
   public boolean tableExists(TableName tblName) {
@@ -2686,20 +2698,25 @@ public class Analyzer {
     Preconditions.checkNotNull(tableName);
     Preconditions.checkNotNull(privilege);
     TableName fqTableName = getFqTableName(tableName);
+    // Get the ownership information if the table exists. We do not want it to throw
+    // without registering privileges.
+    FeTable table = getTableNoThrow(fqTableName.getDb(), fqTableName.getTbl());
+    final String tableOwner = table == null ? null : table.getOwnerUser();
     for (Privilege priv : privilege) {
       if (priv == Privilege.ANY || addColumnPrivilege) {
         registerPrivReq(builder ->
             builder.allOf(priv)
-                .onAnyColumn(fqTableName.getDb(), fqTableName.getTbl())
+                .onAnyColumn(fqTableName.getDb(), fqTableName.getTbl(), tableOwner)
                 .build());
       } else {
         registerPrivReq(builder ->
             builder.allOf(priv)
-                .onTable(fqTableName.getDb(), fqTableName.getTbl())
+                .onTable(fqTableName.getDb(), fqTableName.getTbl(), tableOwner)
                 .build());
       }
     }
-    FeTable table = getTable(fqTableName.getDb(), fqTableName.getTbl());
+    // Propagate the AnalysisException if the table/db does not exist.
+    table = getTable(fqTableName.getDb(), fqTableName.getTbl());
     Preconditions.checkNotNull(table);
     if (addAccessEvent) {
       // Add an audit event for this access
@@ -2773,19 +2790,26 @@ public class Analyzer {
    */
   public FeDb getDb(String dbName, Privilege privilege, boolean throwIfDoesNotExist,
       boolean requireGrantOption) throws AnalysisException {
+    // Do not throw until the privileges are registered.
+    FeDb db = getDb(dbName, /*throwIfDoesNotExist*/ false);
     registerPrivReq(builder -> {
       if (requireGrantOption) {
         builder.grantOption();
       }
-      return privilege == Privilege.ANY ?
-          builder.any().onAnyColumn(dbName).build() :
-          builder.allOf(privilege).onDb(dbName).build();
+      if (privilege == Privilege.ANY) {
+        String dbOwner = db == null ? null : db.getOwnerUser();
+        return builder.any().onAnyColumn(dbName, dbOwner).build();
+      } else if (db == null) {
+        // Db does not exist, register a privilege request based on the DB name.
+        return builder.allOf(privilege).onDb(dbName, null).build();
+      }
+      return builder.allOf(privilege).onDb(db).build();
     });
-
-    FeDb db = getDb(dbName, throwIfDoesNotExist);
+    // Propagate the exception if needed.
+    FeDb retDb = getDb(dbName, throwIfDoesNotExist);
     globalState_.accessEvents.add(new TAccessEvent(
         dbName, TCatalogObjectType.DATABASE, privilege.toString()));
-    return db;
+    return retDb;
   }
 
   /**
@@ -2810,16 +2834,35 @@ public class Analyzer {
    */
   public boolean dbContainsTable(String dbName, String tableName, Privilege privilege)
       throws AnalysisException {
-    registerPrivReq(builder ->
-        builder.allOf(privilege)
-            .onTable(dbName, tableName)
-            .build());
     try {
       FeDb db = getCatalog().getDb(dbName);
+      FeTable table = db == null ? null: db.getTable(tableName);
+      if (table != null) {
+        // Table exists, register the privilege and pass the right ownership information.
+        // Table owners are expected to have ALL privileges on the table object.
+        registerPrivReq(builder ->
+            builder.allOf(privilege)
+                .onTable(table)
+                .build());
+      } else if (privilege == Privilege.CREATE) {
+        // Table does not exist and hence the owner information cannot be deduced.
+        // For creating something under this db, we translate the db ownership into having
+        // CREATE privilege on tables under it.
+        String dbOwnerUser = db == null? null : db.getOwnerUser();
+        registerPrivReq(builder ->
+          builder.allOf(privilege)
+              .onTable(dbName, tableName, dbOwnerUser)
+              .build());
+      } else {
+        // All non-CREATE privileges are checked directly on the table object.
+        Preconditions.checkState(table == null && privilege != Privilege.CREATE);
+        registerPrivReq(builder ->
+          builder.allOf(privilege).onTableUnknownOwner(dbName, tableName).build());
+      }
       if (db == null) {
         throw new DatabaseNotFoundException("Database not found: " + dbName);
       }
-      return db.containsTable(tableName);
+      return table != null;
     } catch (DatabaseNotFoundException e) {
       throw new AnalysisException(DB_DOES_NOT_EXIST_ERROR_MSG + dbName);
     }
@@ -2954,9 +2997,8 @@ public class Analyzer {
           priv.toString()));
     }
     // Add privilege request.
-    TableName tableName = table.getTableName();
     registerPrivReq(builder -> {
-      builder.onTable(tableName.getDb(), tableName.getTbl()).allOf(priv);
+      builder.onTable(table).allOf(priv);
       if (requireGrantOption) {
         builder.grantOption();
       }
diff --git a/fe/src/main/java/org/apache/impala/analysis/CollectionTableRef.java b/fe/src/main/java/org/apache/impala/analysis/CollectionTableRef.java
index 9965643..47564a6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CollectionTableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CollectionTableRef.java
@@ -107,8 +107,8 @@ public class CollectionTableRef extends TableRef {
       analyzer.registerPrivReq(builder ->
           builder.allOf(Privilege.SELECT)
               .onColumn(desc_.getTableName().getDb(), desc_.getTableName().getTbl(),
-                  desc_.getPath().getRawPath().get(0))
-              .build());
+                  desc_.getPath().getRawPath().get(0),
+                  resolvedPath_.getRootTable().getOwnerUser()).build());
     }
     isAnalyzed_ = true;
     analyzeTableSample(analyzer);
diff --git a/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java b/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java
index d961f95..69adb81 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CopyTestCaseStmt.java
@@ -119,11 +119,11 @@ public class CopyTestCaseStmt extends StatementBase {
       Pair<Set<FeDb>, Set<FeTable>> referencedObjects = getReferencedCatalogObjects();
       for (FeDb db: referencedObjects.first) {
         analyzer.registerPrivReq(builder ->
-            builder.onDb(db.getName()).allOf(Privilege.VIEW_METADATA).build());
+            builder.onDb(db).allOf(Privilege.VIEW_METADATA).build());
       }
       for (FeTable table: referencedObjects.second) {
         analyzer.registerPrivReq(builder ->
-            builder.onTable(table.getDb().getName(), table.getName())
+            builder.onTable(table)
                 .allOf(Privilege.VIEW_METADATA)
                 .build());
       }
diff --git a/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java
index 69a7fdf..ec99f6f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java
@@ -99,12 +99,12 @@ public class DescribeTableStmt extends StatementBase {
       // table/database if the user is not authorized.
       if (rawPath_.size() > 1) {
         analyzer.registerPrivReq(builder ->
-            builder.onTable(rawPath_.get(0), rawPath_.get(1))
+            builder.onTableUnknownOwner(rawPath_.get(0), rawPath_.get(1))
                 .any()
                 .build());
       }
       analyzer.registerPrivReq(builder ->
-          builder.onTable(analyzer.getDefaultDb(), rawPath_.get(0))
+          builder.onTableUnknownOwner(analyzer.getDefaultDb(), rawPath_.get(0))
               .any()
               .build());
       throw ae;
@@ -131,7 +131,7 @@ public class DescribeTableStmt extends StatementBase {
     analyzer.registerPrivReq(builder ->
         builder.onColumn(path_.getRootTable().getDb().getName(),
             path_.getRootTable().getName(),
-            path_.getRawPath().get(0))
+            path_.getRawPath().get(0), path_.getRootTable().getOwnerUser())
             .any()
             .build());
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/DropDbStmt.java b/fe/src/main/java/org/apache/impala/analysis/DropDbStmt.java
index 386579f..7efd00c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DropDbStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DropDbStmt.java
@@ -71,16 +71,20 @@ public class DropDbStmt extends StatementBase {
     // Set the servername here if authorization is enabled because analyzer_ is not
     // available in the toThrift() method.
     serverName_ = analyzer.getServerName();
+    // Fetch the owner information if the db exists.
+    FeDb db = analyzer.getDb(dbName_, /*ThrowIfNotExists*/ false);
+    String ownerUser = db == null ? null : db.getOwnerUser();
     if (ifExists_) {
       // Start with ANY privilege in case of IF EXISTS, and register DROP privilege
       // later only if the database exists. See IMPALA-8851 for more explanation.
       analyzer.registerPrivReq(builder ->
           builder.allOf(Privilege.ANY)
-              .onDb(dbName_)
+              .onDb(dbName_, ownerUser)
               .build());
       if (!analyzer.dbExists(dbName_)) return;
     }
-    FeDb db = analyzer.getDb(dbName_, Privilege.DROP, false, false);
+    // Register the DROP privilege request.
+    db = analyzer.getDb(dbName_, Privilege.DROP, false, false);
     if (db == null && !ifExists_) {
       throw new AnalysisException(Analyzer.DB_DOES_NOT_EXIST_ERROR_MSG + dbName_);
     }
diff --git a/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java b/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
index cf80569..2d48bec 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
@@ -107,16 +107,20 @@ public class DropTableOrViewStmt extends StatementBase {
     // available in the toThrift() method.
     serverName_ = analyzer.getServerName();
     try {
+      // Fetch the table owner information, without registering any privileges.
+      FeTable table = analyzer.getTableNoThrow(dbName_, tableName_.getTbl());
+      String tblOwnerUser = table == null ? null : table.getOwnerUser();
       if (ifExists_) {
         // Start with ANY privilege in case of IF EXISTS, and register DROP privilege
         // later only if the table exists. See IMPALA-8851 for more explanation.
         analyzer.registerPrivReq(builder ->
             builder.allOf(Privilege.ANY)
-                .onTable(dbName_, getTbl())
-                .build());
-        if (!analyzer.tableExists(tableName_)) return;
+            .onTable(dbName_, getTbl(), tblOwnerUser)
+            .build());
+        if (table == null) return;
       }
-      FeTable table = analyzer.getTable(tableName_, /* add access event */ true,
+      // Register the DROP privilege on the table.
+      table = analyzer.getTable(tableName_, /* add access event */ true,
           /* add column-level privilege */ false, Privilege.DROP);
       Preconditions.checkNotNull(table);
       if (table instanceof FeView && dropTable_) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index a5cc510..13d83c2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -425,9 +425,7 @@ public class InsertStmt extends StatementBase {
     } else {
       targetTableName_ = new TableName(table_.getDb().getName(), table_.getName());
       analyzer.registerPrivReq(builder ->
-          builder.onTable(table_.getDb().getName(), table_.getName())
-              .allOf(privilegeRequired)
-              .build());
+          builder.onTable(table_).allOf(privilegeRequired).build());
     }
 
     // We do not support (in|up)serting into views.
diff --git a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
index 4200dd8..9a76174 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
@@ -22,6 +22,7 @@ import java.util.List;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.authorization.User;
+import org.apache.impala.catalog.FeDb;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.common.AnalysisException;
@@ -44,6 +45,7 @@ import com.google.common.base.Preconditions;
  * REFRESH AUTHORIZATION
  */
 public class ResetMetadataStmt extends StatementBase {
+
   public enum Action {
     INVALIDATE_METADATA_ALL(false),
     INVALIDATE_METADATA_TABLE(false),
@@ -125,7 +127,6 @@ public class ResetMetadataStmt extends StatementBase {
 
   @Override
   public void collectTableRefs(List<TableRef> tblRefs) {
-    // Only need table metadata for REFRESH <tbl> PARTITION (<partition>)
     if (tableName_ != null && partitionSpec_ != null) {
       tblRefs.add(new TableRef(tableName_.toPath(), null));
     }
@@ -168,11 +169,17 @@ public class ResetMetadataStmt extends StatementBase {
             partitionSpec_.analyze(analyzer);
           }
         } else {
+          FeTable tbl = analyzer.getTableNoThrow(dbName, tableName_.getTbl());
           // Verify the user has privileges to access this table.
-          analyzer.registerPrivReq(builder ->
-              builder.onTable(dbName, tableName_.getTbl())
-                  .allOf(Privilege.REFRESH)
-                  .build());
+          if (tbl == null) {
+            analyzer.registerPrivReq(builder ->
+                builder.onTableUnknownOwner(
+                  dbName, tableName_.getTbl()).allOf(Privilege.REFRESH).build());
+          } else {
+            analyzer.registerPrivReq(
+                builder -> builder.onTable(dbName, tableName_.getTbl(),
+                  tbl.getOwnerUser()).allOf(Privilege.REFRESH).build());
+          }
         }
         break;
       case REFRESH_AUTHORIZATION:
@@ -186,8 +193,10 @@ public class ResetMetadataStmt extends StatementBase {
                 .build());
         break;
       case REFRESH_FUNCTIONS:
+        FeDb db = analyzer.getDb(database_, /*throwIfDoesNotExist*/ false);
+        String dbOwner = db == null ? null : db.getOwnerUser();
         analyzer.registerPrivReq(builder ->
-            builder.onDb(database_)
+            builder.onDb(database_, dbOwner)
                 .allOf(Privilege.REFRESH)
                 .build());
         break;
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index 580929e..26a9422 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -580,7 +580,7 @@ public class SelectStmt extends QueryStmt {
             analyzer_.registerPrivReq(builder -> builder
                 .allOf(Privilege.SELECT)
                 .onColumn(view.getDb().getName(), view.getName(),
-                    slotRef.getDesc().getLabel())
+                    slotRef.getDesc().getLabel(), view.getOwnerUser())
                 .build());
           }
         }
diff --git a/fe/src/main/java/org/apache/impala/authorization/Authorizable.java b/fe/src/main/java/org/apache/impala/authorization/Authorizable.java
index 1e35ef6..75f437d 100644
--- a/fe/src/main/java/org/apache/impala/authorization/Authorizable.java
+++ b/fe/src/main/java/org/apache/impala/authorization/Authorizable.java
@@ -17,8 +17,10 @@
 
 package org.apache.impala.authorization;
 
+import java.util.Objects;
 import java.util.List;
 
+import com.google.common.base.Strings;
 /*
  * Abstract class representing an authorizable object (Table, Db, Column, etc).
  */
@@ -53,13 +55,25 @@ public abstract class Authorizable {
   // Returns the function name if applicable, null otherwise.
   public String getFnName() { return null; }
 
+  // Returns the owner for this authorizable if applicable, null otherwise.
+  // Currently, ownership is applicable only for database and table objects.
+  // Only used by RangerAuthorizationChecker.
+  public String getOwnerUser() { return null; }
+
   @Override
-  public int hashCode() { return getName().hashCode(); }
+  public int hashCode() {
+    int ownerHash = getOwnerUser() == null ? 0 : getOwnerUser().hashCode();
+    int nameHash = getName().hashCode();
+    return nameHash * 31 + ownerHash;
+  }
 
   @Override
   public boolean equals(Object o) {
     if (o == null) return false;
     if (o.getClass() != this.getClass()) return false;
-    return ((Authorizable) o).getName().equals(this.getName());
+    Authorizable temp = (Authorizable) o;
+    return temp.getName().equals(this.getName())
+        && Strings.nullToEmpty(temp.getOwnerUser()).equals(
+            Strings.nullToEmpty(this.getOwnerUser()));
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/AuthorizableColumn.java b/fe/src/main/java/org/apache/impala/authorization/AuthorizableColumn.java
index 2f7abd2..e0a2bdc 100644
--- a/fe/src/main/java/org/apache/impala/authorization/AuthorizableColumn.java
+++ b/fe/src/main/java/org/apache/impala/authorization/AuthorizableColumn.java
@@ -17,6 +17,8 @@
 
 package org.apache.impala.authorization;
 
+import javax.annotation.Nullable;
+
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
@@ -27,14 +29,19 @@ public class AuthorizableColumn extends Authorizable {
   private final String dbName_;
   private final String tableName_;
   private final String columnName_;
+  // Owner for the parent db or table that this Authorizable corresponds to.
+  @Nullable
+  private final String ownerUser_;
 
-  public AuthorizableColumn(String dbName, String tableName, String columnName) {
+  public AuthorizableColumn(
+      String dbName, String tableName, String columnName, @Nullable String ownerUser) {
     Preconditions.checkArgument(!Strings.isNullOrEmpty(dbName));
     Preconditions.checkArgument(!Strings.isNullOrEmpty(tableName));
     Preconditions.checkArgument(!Strings.isNullOrEmpty(columnName));
     dbName_ = dbName;
     tableName_ = tableName;
     columnName_ = columnName;
+    ownerUser_ = ownerUser;
   }
 
   @Override
@@ -54,4 +61,7 @@ public class AuthorizableColumn extends Authorizable {
 
   @Override
   public String getColumnName() { return columnName_; }
+
+  @Override
+  public String getOwnerUser() { return ownerUser_; }
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/AuthorizableDb.java b/fe/src/main/java/org/apache/impala/authorization/AuthorizableDb.java
index 25de969..ca464e9 100644
--- a/fe/src/main/java/org/apache/impala/authorization/AuthorizableDb.java
+++ b/fe/src/main/java/org/apache/impala/authorization/AuthorizableDb.java
@@ -17,6 +17,8 @@
 
 package org.apache.impala.authorization;
 
+import javax.annotation.Nullable;
+
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
@@ -25,10 +27,13 @@ import com.google.common.base.Strings;
  */
 public class AuthorizableDb extends Authorizable {
   private final String dbName_;
+  @Nullable // Owner can be null if not set.
+  private final String ownerUser_;
 
-  public AuthorizableDb(String dbName) {
+  public AuthorizableDb(String dbName, String ownerUser) {
     Preconditions.checkArgument(!Strings.isNullOrEmpty(dbName));
     dbName_ = dbName;
+    ownerUser_ = ownerUser;
   }
 
   @Override
@@ -39,4 +44,7 @@ public class AuthorizableDb extends Authorizable {
 
   @Override
   public Type getType() { return Authorizable.Type.DB; }
+
+  @Override
+  public String getOwnerUser() { return ownerUser_; }
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/AuthorizableFactory.java b/fe/src/main/java/org/apache/impala/authorization/AuthorizableFactory.java
index 906cbb4..89eab19 100644
--- a/fe/src/main/java/org/apache/impala/authorization/AuthorizableFactory.java
+++ b/fe/src/main/java/org/apache/impala/authorization/AuthorizableFactory.java
@@ -17,6 +17,8 @@
 
 package org.apache.impala.authorization;
 
+import javax.annotation.Nullable;
+
 /**
  * An interface to create a factory class for creating instances of
  * {@link Authorizable}s.
@@ -29,33 +31,36 @@ public interface AuthorizableFactory {
   Authorizable newServer(String serverName);
 
   /**
-   * Creates a new instance of database {@link Authorizable} for a given database name.
+   * Creates a new instance of database {@link Authorizable} for a given database name
+   * owned by a given user.
    */
-  Authorizable newDatabase(String dbName);
+  Authorizable newDatabase(String dbName, @Nullable String ownerUser);
 
   /**
    * Creates a new instance of table {@link Authorizable} for given database and table
-   * names.
+   * names owned by a given user.
    */
-  Authorizable newTable(String dbName, String tableName);
+  Authorizable newTable(String dbName, String tableName, @Nullable String ownerUser);
 
   /**
    * Creates a new instance of column {@link Authorizable} for a given database name and
-   * gives access to all tables and columns.
+   * it's owner and gives access to all tables and columns.
    */
-  Authorizable newColumn(String dbName);
+  Authorizable newColumnAllTbls(String dbName, @Nullable String dbOwnerUser);
 
   /**
    * Creates a new instance of column {@link Authorizable} for given database and table
-   * names and gives access to all columns.
+   * name and it's owner and gives access to all columns.
    */
-  Authorizable newColumn(String dbName, String tableName);
+  Authorizable newColumnInTable(
+      String dbName, String tableName, @Nullable String tblOwnerUser);
 
   /**
    * Creates a new instance of column {@link Authorizable} for given database, table, and
-   * column names.
+   * column names. 'tblOwnerUser' is the owner of the table that has this column.
    */
-  Authorizable newColumn(String dbName, String tableName, String columnName);
+  Authorizable newColumnInTable(
+      String dbName, String tableName, String columnName, @Nullable String tblOwnerUser);
 
   /**
    * Creates a new instance of URI {@link Authorizable} for a given URI.
diff --git a/fe/src/main/java/org/apache/impala/authorization/AuthorizableTable.java b/fe/src/main/java/org/apache/impala/authorization/AuthorizableTable.java
index d837013..3cfc8ad 100644
--- a/fe/src/main/java/org/apache/impala/authorization/AuthorizableTable.java
+++ b/fe/src/main/java/org/apache/impala/authorization/AuthorizableTable.java
@@ -17,6 +17,8 @@
 
 package org.apache.impala.authorization;
 
+import javax.annotation.Nullable;
+
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
@@ -26,12 +28,15 @@ import com.google.common.base.Strings;
 public class AuthorizableTable extends Authorizable {
   private final String dbName_;
   private final String tableName_;
+  @Nullable // Is null if the owner is not set.
+  private final String ownerUser_;
 
-  public AuthorizableTable(String dbName, String tableName) {
+  public AuthorizableTable(String dbName, String tableName, @Nullable String ownerUser) {
     Preconditions.checkArgument(!Strings.isNullOrEmpty(dbName));
     Preconditions.checkArgument(!Strings.isNullOrEmpty(tableName));
     dbName_ = dbName;
     tableName_ = tableName;
+    ownerUser_ = ownerUser;
   }
 
   @Override
@@ -48,4 +53,7 @@ public class AuthorizableTable extends Authorizable {
 
   @Override
   public String getFullTableName() { return getName(); }
+
+  @Override
+  public String getOwnerUser() { return ownerUser_; }
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/DefaultAuthorizableFactory.java b/fe/src/main/java/org/apache/impala/authorization/DefaultAuthorizableFactory.java
index b5855e3..94afacc 100644
--- a/fe/src/main/java/org/apache/impala/authorization/DefaultAuthorizableFactory.java
+++ b/fe/src/main/java/org/apache/impala/authorization/DefaultAuthorizableFactory.java
@@ -31,37 +31,39 @@ public class DefaultAuthorizableFactory implements AuthorizableFactory {
   }
 
   @Override
-  public Authorizable newDatabase(String dbName) {
+  public Authorizable newDatabase(String dbName, String ownerUser) {
     Preconditions.checkNotNull(dbName);
-    return new AuthorizableDb(dbName);
+    return new AuthorizableDb(dbName, ownerUser);
   }
 
   @Override
-  public Authorizable newTable(String dbName, String tableName) {
+  public Authorizable newTable(String dbName, String tableName, String ownerUser) {
     Preconditions.checkNotNull(dbName);
     Preconditions.checkNotNull(tableName);
-    return new AuthorizableTable(dbName, tableName);
+    return new AuthorizableTable(dbName, tableName, ownerUser);
   }
 
   @Override
-  public Authorizable newColumn(String dbName) {
+  public Authorizable newColumnAllTbls(String dbName, String dbOwnerUser) {
     Preconditions.checkNotNull(dbName);
-    return new AuthorizableColumn(dbName, ALL, ALL);
+    return new AuthorizableColumn(dbName, ALL, ALL, dbOwnerUser);
   }
 
   @Override
-  public Authorizable newColumn(String dbName, String tableName) {
+  public Authorizable newColumnInTable(
+      String dbName, String tableName, String tblOwnerUser) {
     Preconditions.checkNotNull(dbName);
     Preconditions.checkNotNull(tableName);
-    return new AuthorizableColumn(dbName, tableName, ALL);
+    return new AuthorizableColumn(dbName, tableName, ALL, tblOwnerUser);
   }
 
   @Override
-  public Authorizable newColumn(String dbName, String tableName, String columnName) {
+  public Authorizable newColumnInTable(
+      String dbName, String tableName, String columnName, String tblOwnerUser) {
     Preconditions.checkNotNull(dbName);
     Preconditions.checkNotNull(tableName);
     Preconditions.checkNotNull(columnName);
-    return new AuthorizableColumn(dbName, tableName, columnName);
+    return new AuthorizableColumn(dbName, tableName, columnName, tblOwnerUser);
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/authorization/PrivilegeRequestBuilder.java b/fe/src/main/java/org/apache/impala/authorization/PrivilegeRequestBuilder.java
index 4cc39cd..f87518d 100644
--- a/fe/src/main/java/org/apache/impala/authorization/PrivilegeRequestBuilder.java
+++ b/fe/src/main/java/org/apache/impala/authorization/PrivilegeRequestBuilder.java
@@ -18,6 +18,8 @@
 package org.apache.impala.authorization;
 
 import com.google.common.base.Preconditions;
+import org.apache.impala.catalog.FeDb;
+import org.apache.impala.catalog.FeTable;
 
 /**
  * Class that helps build PrivilegeRequest objects.
@@ -57,24 +59,33 @@ public class PrivilegeRequestBuilder {
   }
 
   /**
-   * Sets the authorizable object to be a column.
+   * Sets the authorizable object to be a table.
    */
-  public PrivilegeRequestBuilder onColumn(String dbName, String tableName,
-      String columnName) {
-    Preconditions.checkState(authorizable_ == null);
-    authorizable_ = authzFactory_.newColumn(dbName, tableName, columnName);
-    return this;
+  public PrivilegeRequestBuilder onTable(FeTable table) {
+    Preconditions.checkNotNull(table);
+    String dbName = Preconditions.checkNotNull(table.getTableName().getDb());
+    String tblName = Preconditions.checkNotNull(table.getTableName().getTbl());
+    return onTable(dbName, tblName, table.getOwnerUser());
   }
 
   /**
    * Sets the authorizable object to be a table.
    */
-  public PrivilegeRequestBuilder onTable(String dbName, String tableName) {
+  public PrivilegeRequestBuilder onTable(
+      String dbName, String tableName, String ownerUser) {
     Preconditions.checkState(authorizable_ == null);
-    authorizable_ = authzFactory_.newTable(dbName, tableName);
+    authorizable_ = authzFactory_.newTable(dbName, tableName, ownerUser);
     return this;
   }
 
+  public PrivilegeRequestBuilder onTableUnknownOwner(String dbName, String tableName) {
+    // Useful when owner cannot be determined because the table does not exist.
+    // This call path is specifically meant for cases that try to mask the
+    // TableNotFound AnalysisExceptions and instead propagate that as an
+    // AuthorizationException.
+    return onTable(dbName, tableName, null);
+  }
+
   /**
    * Sets the authorizable object to be a server.
    */
@@ -87,27 +98,48 @@ public class PrivilegeRequestBuilder {
   /**
    * Sets the authorizable object to be a database.
    */
-  public PrivilegeRequestBuilder onDb(String dbName) {
+  public PrivilegeRequestBuilder onDb(FeDb db) {
+    Preconditions.checkState(authorizable_ == null);
+    Preconditions.checkNotNull(db);
+    return onDb(db.getName(), db.getMetaStoreDb().getOwnerName());
+  }
+
+  /**
+   * Sets the authorizable object to be a database.
+   */
+  public PrivilegeRequestBuilder onDb(String dbName, String ownerUser) {
+    Preconditions.checkState(authorizable_ == null);
+    authorizable_ = authzFactory_.newDatabase(dbName, ownerUser);
+    return this;
+  }
+
+  /**
+   * Sets the authorizable object to be a column.
+   */
+  public PrivilegeRequestBuilder onColumn(String dbName, String tableName,
+      String columnName, String tblOwnerUser) {
     Preconditions.checkState(authorizable_ == null);
-    authorizable_ = authzFactory_.newDatabase(dbName);
+    authorizable_ =
+        authzFactory_.newColumnInTable(dbName, tableName, columnName, tblOwnerUser);
     return this;
   }
 
   /**
    * Specifies that permissions on any column in the given table.
    */
-  public PrivilegeRequestBuilder onAnyColumn(String dbName, String tableName) {
+  public PrivilegeRequestBuilder onAnyColumn(
+      String dbName, String tableName, String tblOwnerUser) {
     Preconditions.checkState(authorizable_ == null);
-    authorizable_ = authzFactory_.newColumn(dbName, tableName);
+    authorizable_ = authzFactory_.newColumnInTable(dbName, tableName, tblOwnerUser);
     return this;
   }
 
   /**
    * Specifies that permissions on any column in any table.
    */
-  public PrivilegeRequestBuilder onAnyColumn(String dbName) {
+  public PrivilegeRequestBuilder onAnyColumn(String dbName, String dbOwnerUser) {
     Preconditions.checkState(authorizable_ == null);
-    authorizable_ = authzFactory_.newColumn(dbName);
+    authorizable_ = authzFactory_.newColumnAllTbls(dbName, dbOwnerUser);
     return this;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
index 9a4e67d..5cffa02 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
@@ -100,18 +100,20 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
       case DB:
         resources.add(new RangerImpalaResourceBuilder()
             .database(authorizable.getDbName())
+            .owner(authorizable.getOwnerUser())
             .build());
         break;
       case TABLE:
         resources.add(new RangerImpalaResourceBuilder()
             .database(authorizable.getDbName())
             .table(authorizable.getTableName())
+            .owner(authorizable.getOwnerUser())
             .build());
         break;
       case COLUMN:
         RangerImpalaResourceBuilder builder = new RangerImpalaResourceBuilder();
         builder.database(authorizable.getDbName());
-        // * in Ranger means "all". For example to check access for all columns, we need
+        // * in Ranger means "all". For example, to check access for all columns, we need
         // to create a request, such as:
         // [server=server1, database=foo, table=bar, column=*]
         //
@@ -131,6 +133,7 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
           !DefaultAuthorizableFactory.ALL.equals(authorizable.getColumnName())) {
           builder.column(authorizable.getColumnName());
         }
+        builder.owner(authorizable.getOwnerUser());
         resources.add(builder.build());
         break;
       case FUNCTION:
diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerImpalaResourceBuilder.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerImpalaResourceBuilder.java
index e9dbdf5..97170dc 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerImpalaResourceBuilder.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerImpalaResourceBuilder.java
@@ -58,5 +58,10 @@ public class RangerImpalaResourceBuilder {
     return this;
   }
 
+  public RangerImpalaResourceBuilder owner(String ownerUser) {
+    rangerAccessResource.setOwnerUser(ownerUser);
+    return this;
+  }
+
   public RangerAccessResourceImpl build() { return rangerAccessResource; }
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizableFactory.java b/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizableFactory.java
index 032236c..d3d46ea 100644
--- a/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizableFactory.java
+++ b/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizableFactory.java
@@ -31,33 +31,40 @@ public class SentryAuthorizableFactory implements AuthorizableFactory {
   }
 
   @Override
-  public Authorizable newDatabase(String dbName) {
+  public Authorizable newDatabase(String dbName, String ownerUser) {
+    // Sentry works with OWNER privilege. Hence ownerUser is ignored.
     Preconditions.checkNotNull(dbName);
     return new SentryAuthorizableDb(dbName);
   }
 
   @Override
-  public Authorizable newTable(String dbName, String tableName) {
+  public Authorizable newTable(String dbName, String tableName, String ownerUser) {
+    // Sentry works with OWNER privilege. Hence ownerUser is ignored.
     Preconditions.checkNotNull(dbName);
     Preconditions.checkNotNull(tableName);
     return new SentryAuthorizableTable(dbName, tableName);
   }
 
   @Override
-  public Authorizable newColumn(String dbName) {
+  public Authorizable newColumnAllTbls(String dbName, String dbOwnerUser) {
+    // Sentry works with OWNER privilege. Hence ownerUser is ignored.
     Preconditions.checkNotNull(dbName);
     return new SentryAuthorizableColumn(dbName);
   }
 
   @Override
-  public Authorizable newColumn(String dbName, String tableName) {
+  public Authorizable newColumnInTable(
+      String dbName, String tableName, String dbOwnerUser) {
+    // Sentry works with OWNER privilege. Hence ownerUser is ignored.
     Preconditions.checkNotNull(dbName);
     Preconditions.checkNotNull(tableName);
     return new SentryAuthorizableColumn(dbName, tableName);
   }
 
   @Override
-  public Authorizable newColumn(String dbName, String tableName, String columnName) {
+  public Authorizable newColumnInTable(
+      String dbName, String tableName, String columnName, String tblOwnerUser) {
+    // Sentry works with OWNER privilege. Hence ownerUser is ignored.
     Preconditions.checkNotNull(dbName);
     Preconditions.checkNotNull(tableName);
     Preconditions.checkNotNull(columnName);
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index 414662d..08589f3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -99,6 +99,9 @@ public class BuiltinsDb extends Db {
         BUILTINS_DB_COMMENT, "", Collections.<String,String>emptyMap());
   }
 
+  @Override // FeDb
+  public String getOwnerUser() { return null; }
+
   private static final Map<Type, String> SAMPLE_INIT_SYMBOL =
       ImmutableMap.<Type, String>builder()
         .put(Type.BOOLEAN,
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 6eeca2a..455e0f0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -537,4 +537,10 @@ public class Db extends CatalogObjectImpl implements FeDb {
     versionsForInflightEvents_.add(versionNumber);
     return true;
   }
+
+  @Override // FeDb
+  public String getOwnerUser() {
+    org.apache.hadoop.hive.metastore.api.Database db = getMetaStoreDb();
+    return db == null ? null : db.getOwnerName();
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeDb.java b/fe/src/main/java/org/apache/impala/catalog/FeDb.java
index f80dc47..f927a46 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeDb.java
@@ -128,4 +128,9 @@ public interface FeDb extends HasName {
    * Create a target FS table object for CTAS.
    */
   FeFsTable createFsCtasTarget(Table msTbl) throws CatalogException;
+
+  /**
+   * @return the owner user for this database. Returns null if one does not exist.
+   */
+  String getOwnerUser();
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeTable.java b/fe/src/main/java/org/apache/impala/catalog/FeTable.java
index ca56ca7..8ac4186 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeTable.java
@@ -146,4 +146,10 @@ public interface FeTable {
    */
   String getValidWriteIds();
 
+  /**
+   * @return the owner user for this table. If the table is not loaded or the owner is
+   * missing returns null.
+   */
+  String getOwnerUser();
+
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 24a6b95..7683381 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -587,6 +587,12 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
     return msTable_;
   }
 
+  @Override // FeTable
+  public String getOwnerUser() {
+    if (msTable_ == null) return null;
+    return msTable_.getOwner();
+  }
+
   public void setMetaStoreTable(org.apache.hadoop.hive.metastore.api.Table msTbl) {
     msTable_ = msTbl;
     CatalogInterners.internFieldsInPlace(msTable_);
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
index f14e255..4925f56 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
@@ -287,4 +287,10 @@ class LocalDb implements FeDb {
   LocalCatalog getCatalog() {
     return catalog_;
   }
+
+  @Override // FeDb
+  public String getOwnerUser() {
+    Database db = getMetaStoreDb();
+    return db == null? null : db.getOwnerName();
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
index 907ab03..cecd46c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
@@ -170,6 +170,12 @@ abstract class LocalTable implements FeTable {
   }
 
   @Override
+  public String getOwnerUser() {
+    if (msTable_ == null) return null;
+    return msTable_.getOwner();
+  }
+
+  @Override
   public String getStorageHandlerClassName() {
     // Subclasses should override as appropriate.
     return null;
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index dbd51e7..29104f5 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -770,14 +770,28 @@ public class Frontend {
 
   private List<String> doGetTableNames(String dbName, PatternMatcher matcher,
       User user) throws ImpalaException {
-    List<String> tblNames = getCatalog().getTableNames(dbName, matcher);
+    FeCatalog catalog = getCatalog();
+    List<String> tblNames = catalog.getTableNames(dbName, matcher);
     if (authzFactory_.getAuthorizationConfig().isEnabled()) {
       Iterator<String> iter = tblNames.iterator();
       while (iter.hasNext()) {
         String tblName = iter.next();
+        // Get the owner information. Do not force load the table, only get it
+        // from cache, if it is already loaded. This means that we cannot access
+        // ownership information for unloaded tables and they will not be listed
+        // here. This might result in situations like 'show tables' not listing
+        // 'owned' tables for a given user just because the metadata is not loaded.
+        // TODO(IMPALA-8937): Figure out a way to load Table/Database ownership
+        // information when fetching the table lists from HMS.
+        FeTable table = catalog.getTableIfCached(dbName, tblName);
+        String tableOwner = table.getOwnerUser();
+        if (tableOwner == null) {
+          LOG.info("Table {} not yet loaded, ignoring it in table listing.",
+              dbName + "." + tblName);
+        }
         PrivilegeRequest privilegeRequest = new PrivilegeRequestBuilder(
             authzFactory_.getAuthorizableFactory())
-            .any().onAnyColumn(dbName, tblName).build();
+            .any().onAnyColumn(dbName, tblName, tableOwner).build();
         if (!authzChecker_.get().hasAccess(user, privilegeRequest)) {
           iter.remove();
         }
@@ -802,7 +816,7 @@ public class Frontend {
         PrivilegeRequest privilegeRequest = new PrivilegeRequestBuilder(
             authzFactory_.getAuthorizableFactory())
             .any().onColumn(table.getTableName().getDb(), table.getTableName().getTbl(),
-                colName).build();
+                colName, table.getOwnerUser()).build();
         if (!authzChecker_.get().hasAccess(user, privilegeRequest)) continue;
       }
       columns.add(column);
@@ -839,7 +853,8 @@ public class Frontend {
       return true;
     }
     PrivilegeRequest request = new PrivilegeRequestBuilder(
-        authzFactory_.getAuthorizableFactory()).any().onAnyColumn(db.getName()).build();
+        authzFactory_.getAuthorizableFactory()).any().onAnyColumn(
+            db.getName(), db.getOwnerUser()).build();
     return authzChecker_.get().hasAccess(user, request);
   }
 
@@ -1011,9 +1026,7 @@ public class Frontend {
       // First run a table check
       PrivilegeRequest privilegeRequest = new PrivilegeRequestBuilder(
           authzFactory_.getAuthorizableFactory())
-          .allOf(Privilege.VIEW_METADATA).onTable(table.getDb().getName(),
-              table.getName())
-          .build();
+          .allOf(Privilege.VIEW_METADATA).onTable(table).build();
       if (!authzChecker_.get().hasAccess(user, privilegeRequest)) {
         // Filter out columns that the user is not authorized to see.
         filteredColumns = new ArrayList<Column>();
@@ -1022,8 +1035,8 @@ public class Frontend {
           privilegeRequest = new PrivilegeRequestBuilder(
               authzFactory_.getAuthorizableFactory())
               .allOf(Privilege.VIEW_METADATA)
-              .onColumn(table.getDb().getName(), table.getName(), colName)
-              .build();
+              .onColumn(table.getDb().getName(),
+                  table.getName(), colName, table.getOwnerUser()).build();
           if (authzChecker_.get().hasAccess(user, privilegeRequest)) {
             filteredColumns.add(col);
           }
@@ -1051,9 +1064,7 @@ public class Frontend {
       if (authzFactory_.getAuthorizationConfig().isEnabled()) {
         PrivilegeRequest privilegeRequest = new PrivilegeRequestBuilder(
             authzFactory_.getAuthorizableFactory())
-            .allOf(Privilege.VIEW_METADATA)
-            .onTable(table.getDb().getName(),table.getName())
-            .build();
+            .allOf(Privilege.VIEW_METADATA).onTable(table).build();
         // Only filter if the user doesn't have table access.
         if (!authzChecker_.get().hasAccess(user, privilegeRequest)) {
           List<TResultRow> results = new ArrayList<>();
diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
index f839138..f4980d0 100644
--- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.authorization;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.impala.analysis.AnalysisContext;
@@ -26,6 +27,7 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.TDescribeOutputStyle;
 import org.apache.impala.thrift.TPrivilegeLevel;
 import org.apache.impala.thrift.TQueryOptions;
@@ -38,12 +40,16 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
@@ -54,6 +60,7 @@ import static org.junit.Assert.assertTrue;
  */
 @RunWith(Parameterized.class)
 public class AuthorizationStmtTest extends AuthorizationTestBase {
+  public static final Logger LOG = LoggerFactory.getLogger(AuthorizationStmtTest.class);
   public AuthorizationStmtTest(AuthorizationProvider authzProvider)
       throws ImpalaException {
     super(authzProvider);
@@ -3048,6 +3055,170 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
     }
   }
 
+  /**
+   * Validates Ranger's object ownership privileges.
+   */
+  @Test
+  public void testRangerObjectOwnership() throws Exception {
+    if (authzProvider_ == AuthorizationProvider.SENTRY) return;
+    // Out of the box there are no privileges for the owner on functional db.
+    // So the following set of queries should fail with authz failures.
+    // Maps from a query to the corresponding authz error.
+    ImmutableMap<AuthzTest, String> testQueries = ImmutableMap
+        .<AuthzTest, String>builder()
+        .put(authorize("select count(*) from functional.alltypes"),
+            selectError("functional.alltypes"))
+        .put(authorize("select id from functional.alltypes"),
+            selectError("functional.alltypes"))
+        .put(authorize("select id from functional.alltypes_view"),
+            selectError("functional.alltypes_view"))
+        .put(authorize("show create table functional.alltypes"),
+            accessError("functional.alltypes"))
+        .put(authorize("describe functional.alltypes"),
+            accessError("functional.alltypes"))
+        .put(authorize("show create table functional.alltypes_view"),
+            accessError("functional.alltypes_view"))
+        .put(authorize("describe functional.alltypes_view"),
+            accessError("functional.alltypes_view"))
+        .put(authorize("describe functional.allcomplextypes.int_struct_col"),
+            accessError("functional.allcomplextypes"))
+        .put(authorize("refresh functional.alltypes"),
+            refreshError("functional.alltypes"))
+        .put(authorize("invalidate metadata functional.alltypes"),
+            refreshError("functional.alltypes"))
+        .put(authorize("compute stats functional.alltypes"),
+            alterError("functional.alltypes"))
+        .put(authorize("drop stats functional.alltypes"),
+            alterError("functional.alltypes"))
+        .put(authorize("create table functional.test_tbl(a int)"),
+            createError("functional"))
+        .put(authorize("create table functional.test_tbl like functional.alltypes"),
+            accessError("functional.alltypes"))
+        .put(authorize("create table functional.test_tbl as select 1"),
+            createError("functional"))
+        .put(authorize("create view functional.test_view as select 1"),
+            createError("functional"))
+        .put(authorize("alter table functional.alltypes add column c1 int"),
+            alterError("functional"))
+        .put(authorize("drop table functional.alltypes"),
+            dropError("functional"))
+        .put(authorize("drop view functional.alltypes_view"),
+            dropError("functional"))
+        .put(authorize("alter view functional.alltypes_view as select 1"),
+            alterError("functional.alltypes_view"))
+        .put(authorize("alter database functional set owner user foo"),
+            accessError(true, "functional"))
+        .build();
+    // Run the queries.
+    for (AuthzTest authz: testQueries.keySet()) authz.error(testQueries.get(authz));
+    // Grant ALL privileges on functional db to it's owner. All the above queries
+    // should be authorized now, since we are running as the owner of the db and
+    // ownership should be translated to the tables underneath.
+    String policyName = "functional_owner_" + TestUtils.getRandomString(5);
+    createOwnerPolicy(policyName, "ALL", "functional", "*", "*");
+    try {
+      rangerImpalaPlugin_.refreshPoliciesAndTags();
+      for (AuthzTest authz: testQueries.keySet()) authz.ok();
+    } finally {
+      deleteRangerPolicy(policyName);
+    }
+    rangerImpalaPlugin_.refreshPoliciesAndTags();
+    // Tests for more fine grained {OWNER} privileges.
+    //
+    // SELECT privilege.
+    // With default privileges, select on both alltypes/alltypes_view should fail.
+    authorize("select count(*) from functional.alltypes")
+        .error(selectError("functional.alltypes"));
+    authorize("select count(*) from functional.alltypes")
+        .error(selectError("functional.alltypes"));
+    policyName = "functional_owner_alltypes" + TestUtils.getRandomString(5);
+    createOwnerPolicy(policyName, "SELECT", "functional", "alltypes", "*");
+    rangerImpalaPlugin_.refreshPoliciesAndTags();
+    // With the new privileges, only the first query should pass. Also,
+    // any other non-SELECT on functional.alltypes should fail.
+    try {
+      authorize("select count(*) from functional.alltypes").ok();
+      authorize("alter table functional.alltypes add column c1 int")
+          .error(alterError("functional"));
+      authorize("drop table functional.alltypes")
+          .error(dropError("functional"));
+      authorize("select count(*) from functional.alltypes_view")
+          .error(selectError("functional.alltypes_view"));
+    } finally {
+      deleteRangerPolicy(policyName);
+    }
+  }
+
+  private void createOwnerPolicy(String policyName, String privilege,
+      String db, String tbl, String col) throws Exception {
+    // Template policy that grants privileges on a given db/tbl/column to it's
+    // owner.
+    final String createOwnerPolicyTemplate = "{\n" +
+        "    \"isAuditEnabled\": true,\n" +
+        "    \"isDenyAllElse\": false,\n" +
+        "    \"isEnabled\": true,\n" +
+        "    \"name\": \"%s\",\n" + // policy name
+        "    \"policyItems\": [\n" +
+        "        {\n" +
+        "            \"accesses\": [\n" +
+        "                {\n" +
+        "                    \"isAllowed\": true,\n" +
+        "                    \"type\": \"%s\"\n" + // privilege to grant
+        "                }\n" +
+        "            ],\n" +
+        "            \"delegateAdmin\": false,\n" +
+        "            \"users\": [\n" +
+        "                \"{OWNER}\"\n" + // {OWNER} access
+        "            ]\n" +
+        "        }\n" +
+        "    ],\n" +
+        "    \"policyPriority\": 0,\n" +
+        "    \"policyType\": 0,\n" +
+        "    \"resources\": {\n" +
+        "        \"column\": {\n" +
+        "            \"isExcludes\": false,\n" +
+        "            \"isRecursive\": false,\n" +
+        "           \"values\": [\n" +
+        "               \"%s\"\n" +  // column name
+        "           ]\n" +
+        "        },\n" +
+        "        \"database\": {\n" +
+        "            \"isExcludes\": false,\n" +
+        "            \"isRecursive\": false,\n" +
+        "            \"values\": [\n" +
+        "                \"%s\"\n" + // database name
+        "            ]\n" +
+        "        },\n" +
+        "        \"table\": {\n" +
+        "            \"isExcludes\": false,\n" +
+        "            \"isRecursive\": false,\n" +
+        "            \"values\": [\n" +
+        "                \"%s\"\n" +  // table name
+        "            ]\n" +
+        "        }\n" +
+        "    },\n" +
+        "    \"service\": \"%s\",\n" + // service name
+        "    \"serviceType\": \"%s\"\n" + // service type
+        "}";
+    String policyRequest = String.format(createOwnerPolicyTemplate,
+        policyName, privilege, col, db, tbl, RANGER_SERVICE_NAME, RANGER_SERVICE_TYPE);
+    // Some old policies may exist on the same db/tbl/col combination due to other test
+    // runs. We clean them up and retry in that case.
+    try {
+      createRangerPolicy(policyName, policyRequest);
+    } catch (RuntimeException e) {
+      if (!e.getMessage().contains("Another policy already exists")) throw e;
+      LOG.info("Another policy exists for the given resource, deleting it", e);
+      // Look for policy-name=[*]
+      Pattern pattern = Pattern.compile("policy-name=\\[(.*?)\\]");
+      Matcher m = pattern.matcher(e.getMessage());
+      assertTrue(m.find());
+      LOG.info("Deleting policy: " + m.group(1));
+      deleteRangerPolicy(m.group(1));
+      createRangerPolicy(policyName, policyRequest);
+    }
+  }
+
   private void verifyPrivilegeReqs(String stmt, Set<String> expectedPrivilegeNames)
       throws ImpalaException {
     verifyPrivilegeReqs(createAnalysisCtx(authzFactory_), stmt, expectedPrivilegeNames);
diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java
index 6b968db..009d05f 100644
--- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java
+++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java
@@ -593,8 +593,9 @@ public abstract class AuthorizationTestBase extends FrontendTestBase {
         .type(RangerRESTUtils.REST_MIME_TYPE_JSON)
         .post(ClientResponse.class, json);
     if (response.getStatusInfo().getFamily() != Family.SUCCESSFUL) {
-      throw new RuntimeException(
-          String.format("Unable to create a Ranger policy: %s.", policyName));
+      throw new RuntimeException(String.format(
+          "Unable to create a Ranger policy: %s Response: %s",
+          policyName, response.getEntity(String.class)));
     }
   }
 
diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index 4f3ac87..8682f64 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -25,6 +25,7 @@ import requests
 from getpass import getuser
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.util.hdfs_util import NAMENODE
+from tests.util.calculation_util import get_random_id
 
 ADMIN = "admin"
 RANGER_AUTH = ("admin", "admin")
@@ -34,6 +35,10 @@ IMPALAD_ARGS = "--server-name=server1 --ranger_service_type=hive " \
 CATALOGD_ARGS = "--server-name=server1 --ranger_service_type=hive " \
                 "--ranger_app_id=impala --authorization_provider=ranger"
 
+LOCAL_CATALOG_IMPALAD_ARGS = "--server-name=server1 --ranger_service_type=hive " \
+    "--ranger_app_id=impala --authorization_provider=ranger --use_local_catalog=true"
+LOCAL_CATALOG_CATALOGD_ARGS = "--server-name=server1 --ranger_service_type=hive " \
+    "--ranger_app_id=impala --authorization_provider=ranger --catalog_topic_mode=minimal"
 
 class TestRanger(CustomClusterTestSuite):
   """
@@ -545,6 +550,14 @@ class TestRanger(CustomClusterTestSuite):
     if statement is not None:
       self.execute_query_expect_success(client, statement)
 
+  def _run_query_as_user(self, query, username, expect_success):
+    """Helper to run an input query as a given user."""
+    impala_client = self.create_impala_client()
+    if expect_success:
+      return self.execute_query_expect_success(
+          impala_client, query, user=username, query_options={'sync_ddl': 1})
+    return self.execute_query_expect_failure(impala_client, query, user=username)
+
   @CustomClusterTestSuite.with_args(
     impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
   def test_unsupported_sql(self):
@@ -619,3 +632,87 @@ class TestRanger(CustomClusterTestSuite):
       else:
         assert "Error revoking a privilege in Ranger. Ranger error message: " \
                "HTTP 403 Error: Grantee group invalid_group doesn't exist" in str(result)
+
+  @CustomClusterTestSuite.with_args(
+    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
+  def test_legacy_catalog_ownership(self):
+      self._test_ownership()
+
+  @CustomClusterTestSuite.with_args(impalad_args=LOCAL_CATALOG_IMPALAD_ARGS,
+    catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS)
+  def test_local_catalog_ownership(self):
+      # getTableIfCached() in LocalCatalog loads a minimal incomplete table
+      # that does not include the ownership information. Hence show tables
+      # *never* show owned tables. TODO(bharathv): Fix in a follow up commit
+      pytest.xfail("getTableIfCached() faulty behavior, known issue")
+      self._test_ownership()
+
+  def _test_ownership(self):
+    """Tests ownership privileges for databases and tables with ranger along with
+    some known quirks in the implementation."""
+    test_user = getuser()
+    test_db = "test_ranger_ownership_" + get_random_id(5).lower()
+    # Create a test database as "admin" user. Owner is set accordingly.
+    self._run_query_as_user("create database {}".format(test_db), ADMIN, True)
+    try:
+      # Try to create a table under test_db as current user. It should fail.
+      self._run_query_as_user(
+          "create table {}.foo(a int)".format(test_db), test_user, False)
+      # Change the owner of the database to the current user.
+      self._run_query_as_user(
+          "alter database {} set owner user {}".format(test_db, test_user), ADMIN, True)
+      # Try creating a table under it again. It should still fail due to lack of ownership
+      # privileges
+      self._run_query_as_user(
+          "create table {}.foo(a int)".format(test_db), test_user, False)
+      # Create ranger ownership poicy for the current user on test_db.
+      resource = {
+        "database": test_db,
+        "column": "*",
+        "table": "*"
+      }
+      access = ["create", "select"]
+      TestRanger._grant_ranger_privilege("{OWNER}", resource, access)
+      self._run_query_as_user("refresh authorization", ADMIN, True)
+      try:
+        # Create should succeed now.
+        self._run_query_as_user(
+            "create table {}.foo(a int)".format(test_db), test_user, True)
+        # Run show tables on the db. The resulting list should be empty. This happens
+        # because the created table's ownership information is not aggresively cached
+        # by the current Catalog implementations. Hence the analysis pass does not
+        # have access to the ownership information to verify if the current session
+        # user is actually the owner. We need to fix this by caching the HMS metadata
+        # more aggressively when the table loads. TODO(IMPALA-8937).
+        result =\
+            self._run_query_as_user("show tables in {}".format(test_db), test_user, True)
+        assert len(result.data) == 0
+        # Run a simple query that warms up the table metadata and repeat SHOW TABLES.
+        self._run_query_as_user("select * from {}.foo".format(test_db), test_user, True)
+        result =\
+            self._run_query_as_user("show tables in {}".format(test_db), test_user, True)
+        assert len(result.data) == 1
+        assert "foo" in result.data
+        # Change the owner of the db back to the admin user
+        self._run_query_as_user(
+            "alter database {} set owner user {}".format(test_db, ADMIN), ADMIN, True)
+        result =\
+            self._run_query_as_user("show tables in {}".format(test_db), test_user, False)
+        err = "User '{}' does not have privileges to access: {}.*.*".\
+            format(test_user, test_db)
+        assert err in str(result)
+        # test_user is still the owner of the table, so select should work fine.
+        self._run_query_as_user("select * from {}.foo".format(test_db), test_user, True)
+        # Change the table owner back to admin.
+        self._run_query_as_user(
+            "alter table {}.foo set owner user {}".format(test_db, ADMIN), ADMIN, True)
+        # test_user should not be authorized to run the queries anymore.
+        result = self._run_query_as_user(
+            "select * from {}.foo".format(test_db), test_user, False)
+        err = ("AuthorizationException: User '{}' does not have privileges to execute" +
+            " 'SELECT' on: {}.foo").format(test_user, test_db)
+        assert err in str(result)
+      finally:
+        TestRanger._revoke_ranger_privilege("{OWNER}", resource, access)
+    finally:
+      self._run_query_as_user("drop database {} cascade".format(test_db), ADMIN, True)