Posted to commits@impala.apache.org by st...@apache.org on 2021/03/19 00:42:18 UTC

[impala] branch master updated (ac4984b -> 98de1c5)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from ac4984b  Revert "IMPALA-10503: testdata load hits hive memory limit errors during hive inserts"
     new 6162343  IMPALA-10512: ALTER TABLE ADD PARTITION should bump the write id for ACID tables
     new 98de1c5  IMPALA-9234: Support Ranger row filtering policies

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/common/global-flags.cc                      |   3 -
 be/src/util/backend-gflag-util.cc                  |   9 +-
 common/thrift/BackendGflags.thrift                 |   2 +
 .../java/org/apache/impala/analysis/Analyzer.java  | 119 +++++---
 .../org/apache/impala/analysis/InlineViewRef.java  |  31 ++-
 .../impala/authorization/AuthorizationChecker.java |  12 +
 .../impala/authorization/AuthorizationFactory.java |   2 +-
 .../authorization/NoopAuthorizationFactory.java    |  13 +-
 .../org/apache/impala/authorization/TableMask.java |  20 ++
 .../ranger/RangerAuthorizationChecker.java         | 115 +++++---
 .../ranger/RangerAuthorizationContext.java         |  31 ++-
 .../ranger/RangerAuthorizationFactory.java         |   5 +-
 .../ranger/RangerBufferAuditHandler.java           |  12 +-
 .../java/org/apache/impala/catalog/Catalog.java    |   7 +-
 .../impala/catalog/CatalogServiceCatalog.java      |   5 +
 .../org/apache/impala/catalog/ImpaladCatalog.java  |   7 +
 .../org/apache/impala/catalog/Transaction.java     |  19 +-
 .../org/apache/impala/service/BackendConfig.java   |   5 +
 .../apache/impala/service/CatalogOpExecutor.java   |  77 ++++--
 .../org/apache/impala/util/AuthorizationUtil.java  |   4 +
 .../authorization/AuthorizationStmtTest.java       |  10 +-
 .../authorization/AuthorizationTestBase.java       |   4 +-
 .../authorization/ranger/RangerAuditLogTest.java   | 142 ++++++++++
 .../org/apache/impala/common/FrontendTestBase.java |  13 +-
 .../queries/QueryTest/full-acid-rowid.test         |  32 +--
 .../queries/QueryTest/ranger_column_masking.test   |   6 +
 .../ranger_column_masking_and_row_filtering.test   |  34 +++
 .../queries/QueryTest/ranger_row_filtering.test    | 302 +++++++++++++++++++++
 tests/authorization/test_ranger.py                 | 222 ++++++++++++++-
 tests/query_test/test_acid.py                      |  27 ++
 30 files changed, 1137 insertions(+), 153 deletions(-)
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking_and_row_filtering.test
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/ranger_row_filtering.test

[impala] 02/02: IMPALA-9234: Support Ranger row filtering policies

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 98de1c5436415c270901e4af76e0ec06deee1f32
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Mon Jan 25 10:27:41 2021 +0800

    IMPALA-9234: Support Ranger row filtering policies
    
    Ranger row filtering policies provide customized expressions to filter
    out rows for specific users when reading from a table. This patch adds
    support for this feature. A new feature flag, enable_row_filtering, is
    added so this experimental feature can be disabled. It defaults to
    true, so the feature is enabled by default. Enabling row filtering
    requires --enable_column_masking=true since it depends on the column
    masking implementation.
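
    For example, both features can be enabled explicitly at startup (an
    illustrative invocation of the Impala daemon; the two flags are the
    ones defined in this patch):

      impalad --enable_column_masking=true --enable_row_filtering=true

    Setting --enable_row_filtering=true with --enable_column_masking=false
    is rejected when the authorization factory is created (see the
    AuthorizationUtil change below).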
    
    Note that row filtering policies take effect prior to any column
    masking policies, because column masking policies apply to result data.
    
    Implementation:
    The existing table masking view infrastructure can be extended to
    support row filtering. Currently, when analyzing a table with column
    masking policies, we replace the TableRef with an InlineViewRef that
    contains a SelectStmt wrapping the columns with masking expressions.
    This patch adds the row filtering expressions to the WhereClause of
    that SelectStmt.
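
    For example, given a row filtering policy "id % 2 = 0" and a column
    masking policy mask(s) on column s of a table tbl (all names here are
    hypothetical), a scan of tbl is effectively analyzed as:

      SELECT id, s FROM tbl
      -- becomes
      SELECT id, s FROM (
        SELECT id, mask(s) AS s FROM tbl WHERE id % 2 = 0
      ) tbl;

    The WHERE clause is evaluated on the unmasked rows, which is why row
    filtering takes effect before column masking.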
    
    Limitations:
     - Expressions using subqueries are not supported (IMPALA-10483).
     - Row filtering policies on nested tables will not be applied when
       nested collection columns are used directly in the FROM clause. This
       would leak data, so such queries are forbidden until IMPALA-10484 is
       resolved (see the sketch below).
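
     A sketch of the second limitation (hypothetical table complex_tbl with
     an array column int_array and a row filtering policy on complex_tbl):

       -- Rejected: a non-relative collection ref would bypass the filter.
       SELECT item FROM complex_tbl.int_array;
       -- Accepted: the relative ref's base table ref gets the filter.
       SELECT item FROM complex_tbl t, t.int_array;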
    
    Tests:
     - Add FE test for error message when disabling row filtering.
     - Add e2e test with row filtering policies.
     - Add e2e test with column masking and row filtering policies both in
       effect.
     - Verified audits in a CDP cluster with Ranger and Solr set up.
    
    Change-Id: I580517be241225ca15e45686381b78890178d7cc
    Reviewed-on: http://gerrit.cloudera.org:8080/16976
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc                      |   3 -
 be/src/util/backend-gflag-util.cc                  |   9 +-
 common/thrift/BackendGflags.thrift                 |   2 +
 .../java/org/apache/impala/analysis/Analyzer.java  | 119 +++++---
 .../org/apache/impala/analysis/InlineViewRef.java  |  31 ++-
 .../impala/authorization/AuthorizationChecker.java |  12 +
 .../impala/authorization/AuthorizationFactory.java |   2 +-
 .../authorization/NoopAuthorizationFactory.java    |  13 +-
 .../org/apache/impala/authorization/TableMask.java |  20 ++
 .../ranger/RangerAuthorizationChecker.java         | 115 +++++---
 .../ranger/RangerAuthorizationContext.java         |  31 ++-
 .../ranger/RangerAuthorizationFactory.java         |   5 +-
 .../ranger/RangerBufferAuditHandler.java           |  12 +-
 .../org/apache/impala/service/BackendConfig.java   |   5 +
 .../org/apache/impala/util/AuthorizationUtil.java  |   4 +
 .../authorization/AuthorizationStmtTest.java       |  10 +-
 .../authorization/AuthorizationTestBase.java       |   4 +-
 .../authorization/ranger/RangerAuditLogTest.java   | 142 ++++++++++
 .../org/apache/impala/common/FrontendTestBase.java |  13 +-
 .../queries/QueryTest/ranger_column_masking.test   |   6 +
 .../ranger_column_masking_and_row_filtering.test   |  34 +++
 .../queries/QueryTest/ranger_row_filtering.test    | 302 +++++++++++++++++++++
 tests/authorization/test_ranger.py                 | 222 ++++++++++++++-
 23 files changed, 1004 insertions(+), 112 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 73d2987..795f2e4 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -338,9 +338,6 @@ DEFINE_bool_hidden(use_customized_user_groups_mapper_for_ranger, false,
     "If true, use the customized user-to-groups mapper when performing authorization via"
     " Ranger.");
 
-DEFINE_bool(enable_column_masking, true,
-    "If false, disable the column masking feature. Defaults to be true.");
-
 DEFINE_bool(enable_incremental_metadata_updates, true,
     "If true, Catalog Server will send incremental table updates in partition level in "
     "the statestore topic updates. Legacy coordinators will apply the partition updates "
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 9d0daf6..fb99457 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -85,7 +85,6 @@ DECLARE_string(min_privilege_set_for_show_stmts);
 DECLARE_int32(num_expected_executors);
 DECLARE_int32(num_check_authorization_threads);
 DECLARE_bool(use_customized_user_groups_mapper_for_ranger);
-DECLARE_bool(enable_column_masking);
 DECLARE_bool(compact_catalog_topic);
 DECLARE_bool(enable_incremental_metadata_updates);
 DECLARE_int64(topic_update_tbl_max_wait_time_ms);
@@ -153,6 +152,13 @@ DEFINE_bool_hidden(saml2_ee_test_mode, false,
     "401 Unauthorized to allow checking cookies dealing with Thrift protocol. "
     "Should be only used in test environments." );
 
+DEFINE_bool(enable_column_masking, true,
+    "If false, disable the column masking feature. Defaults to be true.");
+
+DEFINE_bool(enable_row_filtering, true,
+    "If false, disable the row filtering feature. Defaults to be true. Enabling this flag"
+    " requires enable_column_masking to be true.");
+
 namespace impala {
 
 Status GetConfigFromCommand(const string& flag_cmd, string& result) {
@@ -245,6 +251,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_use_customized_user_groups_mapper_for_ranger(
       FLAGS_use_customized_user_groups_mapper_for_ranger);
   cfg.__set_enable_column_masking(FLAGS_enable_column_masking);
+  cfg.__set_enable_row_filtering(FLAGS_enable_row_filtering);
   cfg.__set_compact_catalog_topic(FLAGS_compact_catalog_topic);
   cfg.__set_enable_incremental_metadata_updates(
       FLAGS_enable_incremental_metadata_updates);
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index f468755..5dfcf50 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -191,4 +191,6 @@ struct TBackendGflags {
   83: required bool saml2_ee_test_mode
 
   84: required string scratch_dirs
+
+  85: required bool enable_row_filtering
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 1432d02..0f67726 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -59,7 +59,6 @@ import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.local.LocalKuduTable;
 import org.apache.impala.common.AnalysisException;
-import org.apache.impala.common.Id;
 import org.apache.impala.common.IdGenerator;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
@@ -775,25 +774,7 @@ public class Analyzer {
       // Register privilege requests to prefer reporting an authorization error over
       // an analysis error. We should not accidentally reveal the non-existence of a
       // table/database if the user is not authorized.
-      if (rawPath.size() > 1) {
-        registerPrivReq(builder -> {
-          builder.onTableUnknownOwner(
-              rawPath.get(0), rawPath.get(1)).allOf(tableRef.getPrivilege());
-          if (tableRef.requireGrantOption()) {
-            builder.grantOption();
-          }
-          return builder.build();
-        });
-      }
-
-      registerPrivReq(builder -> {
-        builder.onTableUnknownOwner(
-            getDefaultDb(), rawPath.get(0)).allOf(tableRef.getPrivilege());
-        if (tableRef.requireGrantOption()) {
-          builder.grantOption();
-        }
-        return builder.build();
-      });
+      registerPrivReqOnRawPath(tableRef, rawPath);
       throw e;
     } catch (TableLoadingException e) {
       throw new AnalysisException(String.format(
@@ -814,27 +795,87 @@ public class Analyzer {
             table instanceof FeDataSourceTable);
         resolvedTableRef = new BaseTableRef(tableRef, resolvedPath);
       }
-      // Only do table masking when authorization is enabled and the authorization
-      // factory supports column masking. If both of these are false, return the unmasked
-      // table ref.
-      if (!doTableMasking || !getAuthzFactory().getAuthorizationConfig().isEnabled()
-          || !getAuthzFactory().supportsColumnMasking()) {
-        return resolvedTableRef;
-      }
-      // Performing table masking.
-      AuthorizationChecker authChecker = getAuthzFactory().newAuthorizationChecker(
-          getCatalog().getAuthPolicy());
-      TableMask tableMask = new TableMask(authChecker, table, user_);
-      try {
-        if (!tableMask.needsMaskingOrFiltering()) return resolvedTableRef;
-        return InlineViewRef.createTableMaskView(resolvedPath, resolvedTableRef,
+      if (!doTableMasking) return resolvedTableRef;
+      return resolveTableMask(resolvedTableRef, table);
+    } else {
+      CollectionTableRef res = new CollectionTableRef(tableRef, resolvedPath);
+      // Relative table refs don't need masking; their base tables will be masked.
+      if (!doTableMasking || res.isRelative()) return res;
+      return resolveTableMask(res, res.getTable());
+    }
+  }
+
+  /**
+   * Register privilege requests based on the 'tableRawPath'. Only used when we fail to
+   * resolve the TableRef. With these requests we can prefer reporting an authorization
+   * error over an analysis error. See resolveTableRef() for details.
+   */
+  private void registerPrivReqOnRawPath(TableRef tableRef, List<String> tableRawPath) {
+    if (tableRawPath.size() > 1) {
+      registerPrivReq(builder -> {
+        builder.onTableUnknownOwner(
+            tableRawPath.get(0), tableRawPath.get(1)).allOf(tableRef.getPrivilege());
+        if (tableRef.requireGrantOption()) {
+          builder.grantOption();
+        }
+        return builder.build();
+      });
+    }
+
+    registerPrivReq(builder -> {
+      builder.onTableUnknownOwner(
+          getDefaultDb(), tableRawPath.get(0)).allOf(tableRef.getPrivilege());
+      if (tableRef.requireGrantOption()) {
+        builder.grantOption();
+      }
+      return builder.build();
+    });
+  }
+
+  /**
+   * Resolves column-masking/row-filtering policies on the given table. Returns a table
+   * masking view if any of these policies exist. The TableRef should be resolved first
+   * so we know the target table/view/collection.
+   *
+   * @param resolvedTableRef A resolved TableRef for table masking
+   * @param basedTable FeTable of the resolved table or the collection's base table
+   */
+  private TableRef resolveTableMask(TableRef resolvedTableRef, FeTable basedTable)
+      throws AnalysisException {
+    Preconditions.checkState(resolvedTableRef.isResolved(), "Table should be resolved");
+    // Only do table masking when authorization is enabled and the authorization
+    // factory supports column-masking/row-filtering. If either of these is false,
+    // return the unmasked table ref.
+    if (!getAuthzFactory().getAuthorizationConfig().isEnabled()
+        || !getAuthzFactory().supportsTableMasking()) {
+      return resolvedTableRef;
+    }
+    // Performing table masking.
+    AuthorizationChecker authChecker = getAuthzFactory().newAuthorizationChecker(
+        getCatalog().getAuthPolicy());
+    TableMask tableMask = new TableMask(authChecker, basedTable, user_);
+    try {
+      if (resolvedTableRef instanceof CollectionTableRef) {
+        Preconditions.checkState(!resolvedTableRef.isRelative(),
+            "Relative table refs don't need masking; their base tables will be masked.");
+        if (tableMask.needsRowFiltering()) {
+          // TODO: Support this in IMPALA-10484.
+          throw new AnalysisException(String.format("Using non-relative collection " +
+              "column %s of table %s is not supported since there are row-filtering " +
+              "policies on this table (IMPALA-10484). Rewrite query to use relative " +
+              "reference.",
+              String.join(".", resolvedTableRef.getResolvedPath().getRawPath()),
+              basedTable.getFullName()));
+        }
+      } else if (tableMask.needsMaskingOrFiltering()) {
+        return InlineViewRef.createTableMaskView(basedTable, resolvedTableRef,
             tableMask, getAuthzCtx());
-      } catch (InternalException e) {
-        LOG.error("Error performing table masking", e);
-        throw new AnalysisException("Error performing table masking", e);
       }
-    } else {
-      return new CollectionTableRef(tableRef, resolvedPath);
+      return resolvedTableRef;
+    } catch (InternalException e) {
+      String msg = "Error performing table masking on " + basedTable.getFullName();
+      LOG.error(msg, e);
+      throw new AnalysisException(msg, e);
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java b/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
index 0482797..770c7ec 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
@@ -20,6 +20,7 @@ package org.apache.impala.analysis;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.impala.authorization.AuthorizationContext;
 import org.apache.impala.authorization.TableMask;
@@ -142,21 +143,20 @@ public class InlineViewRef extends TableRef {
    * Creates an inline-view doing table masking for column masking and row filtering
    * policies. Callers should replace 'tableRef' with the returned view.
    *
-   * @param resolvedPath resolved path for the original table/view
+   * @param resolvedTable resolved FeTable for the original table/view
    * @param tableRef original resolved table/view
    * @param tableMask TableMask providing column masking and row filtering policies
    * @param authzCtx AuthorizationContext containing RangerBufferAuditHandler
    */
-  static InlineViewRef createTableMaskView(Path resolvedPath, TableRef tableRef,
+  static InlineViewRef createTableMaskView(FeTable resolvedTable, TableRef tableRef,
       TableMask tableMask, AuthorizationContext authzCtx) throws AnalysisException,
       InternalException {
-    Preconditions.checkNotNull(resolvedPath);
-    Preconditions.checkNotNull(resolvedPath.getRootTable());
+    Preconditions.checkNotNull(resolvedTable);
     Preconditions.checkNotNull(tableRef);
     Preconditions.checkNotNull(authzCtx);
     Preconditions.checkState(tableRef instanceof InlineViewRef
         || tableRef instanceof BaseTableRef);
-    List<Column> columns = resolvedPath.getRootTable().getColumnsInHiveOrder();
+    List<Column> columns = resolvedTable.getColumnsInHiveOrder();
     List<SelectListItem> items = Lists.newArrayListWithCapacity(columns.size());
     for (Column col: columns) {
       if (col.getType().isComplexType()) continue;
@@ -168,8 +168,25 @@ public class InlineViewRef extends TableRef {
     }
     SelectList selectList = new SelectList(items);
     FromClause fromClause = new FromClause(Lists.newArrayList(tableRef));
-    SelectStmt tableMaskStmt = new SelectStmt(selectList, fromClause,
-        null, null, null, null, null);
+    Expr wherePredicate = tableMask.createRowFilter(authzCtx);
+    SelectStmt tableMaskStmt = new SelectStmt(selectList, fromClause, wherePredicate,
+        null, null, null, null);
+    // TODO(IMPALA-10483): Column-masking/Row-filtering expressions may have subqueries
+    //  which may introduce new tables. We should trigger StmtMetadataLoader#loadTables()
+    //  on 'tableMaskStmt'. Otherwise, they can't be resolved. Reject the query in this
+    //  case.
+    List<TableRef> tableRefsInView = tableMaskStmt.collectTableRefs();
+    // Should only contain the base table ref.
+    if (tableRefsInView.size() > 1) {
+      tableRefsInView.remove(tableRef);
+      throw new AnalysisException("Column-masking/Row-filtering expressions using " +
+          "subqueries are not supported (IMPALA-10483). Table(s) in the subquery: " +
+          tableRefsInView.stream()
+              .map(t -> String.join(".", t.getPath()))
+              .collect(Collectors.toList())
+      );
+    }
+
     InlineViewRef viewRef = new InlineViewRef(/*alias*/ null, tableMaskStmt,
         (TableSampleClause) null);
     tableRef.migratePropertiesTo(viewRef);
diff --git a/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
index f10c984..17749b5 100644
--- a/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
@@ -91,12 +91,24 @@ public interface AuthorizationChecker {
       List<String> requiredColumns) throws InternalException;
 
   /**
+   * Returns whether the given table needs row filtering when read by the given user.
+   */
+  boolean needsRowFiltering(User user, String dbName, String tableName)
+      throws InternalException;
+
+  /**
    * Returns the column mask string for the given column.
    */
   String createColumnMask(User user, String dbName, String tableName, String columnName,
       AuthorizationContext authzCtx) throws InternalException;
 
   /**
+   * Returns the row filter for the given table.
+   */
+  String createRowFilter(User user, String dbName, String tableName,
+      AuthorizationContext rangerCtx) throws InternalException;
+
+  /**
    * This method is to be executed after AnalysisContext#analyze() is completed.
    */
   void postAnalyze(AuthorizationContext authzCtx);
diff --git a/fe/src/main/java/org/apache/impala/authorization/AuthorizationFactory.java b/fe/src/main/java/org/apache/impala/authorization/AuthorizationFactory.java
index 3a8c9d8..9b9dba8 100644
--- a/fe/src/main/java/org/apache/impala/authorization/AuthorizationFactory.java
+++ b/fe/src/main/java/org/apache/impala/authorization/AuthorizationFactory.java
@@ -74,5 +74,5 @@ public interface AuthorizationFactory {
    * Returns whether the authorization implementation supports column masking and row
    * filtering. Currently, only Ranger implementation supports these.
    */
-  boolean supportsColumnMasking();
+  boolean supportsTableMasking();
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java b/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
index b1d2a2e..e3097f6 100644
--- a/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
+++ b/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
@@ -220,12 +220,23 @@ public class NoopAuthorizationFactory implements AuthorizationFactory {
       }
 
       @Override
+      public boolean needsRowFiltering(User user, String dbName, String tableName) {
+        return false;
+      }
+
+      @Override
       public String createColumnMask(User user, String dbName, String tableName,
           String columnName, AuthorizationContext authzCtx) {
         return null;
       }
 
       @Override
+      public String createRowFilter(User user, String dbName, String tableName,
+          AuthorizationContext rangerCtx) {
+        return null;
+      }
+
+      @Override
       public void postAnalyze(AuthorizationContext authzCtx) {
       }
     };
@@ -243,7 +254,7 @@ public class NoopAuthorizationFactory implements AuthorizationFactory {
   }
 
   @Override
-  public boolean supportsColumnMasking() {
+  public boolean supportsTableMasking() {
     return false;
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/TableMask.java b/fe/src/main/java/org/apache/impala/authorization/TableMask.java
index 732c3ed..81b0349 100644
--- a/fe/src/main/java/org/apache/impala/authorization/TableMask.java
+++ b/fe/src/main/java/org/apache/impala/authorization/TableMask.java
@@ -64,6 +64,13 @@ public class TableMask {
   }
 
   /**
+   * Returns whether the table/view has row filtering policies.
+   */
+  public boolean needsRowFiltering() throws InternalException {
+    return authChecker_.needsRowFiltering(user_, dbName_, tableName_);
+  }
+
+  /**
    * Return the masked Expr of the given column
    */
   public Expr createColumnMask(String colName, Type colType,
@@ -91,4 +98,17 @@ public class TableMask {
     }
     return res;
   }
+
+  /**
+   * Return the row filter Expr
+   */
+  public Expr createRowFilter(AuthorizationContext authzCtx)
+      throws InternalException, AnalysisException {
+    String rowFilter = authChecker_.createRowFilter(user_, dbName_, tableName_, authzCtx);
+    if (rowFilter == null) return null;
+    // Parse the row filter string to AST by using it in a fake query.
+    String stmtSql = String.format("SELECT 1 FROM foo WHERE %s", rowFilter);
+    SelectStmt selectStmt = (SelectStmt) Parser.parse(stmtSql);
+    return selectStmt.getWhereClause();
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
index eb62599..876659c 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
@@ -192,15 +192,17 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
       List<PrivilegeRequest> privilegeRequests)
       throws AuthorizationException, InternalException {
     boolean isColumnMaskingEnabled = BackendConfig.INSTANCE.isColumnMaskingEnabled();
+    boolean isRowFilteringEnabled = BackendConfig.INSTANCE.isRowFilteringEnabled();
     for (PrivilegeRequest request : privilegeRequests) {
       if (!isColumnMaskingEnabled
           && request.getAuthorizable().getType() == Type.COLUMN) {
-        authorizeColumnMask(user,
+        throwIfColumnMaskingRequired(user,
             request.getAuthorizable().getDbName(),
             request.getAuthorizable().getTableName(),
             request.getAuthorizable().getColumnName());
-      } else if (request.getAuthorizable().getType() == Type.TABLE) {
-        authorizeRowFilter(user,
+      } else if (!isRowFilteringEnabled
+          && request.getAuthorizable().getType() == Type.TABLE) {
+        throwIfRowFilteringRequired(user,
             request.getAuthorizable().getDbName(),
             request.getAuthorizable().getTableName());
       }
@@ -274,11 +276,11 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
   }
 
   /**
-   * This method checks if column mask is enabled on the given columns and deny access
-   * when column mask is enabled by throwing an {@link AuthorizationException}. This is
-   * to prevent data leak when Hive has column mask enabled but not in Impala.
+   * This method throws an {@link AuthorizationException} if a column mask is enabled
+   * on the given column. This is used to prevent data leaks when Hive has column
+   * masking enabled but it is disabled in Impala.
    */
-  private void authorizeColumnMask(User user, String dbName, String tableName,
+  private void throwIfColumnMaskingRequired(User user, String dbName, String tableName,
       String columnName) throws InternalException, AuthorizationException {
     if (evalColumnMask(user, dbName, tableName, columnName, null).isMaskEnabled()) {
       throw new AuthorizationException(String.format(
@@ -297,17 +299,26 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
         return true;
       }
     }
-    return false;
+    return needsRowFiltering(user, dbName, tableName);
   }
 
   @Override
-  public String createColumnMask(User user, String dbName, String tableName,
-      String columnName, AuthorizationContext rangerCtx) throws InternalException {
-    RangerBufferAuditHandler auditHandler =
-        ((RangerAuthorizationContext) rangerCtx).getAuditHandler();
-    int numAuthzAuditEventsBefore = auditHandler.getAuthzEvents().size();
-    RangerAccessResult accessResult = evalColumnMask(user, dbName, tableName, columnName,
-        auditHandler);
+  public boolean needsRowFiltering(User user, String dbName, String tableName)
+      throws InternalException {
+    return evalRowFilter(user, dbName, tableName, null).isRowFilterEnabled();
+  }
+
+  /**
+   * Util method for removing stale audit events of column masking and row filtering
+   * policies. See the comments below for the cases where we need this.
+   *
+   * TODO: Revisit RangerBufferAuditHandler and compare it to
+   *  org.apache.ranger.authorization.hive.authorizer.RangerHiveAuditHandler to see
+   *  whether we can avoid generating stale audit events there.
+   * TODO: Do we really need this for row-filtering?
+   */
+  private void removeStaleAudits(RangerBufferAuditHandler auditHandler,
+      int numAuthzAuditEventsBefore) {
     // When a user adds an "Unmasked" policy for 'columnName' that retains the original
     // value, accessResult.getMaskType() would be "MASK_NONE". We do not need to log such
     // an event. Removing such an event also makes the logged audits consistent when
@@ -335,8 +346,7 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
     // logged in RangerBufferAuditHandler#flush().
     List<AuthzAuditEvent> auditEvents = auditHandler.getAuthzEvents();
     Preconditions.checkState(auditEvents.size() - numAuthzAuditEventsBefore <= 1);
-    if (auditEvents.size() > numAuthzAuditEventsBefore &&
-        !accessResult.isMaskEnabled()) {
+    if (auditEvents.size() > numAuthzAuditEventsBefore) {
       // Recall that the same instance of RangerAuthorizationContext is passed to
       // createColumnMask() every time this method is called. Thus the same instance of
       // RangerBufferAuditHandler is provided for evalColumnMask() to log the event.
@@ -345,13 +355,24 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
       // evaluates to true, we only need to process the last event on 'auditEvents'.
       auditHandler.getAuthzEvents().remove(auditEvents.size() - 1);
     }
-    // No column masking policies, return the original column.
-    if (!accessResult.isMaskEnabled()) return columnName;
+  }
+
+  @Override
+  public String createColumnMask(User user, String dbName, String tableName,
+      String columnName, AuthorizationContext rangerCtx) throws InternalException {
+    RangerBufferAuditHandler auditHandler =
+        ((RangerAuthorizationContext) rangerCtx).getAuditHandler();
+    int numAuthzAuditEventsBefore = auditHandler.getAuthzEvents().size();
+    RangerAccessResult accessResult = evalColumnMask(user, dbName, tableName, columnName,
+        auditHandler);
+    if (!accessResult.isMaskEnabled()) {
+      // No column masking policies, remove any possible stale audit events and
+      // return the original column.
+      removeStaleAudits(auditHandler, numAuthzAuditEventsBefore);
+      return columnName;
+    }
     String maskType = accessResult.getMaskType();
     RangerServiceDef.RangerDataMaskTypeDef maskTypeDef = accessResult.getMaskTypeDef();
-    Preconditions.checkNotNull(maskType);
-    Preconditions.checkState(!auditEvents.isEmpty());
-    auditEvents.get(auditEvents.size() - 1).setAccessType(maskType.toLowerCase());
     // The expression used to replace the original column.
     String maskedColumn = columnName;
     // The expression of the mask type. Column names are referenced by "{col}".
@@ -384,14 +405,32 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
   }
 
   @Override
+  public String createRowFilter(User user, String dbName, String tableName,
+      AuthorizationContext rangerCtx) throws InternalException {
+    RangerBufferAuditHandler auditHandler =
+        ((RangerAuthorizationContext) rangerCtx).getAuditHandler();
+    int numAuthzAuditEventsBefore = auditHandler.getAuthzEvents().size();
+    RangerAccessResult accessResult = evalRowFilter(user, dbName, tableName,
+        auditHandler);
+    if (!accessResult.isRowFilterEnabled()) {
+      // No row filtering policies, remove any possible stale audit events.
+      removeStaleAudits(auditHandler, numAuthzAuditEventsBefore);
+      return null;
+    }
+    String filter = accessResult.getFilterExpr();
+    LOG.info("dbName: {}, tableName: {}, rowFilter: {}", dbName, tableName, filter);
+    return filter;
+  }
+
+  @Override
   public void postAnalyze(AuthorizationContext authzCtx) {
     Preconditions.checkArgument(authzCtx instanceof RangerAuthorizationContext);
-    ((RangerAuthorizationContext) authzCtx).stashAuditEvents(plugin_);
+    ((RangerAuthorizationContext) authzCtx).stashTableMaskingAuditEvents(plugin_);
   }
 
   /**
-   * Evaluate column masking policies on the given column and returns the result.
-   * A RangerAccessResult contains the matched policy details and the masked column.
+   * Evaluates column masking policies on the given column and returns the result, a
+   * RangerAccessResult containing the matched policy details and the masked column.
    * Note that Ranger will add an AuthzAuditEvent to auditHandler.getAuthzEvents() as
    * long as there exists a policy for the given column even though the policy does not
    * apply to 'user'.
@@ -415,22 +454,32 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
   }
 
   /**
-   * This method checks if row filter is enabled on the given tables and deny access
-   * when row filter is enabled by throwing an {@link AuthorizationException} . This is
-   * to prevent data leak when Hive has row filter enabled but not in Impala.
+   * Evaluates row filtering policies on the given table and returns the result, a
+   * RangerAccessResult containing the matched policy details and the filter string.
    */
-  private void authorizeRowFilter(User user, String dbName, String tableName)
-      throws InternalException, AuthorizationException {
+  private RangerAccessResult evalRowFilter(User user, String dbName, String tableName,
+      RangerBufferAuditHandler auditHandler) throws InternalException {
     RangerAccessResourceImpl resource = new RangerImpalaResourceBuilder()
         .database(dbName)
         .table(tableName)
         .build();
     RangerAccessRequest req = new RangerAccessRequestImpl(resource,
         SELECT_ACCESS_TYPE, user.getShortName(), getUserGroups(user));
-    if (plugin_.evalRowFilterPolicies(req, null).isRowFilterEnabled()) {
+    return plugin_.evalRowFilterPolicies(req, auditHandler);
+  }
+
+  /**
+   * This method throws an {@link AuthorizationException} if a row filter is enabled
+   * on the given table. This is used to prevent data leaks when Hive has row
+   * filtering enabled but it is disabled in Impala.
+   */
+  private void throwIfRowFilteringRequired(User user, String dbName, String tableName)
+      throws InternalException, AuthorizationException {
+    if (evalRowFilter(user, dbName, tableName, null).isRowFilterEnabled()) {
       throw new AuthorizationException(String.format(
-          "Impala does not support row filtering yet. Row filtering is enabled " +
-              "on table: %s.%s", dbName, tableName));
+          "Row filtering is disabled by --enable_row_filtering flag. Can't access " +
+              "table %s.%s that has row filtering policy.",
+          dbName, tableName));
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationContext.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationContext.java
index 43f710e..b5d72b0 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationContext.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationContext.java
@@ -53,23 +53,26 @@ public class RangerAuthorizationContext extends AuthorizationContext {
   public RangerBufferAuditHandler getAuditHandler() { return auditHandler_; }
 
   /**
-   * We stash the List of AuthzAuditEvent's in a Map after the analysis of the query and
-   * thus the AuthzAuditEvent's in the Map are deduplicated. At this point, there are
-   * only column masking-related events on the List. We will add back the deduplicated
-   * events by applyDeduplicatedAuthzEvents() only if the authorization of the query is
-   * successful. Thus, the relative order between the column masking-related events and
-   * other events on the List of auditHandler_.getAuthzEvents() is changed afterwards and
-   * only those column masking-related events are deduplicated.
+   * Stash and deduplicate the audit events produced by table masking (column masking /
+   * row filtering), which is performed during the analysis phase. Called at the end of
+   * analysis. These stashed events will be added back after the query passes the
+   * authorization phase. Note that normal events (select, insert, drop, etc.) are
+   * produced in the authorization phase. Stashing table masking events avoids exposing
+   * them when the query fails authorization. Refer to IMPALA-9597 for further details.
    */
-  public void stashAuditEvents(RangerImpalaPlugin plugin) {
-    Set<String> unfilteredMaskNames = plugin.getUnfilteredMaskNames(
+  public void stashTableMaskingAuditEvents(RangerImpalaPlugin plugin) {
+    // Collect all the column masking types except "MASK_NONE", because MASK_NONE events
+    // have been removed in RangerAuthorizationChecker#removeStaleAudits().
+    Set<String> legalEventTypes = plugin.getUnfilteredMaskNames(
         Arrays.asList("MASK_NONE"));
+    // Row filter policies produce ROW_FILTER events.
+    legalEventTypes.add(RangerBufferAuditHandler.ACCESS_TYPE_ROWFILTER.toUpperCase());
     for (AuthzAuditEvent event : auditHandler_.getAuthzEvents()) {
-      // We assume that all the logged events until now are column masking-related. Since
-      // we remove those AuthzAuditEvent's corresponding to the "Unmasked" policy of type
-      // "MASK_NONE", we exclude this type of mask.
-      Preconditions.checkState(unfilteredMaskNames
-          .contains(event.getAccessType().toUpperCase()));
+      // We assume that all the logged events until now are table masking-related.
+      Preconditions.checkState(legalEventTypes
+          .contains(event.getAccessType().toUpperCase()),
+          "Illegal event access type: %s. Should be one of %s. Event details: %s",
+          event.getAccessType(), legalEventTypes, event);
 
       // event.getEventKey() is the concatenation of the following fields in an
       // AuthzAuditEvent: 'user', 'accessType', 'resourcePath', 'resourceType', 'action',
diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationFactory.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationFactory.java
index eb8a1a0..c0a04e4 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationFactory.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationFactory.java
@@ -101,7 +101,8 @@ public class RangerAuthorizationFactory implements AuthorizationFactory {
   }
 
   @Override
-  public boolean supportsColumnMasking() {
-    return BackendConfig.INSTANCE.isColumnMaskingEnabled();
+  public boolean supportsTableMasking() {
+    return BackendConfig.INSTANCE.isColumnMaskingEnabled()
+        || BackendConfig.INSTANCE.isRowFilteringEnabled();
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerBufferAuditHandler.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerBufferAuditHandler.java
index b922080..1b27929 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerBufferAuditHandler.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerBufferAuditHandler.java
@@ -20,6 +20,7 @@ package org.apache.impala.authorization.ranger;
 import com.google.common.base.Preconditions;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
 import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
+import org.apache.ranger.plugin.model.RangerPolicy;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
 import org.apache.ranger.plugin.policyengine.RangerAccessResource;
 import org.apache.ranger.plugin.policyengine.RangerAccessResult;
@@ -44,6 +45,7 @@ import java.util.Optional;
 public class RangerBufferAuditHandler implements RangerAccessResultProcessor {
   private static final Logger LOG = LoggerFactory.getLogger(
       RangerBufferAuditHandler.class);
+  public static final String ACCESS_TYPE_ROWFILTER = "row_filter";
   private final RangerDefaultAuditHandler auditHandler_ = new RangerDefaultAuditHandler();
   private final List<AuthzAuditEvent> auditEvents_ = new ArrayList<>();
   private final String sqlStmt_; // The SQL statement to be logged
@@ -128,7 +130,15 @@ public class RangerBufferAuditHandler implements RangerAccessResultProcessor {
     String resourceType = resource != null ? resource.getLeafName() : null;
 
     AuthzAuditEvent auditEvent = auditHandler_.getAuthzEvents(result);
-    auditEvent.setAccessType(request.getAccessType().toUpperCase());
+    int policyType = result.getPolicyType();
+    if (policyType == RangerPolicy.POLICY_TYPE_DATAMASK && result.isMaskEnabled()) {
+      auditEvent.setAccessType(result.getMaskType().toLowerCase());
+    } else if (policyType == RangerPolicy.POLICY_TYPE_ROWFILTER) {
+      auditEvent.setAccessType(ACCESS_TYPE_ROWFILTER);
+    } else {
+      // TODO: Decide whether we should use lowercase or uppercase accessType.
+      auditEvent.setAccessType(request.getAccessType().toUpperCase());
+    }
     auditEvent.setRequestData(sqlStmt_);
     auditEvent.setClientIP(clientIp_);
     auditEvent.setClusterName(clusterName_);
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 68a8665..00e9f1c 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -206,7 +206,12 @@ public class BackendConfig {
     backendCfg_.setEnable_column_masking(columnMaskingEnabled);
   }
 
+  public void setRowFilteringEnabled(boolean rowFilteringEnabled) {
+    backendCfg_.setEnable_row_filtering(rowFilteringEnabled);
+  }
+
   public boolean isColumnMaskingEnabled() { return backendCfg_.enable_column_masking; }
+  public boolean isRowFilteringEnabled() { return backendCfg_.enable_row_filtering; }
 
   public boolean isCompactCatalogTopic() { return backendCfg_.compact_catalog_topic; }
 
diff --git a/fe/src/main/java/org/apache/impala/util/AuthorizationUtil.java b/fe/src/main/java/org/apache/impala/util/AuthorizationUtil.java
index 2e4846c..32ac4c1 100644
--- a/fe/src/main/java/org/apache/impala/util/AuthorizationUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/AuthorizationUtil.java
@@ -88,6 +88,10 @@ public class AuthorizationUtil {
       throw new InternalException(
           "Unable to instantiate authorization provider: " + authzFactoryClassName, e);
     }
+    if (!beCfg.isColumnMaskingEnabled() && beCfg.isRowFilteringEnabled()) {
+      throw new InternalException("Unable to enable row-filtering without column-masking."
+          + " Please set --enable_column_masking to true as well");
+    }
     final AuthorizationConfig authzConfig = authzFactory.getAuthorizationConfig();
 
     if (!authzConfig.isEnabled()) {
diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
index 23c843b..db8032c 100644
--- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
@@ -2824,13 +2824,15 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
   }
 
   /**
-   * Column Masking is disabled by default. Test the error messages.
+   * Test the error messages when Column Masking is disabled.
    */
   @Test
   public void testColumnMaskEnabled() throws ImpalaException {
     String policyName = "col_mask";
     for (String tableName: new String[]{"alltypes", "alltypes_view"}) {
       BackendConfig.INSTANCE.setColumnMaskingEnabled(false);
+      // Row filtering depends on column masking, so we disable it as well.
+      BackendConfig.INSTANCE.setRowFilteringEnabled(false);
       String json = String.format("{\n" +
               "  \"name\": \"%s\",\n" +
               "  \"policyType\": 1,\n" +
@@ -2942,14 +2944,19 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
       } finally {
         deleteRangerPolicy(policyName);
         BackendConfig.INSTANCE.setColumnMaskingEnabled(true);
+        BackendConfig.INSTANCE.setRowFilteringEnabled(true);
       }
     }
   }
 
+  /**
+   * Test the error messages when Row Filtering is disabled.
+   */
   @Test
   public void testRowFilterEnabled() throws ImpalaException {
     String policyName = "row_filter";
     for (String tableName: new String[]{"alltypes", "alltypes_view"}) {
+      BackendConfig.INSTANCE.setRowFilteringEnabled(false);
       String json = String.format("{\n" +
           "  \"name\": \"%s\",\n" +
           "  \"policyType\": 2,\n" +
@@ -3048,6 +3055,7 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
                 onServer(TPrivilegeLevel.ALL));
       } finally {
         deleteRangerPolicy(policyName);
+        BackendConfig.INSTANCE.setRowFilteringEnabled(true);
       }
     }
   }
diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java
index 45db49b..b2948bb 100644
--- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java
+++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java
@@ -648,8 +648,8 @@ public abstract class AuthorizationTestBase extends FrontendTestBase {
   }
 
   protected static String rowFilterError(String object) {
-    return "Impala does not support row filtering yet. Row filtering is enabled on " +
-        "table: " + object;
+    return "Row filtering is disabled by --enable_row_filtering flag. Can't access " +
+        "table " + object + " that has row filtering policy.";
   }
 
   protected ScalarFunction addFunction(String db, String fnName, List<Type> argTypes,
diff --git a/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java b/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java
index 06bba79..a2e52d3 100644
--- a/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java
+++ b/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java
@@ -488,6 +488,148 @@ public class RangerAuditLogTest extends AuthorizationTestBase {
     }
   }
 
+  @Test
+  public void testAuditsForRowFiltering() throws ImpalaException {
+    // Two row filter policies will be added. The first one affects 'user_' and keeps
+    // rows of "functional.alltypestiny" satisfying "id=0". The second one affects user
+    // "non_owner_2" and keeps rows of "functional.alltypes" satisfying "id=1".
+    String databaseName = "functional";
+    String[] tableNames = {"alltypestiny", "alltypes"};
+    String[] policyNames = {"tiny_filter", "all_filter"};
+    String[] users = {user_.getShortName(), "non_owner_2"};
+    String[] filters = {"id=0", "id=1"};
+
+    List<String> policies = new ArrayList<>();
+    for (int i = 0; i < filters.length; ++i) {
+      String json = String.format("{\n" +
+          "  \"name\": \"%s\",\n" +
+          "  \"policyType\": 2,\n" +
+          "  \"serviceType\": \"%s\",\n" +
+          "  \"service\": \"%s\",\n" +
+          "  \"resources\": {\n" +
+          "    \"database\": {\n" +
+          "      \"values\": [\"%s\"],\n" +
+          "      \"isExcludes\": false,\n" +
+          "      \"isRecursive\": false\n" +
+          "    },\n" +
+          "    \"table\": {\n" +
+          "      \"values\": [\"%s\"],\n" +
+          "      \"isExcludes\": false,\n" +
+          "      \"isRecursive\": false\n" +
+          "    }\n" +
+          "  },\n" +
+          "  \"rowFilterPolicyItems\": [\n" +
+          "    {\n" +
+          "      \"accesses\": [\n" +
+          "        {\n" +
+          "          \"type\": \"select\",\n" +
+          "          \"isAllowed\": true\n" +
+          "        }\n" +
+          "      ],\n" +
+          "      \"users\": [\"%s\"],\n" +
+          "      \"rowFilterInfo\": {\"filterExpr\": \"%s\"}\n" +
+          "    }\n" +
+          "  ]\n" +
+          "}", policyNames[i], RANGER_SERVICE_TYPE, RANGER_SERVICE_NAME, databaseName,
+          tableNames[i], users[i], filters[i]);
+      policies.add(json);
+    }
+    try {
+      for (int i = 0; i < filters.length; ++i) {
+        String policyName = policyNames[i];
+        String json = policies.get(i);
+        createRangerPolicy(policyName, json);
+      }
+
+      authzOk(events -> {
+        assertEquals(15, events.size());
+        assertEquals("select * from functional.alltypestiny",
+            events.get(0).getRequestData());
+        assertEventEquals("@table", "select", "functional/alltypestiny", 1,
+            events.get(0));
+        assertEventEquals("@column", "select", "functional/alltypestiny/id", 1,
+            events.get(1));
+        assertEventEquals("@column", "select", "functional/alltypestiny/bool_col", 1,
+            events.get(2));
+        assertEventEquals("@column", "select", "functional/alltypestiny/tinyint_col", 1,
+            events.get(3));
+        assertEventEquals("@column", "select", "functional/alltypestiny/smallint_col", 1,
+            events.get(4));
+        assertEventEquals("@column", "select", "functional/alltypestiny/int_col", 1,
+            events.get(5));
+        assertEventEquals("@column", "select", "functional/alltypestiny/bigint_col", 1,
+            events.get(6));
+        assertEventEquals("@column", "select", "functional/alltypestiny/float_col", 1,
+            events.get(7));
+        assertEventEquals("@column", "select", "functional/alltypestiny/double_col", 1,
+            events.get(8));
+        assertEventEquals("@column", "select",
+            "functional/alltypestiny/date_string_col", 1, events.get(9));
+        assertEventEquals("@column", "select", "functional/alltypestiny/string_col", 1,
+            events.get(10));
+        assertEventEquals("@column", "select", "functional/alltypestiny/timestamp_col", 1,
+            events.get(11));
+        assertEventEquals("@column", "select", "functional/alltypestiny/year", 1,
+            events.get(12));
+        assertEventEquals("@column", "select", "functional/alltypestiny/month", 1,
+            events.get(13));
+        assertEventEquals("@table", "row_filter", "functional/alltypestiny", 1,
+            events.get(14));
+      }, "select * from functional.alltypestiny", onTable("functional", "alltypestiny",
+          TPrivilegeLevel.SELECT));
+
+      authzOk(events -> {
+        assertEquals(14, events.size());
+        assertEquals("select * from functional.alltypes",
+            events.get(0).getRequestData());
+        assertEventEquals("@table", "select", "functional/alltypes", 1,
+            events.get(0));
+        assertEventEquals("@column", "select", "functional/alltypes/id", 1,
+            events.get(1));
+        assertEventEquals("@column", "select", "functional/alltypes/bool_col", 1,
+            events.get(2));
+        assertEventEquals("@column", "select", "functional/alltypes/tinyint_col", 1,
+            events.get(3));
+        assertEventEquals("@column", "select", "functional/alltypes/smallint_col", 1,
+            events.get(4));
+        assertEventEquals("@column", "select", "functional/alltypes/int_col", 1,
+            events.get(5));
+        assertEventEquals("@column", "select", "functional/alltypes/bigint_col", 1,
+            events.get(6));
+        assertEventEquals("@column", "select", "functional/alltypes/float_col", 1,
+            events.get(7));
+        assertEventEquals("@column", "select", "functional/alltypes/double_col", 1,
+            events.get(8));
+        assertEventEquals("@column", "select", "functional/alltypes/date_string_col", 1,
+            events.get(9));
+        assertEventEquals("@column", "select", "functional/alltypes/string_col", 1,
+            events.get(10));
+        assertEventEquals("@column", "select", "functional/alltypes/timestamp_col", 1,
+            events.get(11));
+        assertEventEquals("@column", "select", "functional/alltypes/year", 1,
+            events.get(12));
+        assertEventEquals("@column", "select", "functional/alltypes/month", 1,
+            events.get(13));
+      }, "select * from functional.alltypes", onTable("functional", "alltypes",
+          TPrivilegeLevel.SELECT));
+
+      // When the query fails with insufficient privileges, no audit log for row
+      // filtering is generated. Only the event of the first column (i.e. 'id') that
+      // failed authorization will be logged.
+      authzError(events -> {
+        assertEquals(1, events.size());
+        assertEquals("select * from functional.alltypestiny",
+            events.get(0).getRequestData());
+        assertEventEquals("@column", "select", "functional/alltypestiny/id", 0,
+            events.get(0));
+      }, "select * from functional.alltypestiny", onTable("functional", "alltypestiny"));
+    } finally {
+      for (int i = 0; i < filters.length; ++i) {
+        String policyName = policyNames[i];
+        deleteRangerPolicy(policyName);
+      }
+    }
+  }
+
   private void authzOk(Consumer<List<AuthzAuditEvent>> resultChecker, String stmt,
       TPrivilege[]... privileges) throws ImpalaException {
     authorize(stmt).ok(privileges);
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index 980dca4..d32baea 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -398,12 +398,23 @@ public class FrontendTestBase extends AbstractFrontendTest {
           }
 
           @Override
+          public boolean needsRowFiltering(User user, String dbName, String tableName) {
+            return false;
+          }
+
+          @Override
           public String createColumnMask(User user, String dbName, String tableName,
               String columnName, AuthorizationContext authzCtx) {
             return null;
           }
 
           @Override
+          public String createRowFilter(User user, String dbName, String tableName,
+              AuthorizationContext rangerCtx) {
+            return null;
+          }
+
+          @Override
           public void postAnalyze(AuthorizationContext authzCtx) {
           }
 
@@ -428,7 +439,7 @@ public class FrontendTestBase extends AbstractFrontendTest {
       }
 
       @Override
-      public boolean supportsColumnMasking() {
+      public boolean supportsTableMasking() {
         return false;
       }
     };
diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
index 4c71a80..2d1aed6 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
@@ -398,3 +398,9 @@ show create view $UNIQUE_DB.masked_view;
 ---- RESULTS
 'CREATE VIEW $UNIQUE_DB.masked_view AS\nSELECT id FROM functional.alltypestiny'
 ====
+---- QUERY
+select id, bigint_col from functional.alltypesagg order by id limit 10
+---- CATCH
+AnalysisException: Column-masking/Row-filtering expressions using subqueries are not
+ supported (IMPALA-10483). Table(s) in the subquery: [functional.alltypestiny]
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking_and_row_filtering.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking_and_row_filtering.test
new file mode 100644
index 0000000..f431ee4
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking_and_row_filtering.test
@@ -0,0 +1,34 @@
+====
+---- QUERY
+# Row-filtering policy keeps rows with "id % 3 = 0".
+# Column-masking policies mask "id" to "id + 100" and redact column "date_string_col".
+# Note that row filtering policies take effect prior to any column masking policies,
+# because column masking policies apply to result data.
+select id, bool_col, date_string_col, year, month from functional.alltypestiny
+---- RESULTS
+100,true,'nn/nn/nn',2009,1
+103,false,'nn/nn/nn',2009,2
+106,true,'nn/nn/nn',2009,4
+---- TYPES
+INT,BOOLEAN,STRING,INT,INT
+====
+---- QUERY
+# Column-masking policies of functional.alltypes mask "id" to "-id" and redact column
+# "date_string_col". The row-filtering policy of functional.alltypes_view keeps rows
+# with "id >= -8 and date_string_col = 'nn/nn/nn'". functional.alltypes_view is a view
+# based on table functional.alltypes, so the column-masking policies are applied to
+# 'alltypes' before the row-filter policy is applied to 'alltypes_view'.
+select id, bool_col, date_string_col, year, month from functional.alltypes_view
+---- RESULTS
+0,true,'nn/nn/nn',2009,1
+-1,false,'nn/nn/nn',2009,1
+-2,true,'nn/nn/nn',2009,1
+-3,false,'nn/nn/nn',2009,1
+-4,true,'nn/nn/nn',2009,1
+-5,false,'nn/nn/nn',2009,1
+-6,true,'nn/nn/nn',2009,1
+-7,false,'nn/nn/nn',2009,1
+-8,true,'nn/nn/nn',2009,1
+---- TYPES
+INT,BOOLEAN,STRING,INT,INT
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_row_filtering.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_row_filtering.test
new file mode 100644
index 0000000..2f420a4
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_row_filtering.test
@@ -0,0 +1,302 @@
+====
+---- QUERY
+# Row-filtering policy keeps rows with "id % 2 = 0"
+select * from functional.alltypestiny
+---- RESULTS
+0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
+6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+---- TYPES
+INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT
+====
+---- QUERY
+# Row-filtering policy keeps rows with
+# "(string_col = '0' and id <= 0) or (string_col = '1' and bool_col = true and id > 90)"
+select id, string_col, bool_col, year, month from functional.alltypessmall
+---- RESULTS
+0,'0',true,2009,1
+96,'1',true,2009,4
+---- TYPES
+INT,STRING,BOOLEAN,INT,INT
+====
+---- QUERY
+# Test joins on the above two tables.
+select t.id, t.string_col, t.bool_col, t.year, t.month
+from functional.alltypestiny t join functional.alltypessmall s using (id)
+---- RESULTS
+0,'0',true,2009,1
+---- TYPES
+INT,STRING,BOOLEAN,INT,INT
+====
+---- QUERY
+# Test joins on the above two tables.
+select t.id, t.string_col, t.bool_col, t.year, t.month, s.id
+from functional.alltypestiny t left join functional.alltypessmall s using (id)
+---- RESULTS
+0,'0',true,2009,1,0
+2,'0',true,2009,2,NULL
+4,'0',true,2009,3,NULL
+6,'0',true,2009,4,NULL
+---- TYPES
+INT,STRING,BOOLEAN,INT,INT,INT
+====
+---- QUERY
+# Test joins on the above two tables.
+select t.id, s.id, s.string_col, s.bool_col, s.year, s.month
+from functional.alltypestiny t right join functional.alltypessmall s using (id)
+---- RESULTS
+0,0,'0',true,2009,1
+NULL,96,'1',true,2009,4
+---- TYPES
+INT,INT,STRING,BOOLEAN,INT,INT
+====
+---- QUERY
+# Row-filtering policy keeps rows with "year = 2009 and month = 1". Test on aggregate.
+select count(*) from functional.alltypes
+---- RESULTS
+310
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Test on local views. functional.alltypestiny has row filter "id % 2 = 0".
+with v as (select id, bool_col, string_col from functional.alltypestiny)
+select * from v
+---- RESULTS
+0,true,'0'
+2,true,'0'
+4,true,'0'
+6,true,'0'
+---- TYPES
+INT,BOOLEAN,STRING
+====
+---- QUERY
+# Test on local views. Masking is correctly ignored for local view names, so no row
+# filters are applied here.
+use functional;
+with alltypestiny as (select 1 as id)
+select * from alltypestiny
+---- RESULTS
+1
+====
+---- QUERY
+# Test on local views. Masking is correctly ignored for local view names, so the row
+# filter of functional.alltypessmall won't be applied here, while the row filter of
+# alltypestiny is correctly applied.
+use functional;
+with alltypessmall as (select 1 as id)
+select alltypessmall.id from alltypestiny join alltypessmall using (id)
+---- RESULTS
+====
+---- QUERY
+# Negative test for an illegal row filter that references a nonexistent column
+# 'test_id', which results in an AnalysisException.
+select * from functional_parquet.alltypes limit 10
+---- CATCH
+AnalysisException: Could not resolve column/field reference: 'test_id'
+====
+---- QUERY
+# Negative test for illegal row filter "100 id = int_col" which has syntax error.
+select * from functional_parquet.alltypessmall limit 10
+---- CATCH
+ParseException: Syntax error in line 1
+====
+---- QUERY
+# Row-filtering policy on 'functional_parquet.alltypes' references a nonexistent column
+# 'test_id' which exists in 'functional.jointbl'. But it won't be resolved as a
+# correlated reference and will still hit an AnalysisException.
+select * from functional.jointbl
+where exists(select * from functional_parquet.alltypes);
+---- CATCH
+AnalysisException: Could not resolve column/field reference: 'test_id'
+====
+---- QUERY
+# Row-filtering policy on 'functional_parquet.alltypes' references a nonexistent column
+# 'test_id' which exists in 'functional.jointbl'. But it won't be resolved as a
+# correlated reference and will still hit an AnalysisException.
+select * from functional.jointbl, functional_parquet.alltypes limit 10
+---- CATCH
+AnalysisException: Could not resolve column/field reference: 'test_id'
+====
+---- QUERY
+# Row-filtering policy on the view keeps rows with "id < 5". Row-filtering policy on the
+# underlying table 'alltypes' keeps rows with "year = 2009 and month = 1".
+select id, bool_col, int_col, string_col, date_string_col, year, month
+from functional.alltypes_view where id % 2 = 0
+---- RESULTS
+0,true,0,'0','01/01/09',2009,1
+2,true,2,'2','01/01/09',2009,1
+4,true,4,'4','01/01/09',2009,1
+---- TYPES
+INT,BOOLEAN,INT,STRING,STRING,INT,INT
+====
+---- QUERY
+# The query has no results since the where-clause is the opposite of the row-filter expr.
+select * from functional.alltypes_view where id >= 5
+---- RESULTS
+---- TYPES
+INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT
+====
+---- QUERY
+# Test local view using views. Row filter "id < 5" is applied on alltypes_view. Row filter
+# "year = 2009 and month = 1" is applied on alltypes inside alltypes_view.
+with v as (
+  select id, bool_col, int_col, string_col, date_string_col, year, month
+  from functional.alltypes_view where id % 2 = 0
+) select * from v where id != 0
+---- RESULTS
+2,true,2,'2','01/01/09',2009,1
+4,true,4,'4','01/01/09',2009,1
+---- TYPES
+INT,BOOLEAN,INT,STRING,STRING,INT,INT
+====
+---- QUERY
+# Test on WITH clause and views. functional.alltypestiny has row filter "id % 2 = 0".
+# functional.alltypessmall has row filter
+# "(string_col = '0' and id <= 0) or (string_col = '1' and bool_col = true and id > 90)".
+# functional.alltypes_view has row filter "id < 5".
+# functional.alltypes used in alltypes_view has row filter "year = 2009 and month = 1".
+with v1 as (select id, bool_col, string_col from functional.alltypestiny),
+     v2 as (select id, bool_col, string_col from functional.alltypessmall)
+select v.id, v.bool_col, v.string_col from v1, v2, functional.alltypes_view v
+where v1.id = v2.id and v2.id = v.id
+---- RESULTS
+0,true,'0'
+---- TYPES
+INT,BOOLEAN,STRING
+====
+---- QUERY
+# Test on CTAS
+create table $UNIQUE_DB.masked_tbl as select * from functional.alltypestiny;
+select * from $UNIQUE_DB.masked_tbl;
+---- RESULTS
+0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
+6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+---- TYPES
+INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT
+====
+---- QUERY
+# Test on SELECT used in INSERT statement
+create table $UNIQUE_DB.masked_tbl2 like functional.alltypestiny stored as textfile;
+insert into $UNIQUE_DB.masked_tbl2 partition(year, month) select * from functional.alltypestiny;
+select * from $UNIQUE_DB.masked_tbl2;
+---- RESULTS
+0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
+6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+---- TYPES
+INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT
+====
+---- QUERY
+# Test on CreateView. Row filters should not be added during SQL generation.
+create view $UNIQUE_DB.masked_view as select * from functional.alltypestiny;
+show create view $UNIQUE_DB.masked_view;
+---- RESULTS
+'CREATE VIEW $UNIQUE_DB.masked_view AS\nSELECT * FROM functional.alltypestiny'
+====
+---- QUERY
+# The row filter on the underlying table 'functional.alltypestiny' still takes effect.
+select * from $UNIQUE_DB.masked_view
+---- RESULTS
+0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
+6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+---- TYPES
+INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT
+====
+---- QUERY
+# Test on AlterView. Row filters should not be added during SQL generation.
+alter view $UNIQUE_DB.masked_view as select id from functional.alltypestiny;
+show create view $UNIQUE_DB.masked_view;
+---- RESULTS
+'CREATE VIEW $UNIQUE_DB.masked_view AS\nSELECT id FROM functional.alltypestiny'
+====
+---- QUERY
+# The row filter on the underlying table 'functional.alltypestiny' still takes effect.
+select * from $UNIQUE_DB.masked_view
+---- RESULTS
+0
+2
+4
+6
+---- TYPES
+INT
+====
+---- QUERY
+# TODO(IMPALA-10483): support row-filter expressions using a subquery on the same table
+select count(*) from functional.alltypesagg
+---- RESULTS
+---- CATCH
+AnalysisException: Column-masking/Row-filtering expressions using subqueries are not
+ supported (IMPALA-10483). Table(s) in the subquery: [functional.alltypesagg]
+====
+---- QUERY
+# TODO(IMPALA-10483): support row-filter expressions using a subquery on other tables
+select count(*) from functional_parquet.alltypesagg
+---- CATCH
+AnalysisException: Column-masking/Row-filtering expressions using subqueries are not
+ supported (IMPALA-10483). Table(s) in the subquery: [functional.alltypestiny]
+====
+---- QUERY
+# Row-filtering policy keeps rows with "nested_struct.a is not NULL"
+select id, nested_struct.* from functional_parquet.complextypestbl
+---- RESULTS
+1,1
+7,7
+8,-1
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+# Row-filtering policy keeps rows with "nested_struct.a is not NULL"
+select id, nested_struct.a, arr.item
+from functional_parquet.complextypestbl t, t.nested_struct.b arr
+---- RESULTS
+1,1,1
+7,7,2
+7,7,3
+7,7,NULL
+8,-1,-1
+---- TYPES
+BIGINT,INT,INT
+====
+---- QUERY
+# Row-filtering policy keeps rows with "nested_struct.a is not NULL"
+select id, b.item from functional_parquet.complextypestbl t, t.nested_struct.b
+---- RESULTS
+1,1
+7,2
+7,3
+7,NULL
+8,-1
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+# TODO: The row-filtering policy should keep rows with "nested_struct.a is not NULL" on
+# the base table 'complextypestbl'. We currently can't apply it since the collection
+# column 'b' is non-relative. Such queries fail until IMPALA-10484 is resolved. See the
+# next query for the desired results.
+select * from functional_parquet.complextypestbl.nested_struct.b
+---- CATCH
+AnalysisException: Using non-relative collection column nested_struct.b of table
+ functional_parquet.complextypestbl is not supported since there are row-filtering
+ policies on this table (IMPALA-10484). Rewrite query to use relative reference.
+====
+---- QUERY
+# The above query should be manually rewritten to this until IMPALA-10484 is resolved.
+select b.item from functional_parquet.complextypestbl t, t.nested_struct.b
+---- RESULTS
+1
+2
+3
+NULL
+-1
+---- TYPES
+INT
+====
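
Conceptually, the queries above behave as if each filtered table reference were
rewritten into an inline view whose WHERE clause is the policy's filter expression
(the frontend builds InlineViewRef nodes for this; the string-based helper below is
only a hypothetical sketch with invented names, not Impala code):

    // Hypothetical sketch of the inline-view rewrite behind row filtering.
    public final class RowFilterRewriteSketch {
      /** Wraps 'dbTable' in an inline view applying 'filterExpr', if any. */
      static String applyRowFilter(String dbTable, String filterExpr) {
        if (filterExpr == null || filterExpr.isEmpty()) return dbTable;
        return String.format("(SELECT * FROM %s WHERE %s) %s",
            dbTable, filterExpr, dbTable.replace('.', '_'));
      }

      public static void main(String[] args) {
        // "id % 2 = 0" is the row filter on functional.alltypestiny above.
        System.out.println(applyRowFilter("functional.alltypestiny", "id % 2 = 0"));
      }
    }
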
diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index 4df2b39..1411494 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -712,7 +712,62 @@ class TestRanger(CustomClusterTestSuite):
     return json.loads(r.content)["id"]
 
   @staticmethod
-  def _remove_column_masking_policy(policy_name):
+  def _add_row_filtering_policy(policy_name, user, db, table, filter_expr):
+    """Adds a row filtering policy and returns the policy id"""
+    TestRanger._add_multiuser_row_filtering_policy(policy_name, db, table, [user],
+                                                   [filter_expr])
+
+  @staticmethod
+  def _add_multiuser_row_filtering_policy(policy_name, db, table, users, filters):
+    """Adds a row filtering policy on 'db'.'table' and returns the policy id.
+    users[0] has filters[0], users[1] has filters[1] and so on."""
+    assert len(users) > 0
+    assert len(users) == len(filters)
+    items = []
+    for i in range(len(users)):
+      items.append({
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": True
+            }
+          ],
+          "users": [users[i]],
+          "rowFilterInfo": {"filterExpr": filters[i]}
+        })
+    return TestRanger._add_row_filtering_policy_with_items(policy_name, db, table,
+                                                           items)
+
+  @staticmethod
+  def _add_row_filtering_policy_with_items(policy_name, db, table, items):
+    """ Adds a row filtering policy and returns the policy id"""
+    policy_data = {
+      "name": policy_name,
+      "policyType": 2,
+      "serviceType": "hive",
+      "service": "test_impala",
+      "resources": {
+        "database": {
+          "values": [db],
+          "isExcludes": False,
+          "isRecursive": False
+        },
+        "table": {
+          "values": [table],
+          "isExcludes": False,
+          "isRecursive": False
+        }
+      },
+      "rowFilterPolicyItems": items
+    }
+    r = requests.post("{0}/service/public/v2/api/policy".format(RANGER_HOST),
+                      auth=RANGER_AUTH, json=policy_data, headers=REST_HEADERS)
+    assert 300 > r.status_code >= 200, r.content
+    LOG.info("Added row filtering policy on table {0}.{1} for using items {2}"
+             .format(db, table, items))
+    return json.loads(r.content)["id"]
+
+  @staticmethod
+  def _remove_policy(policy_name):
     r = requests.delete(
         "{0}/service/public/v2/api/policy?servicename=test_impala&policyname={1}".format(
             RANGER_HOST, policy_name),
@@ -968,6 +1023,11 @@ class TestRanger(CustomClusterTestSuite):
         unique_name + str(policy_cnt), user, "functional", "alltypes", "string_col",
         "CUSTOM", "concat({col}, 'ttt')")
       policy_cnt += 1
+      # Add policy to mask "bigint_col" using a subquery. It will hit IMPALA-10483.
+      TestRanger._add_column_masking_policy(
+        unique_name + str(policy_cnt), user, "functional", "alltypesagg", "bigint_col",
+        "CUSTOM", "(select count(*) from functional.alltypestiny)")
+      policy_cnt += 1
       self.execute_query_expect_success(admin_client, "refresh authorization",
                                         user=ADMIN)
       self.run_test_case("QueryTest/ranger_column_masking", vector,
@@ -996,7 +1056,7 @@ class TestRanger(CustomClusterTestSuite):
                            % (unique_database, user))
       admin_client.execute("drop database %s cascade" % unique_database)
       for i in range(policy_cnt):
-        TestRanger._remove_column_masking_policy(unique_name + str(i))
+        TestRanger._remove_policy(unique_name + str(i))
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -1038,10 +1098,160 @@ class TestRanger(CustomClusterTestSuite):
                                           user=ADMIN)
         self.run_test_case("QueryTest/ranger_alltypes_" + mask_type.lower(), vector)
         while policy_names:
-          TestRanger._remove_column_masking_policy(policy_names.pop())
+          TestRanger._remove_policy(policy_names.pop())
     finally:
       while policy_names:
-        TestRanger._remove_column_masking_policy(policy_names.pop())
+        TestRanger._remove_policy(policy_names.pop())
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
+  def test_row_filtering(self, vector, unique_name):
+    user = getuser()
+    unique_database = unique_name + '_db'
+    # Create another client for admin user since current user doesn't have privileges to
+    # create/drop databases or refresh authorization.
+    admin_client = self.create_impala_client()
+    admin_client.execute("drop database if exists %s cascade" % unique_database,
+                         user=ADMIN)
+    admin_client.execute("create database %s" % unique_database, user=ADMIN)
+    # Grant CREATE on database to current user for tests on CTAS, CreateView etc.
+    # Note that 'user' is the owner of the test tables. No additional GRANTs are required.
+    admin_client.execute("grant create on database %s to user %s"
+                         % (unique_database, user))
+    policy_cnt = 0
+    try:
+      #######################################################
+      # Test row filters on current user
+      #######################################################
+      TestRanger._add_row_filtering_policy(
+          unique_name + str(policy_cnt), user, "functional", "alltypestiny", "id % 2 = 0")
+      policy_cnt += 1
+      # Add a filter using builtin functions
+      TestRanger._add_row_filtering_policy(
+          unique_name + str(policy_cnt), user, "functional", "alltypessmall",
+          """(string_col = concat('0', '') and id <= 0) or
+             (string_col = '1' and bool_col = true and id > 90)""")
+      policy_cnt += 1
+      TestRanger._add_row_filtering_policy(
+          unique_name + str(policy_cnt), user, "functional", "alltypes",
+          "year = 2009 and month = 1")
+      policy_cnt += 1
+      # Add a row-filtering policy using a nonexistent column 'test_id'. Queries on
+      # this table will fail to resolve the column.
+      TestRanger._add_row_filtering_policy(
+          unique_name + str(policy_cnt), user, "functional_parquet", "alltypes",
+          "test_id = id")
+      policy_cnt += 1
+      # Add an illegal row filter that causes a parse error.
+      TestRanger._add_row_filtering_policy(
+          unique_name + str(policy_cnt), user, "functional_parquet", "alltypessmall",
+          "100 id = int_col")
+      policy_cnt += 1
+      # Add a row-filtering policy on a view. 'alltypes_view' is a view on table
+      # 'alltypes', which also has a row-filtering policy. Both will be applied.
+      TestRanger._add_row_filtering_policy(
+          unique_name + str(policy_cnt), user, "functional", "alltypes_view",
+          "id < 5")
+      policy_cnt += 1
+      # Row-filtering expr using subquery on current table.
+      TestRanger._add_row_filtering_policy(
+          unique_name + str(policy_cnt), user, "functional", "alltypesagg",
+          "id = (select min(id) from functional.alltypesagg)")
+      policy_cnt += 1
+      # Row-filtering expr using subquery on other tables.
+      TestRanger._add_row_filtering_policy(
+          unique_name + str(policy_cnt), user, "functional_parquet", "alltypesagg",
+          "id in (select id from functional.alltypestiny)")
+      policy_cnt += 1
+      # Row-filtering expr on nested types
+      TestRanger._add_row_filtering_policy(
+          unique_name + str(policy_cnt), user, "functional_parquet", "complextypestbl",
+          "nested_struct.a is not NULL")
+      policy_cnt += 1
+      admin_client.execute("refresh authorization")
+      self.run_test_case("QueryTest/ranger_row_filtering", vector,
+                         test_file_vars={'$UNIQUE_DB': unique_database})
+
+      #######################################################
+      # Test row filter policy on multiple users
+      #######################################################
+      TestRanger._add_multiuser_row_filtering_policy(
+          unique_name + str(policy_cnt), "functional_parquet", "alltypestiny",
+          [user, "non_owner", "non_owner_2"],
+          ["id=0", "id=1", "id=2"])
+      policy_cnt += 1
+      admin_client.execute(
+          "grant select on table functional_parquet.alltypestiny to user non_owner")
+      admin_client.execute(
+          "grant select on table functional_parquet.alltypestiny to user non_owner_2")
+      admin_client.execute("refresh authorization")
+      non_owner_client = self.create_impala_client()
+      non_owner_2_client = self.create_impala_client()
+      query = "select id from functional_parquet.alltypestiny"
+      assert self.client.execute(query).get_data() == "0"
+      assert non_owner_client.execute(query, user="non_owner").get_data() == "1"
+      assert non_owner_2_client.execute(query, user="non_owner_2").get_data() == "2"
+      query = "select max(id) from functional_parquet.alltypestiny"
+      assert self.client.execute(query).get_data() == "0"
+      assert non_owner_client.execute(query, user="non_owner").get_data() == "1"
+      assert non_owner_2_client.execute(query, user="non_owner_2").get_data() == "2"
+    finally:
+      for i in range(policy_cnt):
+        TestRanger._remove_policy(unique_name + str(i))
+      cleanup_statements = [
+        "revoke select on table functional_parquet.alltypestiny from user non_owner",
+        "revoke select on table functional_parquet.alltypestiny from user non_owner_2"
+      ]
+      for statement in cleanup_statements:
+        admin_client.execute(statement, user=ADMIN)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
+  def test_column_masking_and_row_filtering(self, vector, unique_name):
+    user = getuser()
+    admin_client = self.create_impala_client()
+    policy_cnt = 0
+    try:
+      # 2 column masking policies and 1 row filtering policy on functional.alltypestiny.
+      # The row filtering policy will take effect first, then the column masking policies
+      # mask the final results.
+      TestRanger._add_column_masking_policy(
+        unique_name + str(policy_cnt), user, "functional", "alltypestiny", "id",
+        "CUSTOM", "id + 100")   # use column name 'id' directly
+      policy_cnt += 1
+      TestRanger._add_column_masking_policy(
+        unique_name + str(policy_cnt), user, "functional", "alltypestiny",
+        "date_string_col", "MASK")
+      policy_cnt += 1
+      TestRanger._add_row_filtering_policy(
+        unique_name + str(policy_cnt), user, "functional", "alltypestiny", "id % 3 = 0")
+      policy_cnt += 1
+      # 2 column masking policies on functional.alltypes and 1 row filtering policy on
+      # functional.alltypes_view, which is a view on functional.alltypes. The column
+      # masking policies on functional.alltypes take effect first, which affects the
+      # results of functional.alltypes_view. Then the row filtering policy of the view
+      # filters out rows of functional.alltypes_view.
+      TestRanger._add_column_masking_policy(
+        unique_name + str(policy_cnt), user, "functional", "alltypes", "id",
+        "CUSTOM", "-id")   # use column name 'id' directly
+      policy_cnt += 1
+      TestRanger._add_column_masking_policy(
+        unique_name + str(policy_cnt), user, "functional", "alltypes",
+        "date_string_col", "MASK")
+      policy_cnt += 1
+      TestRanger._add_row_filtering_policy(
+        unique_name + str(policy_cnt), user, "functional", "alltypes_view",
+        "id >= -8 and date_string_col = 'nn/nn/nn'")
+      policy_cnt += 1
+
+      self.execute_query_expect_success(admin_client, "refresh authorization",
+                                        user=ADMIN)
+      self.run_test_case("QueryTest/ranger_column_masking_and_row_filtering", vector)
+    finally:
+      for i in range(policy_cnt):
+        TestRanger._remove_policy(unique_name + str(i))
 
   @pytest.mark.execute_serially
   @SkipIfABFS.hive
@@ -1066,7 +1276,7 @@ class TestRanger(CustomClusterTestSuite):
       self.run_test_case("QueryTest/hive_ranger_integration", vector)
     finally:
       check_call([script])
-      TestRanger._remove_column_masking_policy("col_mask_for_hive")
+      TestRanger._remove_policy("col_mask_for_hive")
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -1266,4 +1476,4 @@ class TestRangerColumnMaskingTpchNested(CustomClusterTestSuite):
       for tbl in tbl_cols:
         for col in tbl_cols[tbl]:
           policy_name = "%s_%s_mask" % (tbl, col)
-          TestRanger._remove_column_masking_policy(policy_name)
+          TestRanger._remove_policy(policy_name)

[impala] 01/02: IMPALA-10512: ALTER TABLE ADD PARTITION should bump the write id for ACID tables

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 61623438428deebdeb73225f664242d5a4aeba46
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Thu Feb 18 13:51:32 2021 +0100

    IMPALA-10512: ALTER TABLE ADD PARTITION should bump the write id for ACID tables
    
    ALTER TABLE ADD PARTITION should bump the write id for ACID tables.
    Both for INSERT-only and full ACID tables.
    
    For transactional tables we are adding partitions in an ACID
    transaction in the following sequence:
    
    1. open transaction
    2. allocate write id for table
    3. add partitions to HMS table
    4. commit transaction
    
    However, please note that table metadata modifications are
    independent of ACID transactions. I.e., if adding the partitions
    succeeds but we cannot commit the transaction, the newly added
    partitions won't get removed.
    
    So why are we opening a txn then? We are doing it in order to bump
    the write id in a best-effort way. This aids table metadata caching,
    so by looking at the table write id we can determine if the cached
    table metadata is up-to-date.
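
    A minimal, self-contained sketch of the pattern (assumed stub names,
    not Impala classes) showing why close() gives best-effort semantics,
    aborting only when commit() was never reached:

        // TxnSketch stands in for Impala's Transaction (names invented).
        final class TxnSketch implements AutoCloseable {
          private long id = 42;  // pretend HMS handed out this txn id
          long getId() { return id; }
          void commit() { System.out.println("commit txn " + id); id = -1; }
          @Override public void close() {
            // Abort only if commit() was never reached.
            if (id > 0) System.out.println("abort txn " + id);
          }
        }

        public final class AddPartitionSketch {
          public static void main(String[] args) {
            try (TxnSketch txn = new TxnSketch()) {
              // 2. Allocating a write id is what bumps the table write id.
              System.out.println("allocate write id in txn " + txn.getId());
              // 3. HMS metadata change; not rolled back if the commit fails.
              System.out.println("add partitions via HMS");
              txn.commit();  // 4. close() aborts instead if we never get here
            }
          }
        }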
    
    Testing:
     * added e2e test
    
    Change-Id: Iad247008b7c206db00516326c1447bd00a9b34bd
    Reviewed-on: http://gerrit.cloudera.org:8080/17081
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../java/org/apache/impala/catalog/Catalog.java    |  7 +-
 .../impala/catalog/CatalogServiceCatalog.java      |  5 ++
 .../org/apache/impala/catalog/ImpaladCatalog.java  |  7 ++
 .../org/apache/impala/catalog/Transaction.java     | 19 +++++-
 .../apache/impala/service/CatalogOpExecutor.java   | 77 ++++++++++++++++------
 .../queries/QueryTest/full-acid-rowid.test         | 32 ++++-----
 tests/query_test/test_acid.py                      | 27 ++++++++
 7 files changed, 133 insertions(+), 41 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index d232f0a..c662c78 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -125,6 +125,11 @@ public abstract class Catalog implements AutoCloseable {
   }
 
   /**
+   * Returns the Hive ACID user id used by this catalog.
+   */
+  public abstract String getAcidUserId();
+
+  /**
    * Adds a new database to the catalog, replacing any existing database with the same
    * name.
    */
@@ -684,7 +689,7 @@ public abstract class Catalog implements AutoCloseable {
    */
   public Transaction openTransaction(IMetaStoreClient hmsClient, HeartbeatContext ctx)
       throws TransactionException {
-    return new Transaction(hmsClient, transactionKeepalive_, "Impala Catalog", ctx);
+    return new Transaction(hmsClient, transactionKeepalive_, getAcidUserId(), ctx);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 704bac4..0f7d8f1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -3333,6 +3333,11 @@ public class CatalogServiceCatalog extends Catalog {
     }
   }
 
+  @Override
+  public String getAcidUserId() {
+    return String.format("CatalogD %s", getCatalogServiceId());
+  }
+
   /**
    * Gets the id for this catalog service
    */
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index d1452a1..edbfee2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -51,6 +51,7 @@ import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
 import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.TByteBuffer;
+import org.apache.impala.util.TUniqueIdUtil;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.slf4j.Logger;
@@ -655,5 +656,11 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
   }
 
   @Override
+  public String getAcidUserId() {
+    return String.format("Impala Catalog %s",
+        TUniqueIdUtil.PrintId(getCatalogServiceId()));
+  }
+
+  @Override
   public TUniqueId getCatalogServiceId() { return catalogServiceId_; }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/Transaction.java b/fe/src/main/java/org/apache/impala/catalog/Transaction.java
index e773f34..f4af599 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Transaction.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Transaction.java
@@ -46,24 +46,37 @@ public class Transaction implements AutoCloseable {
     hmsClient_ = hmsClient;
     keepalive_ = keepalive;
     transactionId_ = MetastoreShim.openTransaction(hmsClient_);
-    LOG.info("Opened transaction: " + String.valueOf(transactionId_));
+    LOG.info(String.format("Opened transaction %d by user '%s' ", transactionId_, user));
     keepalive_.addTransaction(transactionId_, ctx);
   }
 
+  /**
+   * Constructor for short-running transactions that we don't want to heartbeat.
+   */
+  public Transaction(IMetaStoreClient hmsClient, String user, String context)
+      throws TransactionException {
+    Preconditions.checkNotNull(hmsClient);
+    hmsClient_ = hmsClient;
+    transactionId_ = MetastoreShim.openTransaction(hmsClient_);
+    LOG.info(String.format("Opened transaction %d by user '%s' in context: %s",
+        transactionId_, user, context));
+  }
+
   public long getId() { return transactionId_; }
 
   public void commit() throws TransactionException {
     Preconditions.checkState(transactionId_ > 0);
-    keepalive_.deleteTransaction(transactionId_);
+    if (keepalive_ != null) keepalive_.deleteTransaction(transactionId_);
     MetastoreShim.commitTransaction(hmsClient_, transactionId_);
     transactionId_ = -1;
   }
 
   @Override
   public void close() {
+    // Return early if transaction was committed successfully.
     if (transactionId_ <= 0) return;
 
-    keepalive_.deleteTransaction(transactionId_);
+    if (keepalive_ != null) keepalive_.deleteTransaction(transactionId_);
     try {
       MetastoreShim.abortTransaction(hmsClient_, transactionId_);
     } catch (TransactionException e) {
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 0de5c5d..2a89b77 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -79,6 +79,7 @@ import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.authorization.AuthorizationDelta;
 import org.apache.impala.authorization.AuthorizationManager;
 import org.apache.impala.authorization.User;
+import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogObject;
 import org.apache.impala.catalog.CatalogServiceCatalog;
@@ -1036,20 +1037,18 @@ public class CatalogOpExecutor {
     result.setVersion(updatedCatalogObject.getCatalog_version());
   }
 
-  private Table addHdfsPartitions(Table tbl, List<Partition> partitions)
-      throws CatalogException {
+  private Table addHdfsPartitions(MetaStoreClient msClient, Table tbl,
+      List<Partition> partitions) throws CatalogException {
     Preconditions.checkNotNull(tbl);
     Preconditions.checkNotNull(partitions);
     if (!(tbl instanceof HdfsTable)) {
       throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table");
     }
     HdfsTable hdfsTable = (HdfsTable) tbl;
-    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      List<HdfsPartition> hdfsPartitions = hdfsTable.createAndLoadPartitions(
-          msClient.getHiveClient(), partitions);
-      for (HdfsPartition hdfsPartition : hdfsPartitions) {
-        catalog_.addPartition(hdfsPartition);
-      }
+    List<HdfsPartition> hdfsPartitions = hdfsTable.createAndLoadPartitions(
+        msClient.getHiveClient(), partitions);
+    for (HdfsPartition hdfsPartition : hdfsPartitions) {
+      catalog_.addPartition(hdfsPartition);
     }
     return hdfsTable;
   }
@@ -3143,18 +3142,9 @@ public class CatalogOpExecutor {
     if (allHmsPartitionsToAdd.isEmpty()) return null;
 
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      List<Partition> addedHmsPartitions = Lists.newArrayList();
+      List<Partition> addedHmsPartitions = addHmsPartitionsInTransaction(msClient,
+          tbl, allHmsPartitionsToAdd, ifNotExists);
 
-      for (List<Partition> hmsSublist :
-          Lists.partition(allHmsPartitionsToAdd, MAX_PARTITION_UPDATES_PER_RPC)) {
-        try {
-          addedHmsPartitions.addAll(msClient.getHiveClient().add_partitions(hmsSublist,
-              ifNotExists, true));
-        } catch (TException e) {
-          throw new ImpalaRuntimeException(
-              String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partitions"), e);
-        }
-      }
       // Handle HDFS cache. This is done in a separate round because we have to apply
       // caching only to newly added partitions.
       alterTableCachePartitions(msTbl, msClient, tableName, addedHmsPartitions,
@@ -3169,12 +3159,57 @@ public class CatalogOpExecutor {
         addedHmsPartitions.addAll(
             getPartitionsFromHms(msTbl, msClient, tableName, difference));
       }
-      addHdfsPartitions(tbl, addedHmsPartitions);
+      addHdfsPartitions(msClient, tbl, addedHmsPartitions);
     }
     return tbl;
   }
 
   /**
+   * Adds partitions in 'allHmsPartitionsToAdd' in batches via 'msClient'.
+   * Returns the created partitions.
+   */
+  List<Partition> addHmsPartitions(MetaStoreClient msClient,
+      List<Partition> allHmsPartitionsToAdd, boolean ifNotExists)
+      throws ImpalaRuntimeException {
+    List<Partition> addedHmsPartitions = Lists.newArrayList();
+    for (List<Partition> hmsSublist : Lists.partition(
+        allHmsPartitionsToAdd, MAX_PARTITION_UPDATES_PER_RPC)) {
+      try {
+        addedHmsPartitions.addAll(msClient.getHiveClient().add_partitions(hmsSublist,
+            ifNotExists, true));
+      } catch (TException e) {
+        throw new ImpalaRuntimeException(
+            String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partitions"), e);
+      }
+    }
+    return addedHmsPartitions;
+  }
+
+  /**
+   * Invokes addHmsPartitions() in a transaction for transactional tables. For
+   * non-transactional tables it simply invokes addHmsPartitions().
+   * Please note that once addHmsPartitions() has succeeded, the HMS table
+   * modification won't be reverted even if the transaction fails.
+   * Returns the list of the newly added partitions.
+   */
+  List<Partition> addHmsPartitionsInTransaction(MetaStoreClient msClient, Table tbl,
+      List<Partition> partitions, boolean ifNotExists) throws ImpalaException {
+    if (!AcidUtils.isTransactionalTable(tbl.getMetaStoreTable().getParameters())) {
+      return addHmsPartitions(msClient, partitions, ifNotExists);
+    }
+    try (Transaction txn = new Transaction(
+        msClient.getHiveClient(),
+        catalog_.getAcidUserId(),
+        String.format("ADD PARTITION for %s", tbl.getFullName()))) {
+      MetastoreShim.allocateTableWriteId(msClient.getHiveClient(), txn.getId(),
+          tbl.getDb().getName(), tbl.getName());
+      List<Partition> ret = addHmsPartitions(msClient, partitions, ifNotExists);
+      txn.commit();
+      return ret;
+    }
+  }
+
+  /**
    * Returns the list of Partition objects from 'aList' that cannot be found in 'bList'.
    * Partition objects are distinguished by partition values only.
    */
@@ -3901,7 +3936,7 @@ public class CatalogOpExecutor {
         // ifNotExists and needResults are true.
         List<Partition> hmsAddedPartitions =
             msClient.getHiveClient().add_partitions(hmsSublist, true, true);
-        addHdfsPartitions(tbl, hmsAddedPartitions);
+        addHdfsPartitions(msClient, tbl, hmsAddedPartitions);
         // Handle HDFS cache.
         if (cachePoolName != null) {
           for (Partition partition: hmsAddedPartitions) {
diff --git a/testdata/workloads/functional-query/queries/QueryTest/full-acid-rowid.test b/testdata/workloads/functional-query/queries/QueryTest/full-acid-rowid.test
index a0654fc..025591b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/full-acid-rowid.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/full-acid-rowid.test
@@ -20,14 +20,14 @@ select row__id.*, * from functional_orc_def.alltypestiny;
 ---- LABELS
 OPERATION, ORIGINALTRANSACTION, BUCKET, ROWID, CURRENTTRANSACTION, ID, BOOL_COL, TINYINT_COL, SMALLINT_COL, INT_COL, BIGINT_COL, FLOAT_COL, DOUBLE_COL, DATE_STRING_COL, STRING_COL, TIMESTAMP_COL, YEAR, MONTH
 ---- RESULTS
-0,1,536870912,0,1,2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
-0,1,536870912,1,1,3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
-0,1,536870912,0,1,6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
-0,1,536870912,1,1,7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
-0,1,536870912,0,1,0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
-0,1,536870912,1,1,1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
-0,1,536870912,0,1,4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
-0,1,536870912,1,1,5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
+0,5,536870912,0,5,2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+0,5,536870912,1,5,3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
+0,5,536870912,0,5,6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+0,5,536870912,1,5,7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
+0,5,536870912,0,5,0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+0,5,536870912,1,5,1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+0,5,536870912,0,5,4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
+0,5,536870912,1,5,5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
 ---- TYPES
 INT, BIGINT, INT, BIGINT, BIGINT, INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
 ====
@@ -37,14 +37,14 @@ from functional_orc_def.alltypestiny;
 ---- LABELS
 ROW__ID.OPERATION, ROW__ID.ROWID, ROW__ID.ORIGINALTRANSACTION, ID, BOOL_COL, TINYINT_COL, SMALLINT_COL, INT_COL, BIGINT_COL, FLOAT_COL, DOUBLE_COL, DATE_STRING_COL, STRING_COL, TIMESTAMP_COL, YEAR, MONTH
 ---- RESULTS
-0,0,1,2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
-0,1,1,3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
-0,0,1,4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
-0,1,1,5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
-0,0,1,6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
-0,1,1,7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
-0,0,1,0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
-0,1,1,1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+0,0,5,2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+0,1,5,3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
+0,0,5,4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
+0,1,5,5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
+0,0,5,6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+0,1,5,7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
+0,0,5,0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+0,1,5,1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
 ---- TYPES
 INT, BIGINT, BIGINT, INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
 ====
diff --git a/tests/query_test/test_acid.py b/tests/query_test/test_acid.py
index dee59ee..b8ea970 100644
--- a/tests/query_test/test_acid.py
+++ b/tests/query_test/test_acid.py
@@ -27,6 +27,7 @@ from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import (SkipIf, SkipIfHive2, SkipIfCatalogV2, SkipIfS3, SkipIfABFS,
                                SkipIfADLS, SkipIfIsilon, SkipIfGCS, SkipIfLocal)
 from tests.common.test_dimensions import create_single_exec_option_dimension
+from tests.util.acid_txn import AcidTxn
 
 
 class TestAcid(ImpalaTestSuite):
@@ -344,3 +345,29 @@ class TestAcid(ImpalaTestSuite):
     self.execute_query("refresh {}".format(fq_table_name))
     result = self.execute_query("select count(*) from {0}".format(fq_table_name))
     assert "3" in result.data
+
+  def test_add_partition_write_id(self, vector, unique_database):
+    """Test that ALTER TABLE ADD PARTITION increases the write id of the table."""
+    # Test INSERT-only table
+    io_tbl_name = "insert_only_table"
+    self.client.execute("""CREATE TABLE {0}.{1} (i int) PARTITIONED BY (p int)
+        TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only')
+        """.format(unique_database, io_tbl_name))
+    self._check_add_partition_write_id_change(unique_database, io_tbl_name)
+
+    # Test Full ACID table
+    full_acid_name = "full_acid_table"
+    self.client.execute("""CREATE TABLE {0}.{1} (i int) PARTITIONED BY (p int)
+        STORED AS ORC TBLPROPERTIES('transactional'='true')
+        """.format(unique_database, full_acid_name))
+    self._check_add_partition_write_id_change(unique_database, full_acid_name)
+
+  def _check_add_partition_write_id_change(self, db_name, tbl_name):
+    acid_util = AcidTxn(self.hive_client)
+    valid_write_ids = acid_util.get_valid_write_ids(db_name, tbl_name)
+    orig_write_id = valid_write_ids.tblValidWriteIds[0].writeIdHighWaterMark
+    self.client.execute("""alter table {0}.{1} add partition (p=1)
+        """.format(db_name, tbl_name))
+    valid_write_ids = acid_util.get_valid_write_ids(db_name, tbl_name)
+    new_write_id = valid_write_ids.tblValidWriteIds[0].writeIdHighWaterMark
+    assert new_write_id > orig_write_id