You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2020/01/15 14:29:33 UTC

[impala] 04/04: IMPALA-9009: Core support for Ranger column masking

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d66610837e53965cb969b78116aec58164bb8548
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Thu Dec 12 12:30:37 2019 +0800

    IMPALA-9009: Core support for Ranger column masking
    
    Ranger provides column masking policies about how to show masked values
    to specific users when reading specific columns. This patch adds support
    to rewrite the query AST based on column masking policies.
    
    We perform the column masking policies by replacing the TableRef with a
    subquery doing the masking. For instance, the following query
      select c_id, c_name from customer c join orders on c_id = o_cid
    will be transformed into
      select c_id, c_name  from (
        select mask1(c_id) as c_id, mask2(c_name) as c_name from customer
      ) c
      join orders
      on c_id = o_cid
    
    The transformation is done in AST resolution. Just like view resolution,
    if the table needs masking we replace it with a subquery (InlineViewRef)
    containing the masking expressions.
    
    This patch only adds support for mask types that don't require builtin
    mask functions. So currently supported masking types are MASK_NULL and
    CUSTOM.
    
    Current Limitations:
     - Users are required to have privileges on all columns of a masked
       table (IMPALA-9223), since the table mask subquery contains all the
       columns.
    
    Tests:
     - Add e2e tests for masked results
     - Run core tests
    
    Change-Id: I4cad60e0e69ea573b7ecfc011b142c46ef52ed61
    Reviewed-on: http://gerrit.cloudera.org:8080/14894
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/analysis/AlterViewStmt.java  |   2 +
 .../apache/impala/analysis/AnalysisContext.java    |   1 +
 .../java/org/apache/impala/analysis/Analyzer.java  |  46 ++-
 .../org/apache/impala/analysis/CreateViewStmt.java |   2 +
 .../org/apache/impala/analysis/FromClause.java     |  81 ++++-
 .../org/apache/impala/analysis/InlineViewRef.java  |  59 +++
 .../java/org/apache/impala/analysis/QueryStmt.java |   7 +
 .../org/apache/impala/analysis/SelectStmt.java     |   5 +
 .../java/org/apache/impala/analysis/TableRef.java  |  19 +
 .../java/org/apache/impala/analysis/UnionStmt.java |   7 +
 .../impala/authorization/AuthorizationChecker.java |  14 +
 .../impala/authorization/AuthorizationFactory.java |  11 +-
 .../authorization/BaseAuthorizationChecker.java    |   6 +-
 .../authorization/NoopAuthorizationFactory.java    |  17 +
 .../org/apache/impala/authorization/TableMask.java |  88 +++++
 .../ranger/RangerAuthorizationChecker.java         |  84 +++--
 .../ranger/RangerAuthorizationFactory.java         |   5 +
 .../sentry/SentryAuthorizationChecker.java         |  12 +
 .../sentry/SentryAuthorizationFactory.java         |   5 +
 .../apache/impala/analysis/AnalyzeStmtsTest.java   |   2 +-
 .../authorization/AuthorizationStmtTest.java       | 123 -------
 .../authorization/ranger/RangerAuditLogTest.java   |  13 +-
 .../org/apache/impala/common/FrontendTestBase.java |  17 +
 .../queries/QueryTest/ranger_column_masking.test   | 400 +++++++++++++++++++++
 tests/authorization/test_ranger.py                 | 128 ++++++-
 tests/common/impala_test_suite.py                  |   2 +
 26 files changed, 968 insertions(+), 188 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterViewStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterViewStmt.java
index f89dba7..f241f1d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterViewStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterViewStmt.java
@@ -45,6 +45,8 @@ public class AlterViewStmt extends CreateOrAlterViewStmtBase {
   public void analyze(Analyzer analyzer) throws AnalysisException {
     // Enforce Hive column labels for view compatibility.
     analyzer.setUseHiveColLabels(true);
+    // Disable table masking since we don't actually read the data
+    viewDefStmt_.setDoTableMasking(false);
     viewDefStmt_.analyze(analyzer);
 
     Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index abce01a..836a2da 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -475,6 +475,7 @@ public class AnalysisContext {
     if (analysisResult_.requiresSubqueryRewrite()) {
       new StmtRewriter.SubqueryRewriter().rewrite(analysisResult_);
       reAnalyze = true;
+      LOG.info("Re-analyze the rewritten query.");
     }
     if (!reAnalyze) return;
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index fd85322..4a9b758 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -34,11 +34,13 @@ import java.util.function.Function;
 
 import org.apache.impala.analysis.Path.PathType;
 import org.apache.impala.analysis.StmtMetadataLoader.StmtTableCache;
+import org.apache.impala.authorization.AuthorizationChecker;
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.authorization.AuthorizationFactory;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.authorization.PrivilegeRequest;
 import org.apache.impala.authorization.PrivilegeRequestBuilder;
+import org.apache.impala.authorization.TableMask;
 import org.apache.impala.authorization.User;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.DatabaseNotFoundException;
@@ -684,6 +686,10 @@ public class Analyzer {
     return result;
   }
 
+  public TableRef resolveTableRef(TableRef tableRef) throws AnalysisException {
+    return resolveTableRef(tableRef, false);
+  }
+
   /**
    * Resolves the given TableRef into a concrete BaseTableRef, ViewRef or
    * CollectionTableRef. Returns the new resolved table ref or the given table
@@ -693,8 +699,10 @@ public class Analyzer {
    * an AuthorizationException is preferred over an AnalysisException so as not to
    * accidentally reveal the non-existence of tables/databases.
    */
-  public TableRef resolveTableRef(TableRef tableRef) throws AnalysisException {
-    // Return the table if it is already resolved.
+  public TableRef resolveTableRef(TableRef tableRef, boolean doTableMasking)
+      throws AnalysisException {
+    // Return the table if it is already resolved. This also avoids the table being
+    // masked again.
     if (tableRef.isResolved()) return tableRef;
     // Try to find a matching local view.
     if (tableRef.getPath().size() == 1) {
@@ -747,13 +755,33 @@ public class Analyzer {
     Preconditions.checkNotNull(resolvedPath);
     if (resolvedPath.destTable() != null) {
       FeTable table = resolvedPath.destTable();
-      if (table instanceof FeView) return new InlineViewRef((FeView) table, tableRef);
-      // The table must be a base table.
-      Preconditions.checkState(table instanceof FeFsTable ||
-          table instanceof FeKuduTable ||
-          table instanceof FeHBaseTable ||
-          table instanceof FeDataSourceTable);
-      return new BaseTableRef(tableRef, resolvedPath);
+      TableRef resolvedTableRef;
+      if (table instanceof FeView) {
+        resolvedTableRef = new InlineViewRef((FeView) table, tableRef);
+      } else {
+        // The table must be a base table.
+        Preconditions.checkState(table instanceof FeFsTable ||
+            table instanceof FeKuduTable ||
+            table instanceof FeHBaseTable ||
+            table instanceof FeDataSourceTable);
+        resolvedTableRef = new BaseTableRef(tableRef, resolvedPath);
+      }
+      if (!doTableMasking || !getAuthzFactory().getAuthorizationConfig().isEnabled()
+          || !getAuthzFactory().supportsColumnMasking()) {
+        return resolvedTableRef;
+      }
+
+      AuthorizationChecker authChecker = getAuthzFactory().newAuthorizationChecker(
+          getCatalog().getAuthPolicy());
+      TableMask tableMask = new TableMask(authChecker, table, user_);
+      try {
+        if (!tableMask.needsMaskingOrFiltering()) return resolvedTableRef;
+        return InlineViewRef.createTableMaskView(resolvedPath, resolvedTableRef,
+            tableMask);
+      } catch (InternalException e) {
+        LOG.error("Error performing table masking", e);
+        throw new AnalysisException("Error performing table masking", e);
+      }
     } else {
       return new CollectionTableRef(tableRef, resolvedPath);
     }
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java
index a387022..3ff828a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateViewStmt.java
@@ -46,6 +46,8 @@ public class CreateViewStmt extends CreateOrAlterViewStmtBase {
     Analyzer viewAnalyzerr = new Analyzer(analyzer);
     // Enforce Hive column labels for view compatibility.
     viewAnalyzerr.setUseHiveColLabels(true);
+    // Disable table masking since we don't actually read the data.
+    viewDefStmt_.setDoTableMasking(false);
     viewDefStmt_.analyze(viewAnalyzerr);
 
     dbName_ = analyzer.getTargetDbName(tableName_);
diff --git a/fe/src/main/java/org/apache/impala/analysis/FromClause.java b/fe/src/main/java/org/apache/impala/analysis/FromClause.java
index a13b2dd..159f45c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FromClause.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FromClause.java
@@ -39,6 +39,12 @@ public class FromClause extends StmtNode implements Iterable<TableRef> {
 
   private boolean analyzed_ = false;
 
+  // Whether we should perform table masking. It will replace each table/view with a
+  // masked subquery if there're column masking policies for the user on this table/view.
+  // Turned off for CreateView and AlterView statements since they're not actually
+  // reading data.
+  private boolean doTableMasking_ = true;
+
   public FromClause(List<TableRef> tableRefs) {
     tableRefs_ = Lists.newArrayList(tableRefs);
     // Set left table refs to ensure correct toSql() before analysis.
@@ -50,6 +56,15 @@ public class FromClause extends StmtNode implements Iterable<TableRef> {
   public FromClause() { tableRefs_ = new ArrayList<>(); }
   public List<TableRef> getTableRefs() { return tableRefs_; }
 
+  public void setDoTableMasking(boolean doTableMasking) {
+    doTableMasking_ = doTableMasking;
+    for (TableRef tableRef : tableRefs_) {
+      if (!(tableRef instanceof InlineViewRef)) continue;
+      InlineViewRef viewRef = (InlineViewRef) tableRef;
+      viewRef.getViewStmt().setDoTableMasking(doTableMasking);
+    }
+  }
+
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
     if (analyzed_) return;
@@ -58,7 +73,7 @@ public class FromClause extends StmtNode implements Iterable<TableRef> {
     for (int i = 0; i < tableRefs_.size(); ++i) {
       TableRef tblRef = tableRefs_.get(i);
       // Replace non-InlineViewRef table refs with a BaseTableRef or ViewRef.
-      tblRef = analyzer.resolveTableRef(tblRef);
+      tblRef = analyzer.resolveTableRef(tblRef, doTableMasking_);
       tableRefs_.set(i, Preconditions.checkNotNull(tblRef));
       tblRef.setLeftTblRef(leftTblRef);
       tblRef.analyze(analyzer);
@@ -93,26 +108,62 @@ public class FromClause extends StmtNode implements Iterable<TableRef> {
     return new FromClause(clone);
   }
 
+  /**
+   * Unmask, un-resolve and reset all the tableRefs.
+   * TableMasking views are created by analysis so we should unwrap them to restore the
+   * unmasked TableRef.
+   * 'un-resolve' here means replacing resolved tableRefs with unresolved ones. To make
+   * sure we get the same results in later resolution, the unresolved tableRefs should
+   * use fully qualified paths. Otherwise, non-fully qualified paths might incorrectly
+   * match a local view.
+   * However, we don't un-resolve views because local views don't have fully qualified
+   * paths. Due to this we don't unmask a TableMasking view if the underlying TableRef is
+   * a view. TODO(IMPALA-9286): These may make this FromClause still dirty after reset()
+   * since it doesn't come back to the state before analyze(). We should introduce fully
+   * qualified paths for local views to fix this.
+   */
   public void reset() {
     for (int i = 0; i < size(); ++i) {
-      TableRef origTblRef = get(i);
-      if (origTblRef.isResolved() && !(origTblRef instanceof InlineViewRef)) {
-        // Replace resolved table refs with unresolved ones.
-        TableRef newTblRef = new TableRef(origTblRef);
-        // Use the fully qualified raw path to preserve the original resolution.
-        // Otherwise, non-fully qualified paths might incorrectly match a local view.
-        // TODO for 2.3: This full qualification preserves analysis state which is
-        // contrary to the intended semantics of reset(). We could address this issue by
-        // changing the WITH-clause analysis to register local views that have
-        // fully-qualified table refs, and then remove the full qualification here.
-        newTblRef.rawPath_ = origTblRef.getResolvedPath().getFullyQualifiedRawPath();
-        set(i, newTblRef);
-      }
-      get(i).reset();
+      unmaskAndUnresolveTableRef(i);
+      get(i).reset();   // Reset() recursion happens here for views
     }
     this.analyzed_ = false;
   }
 
+  /**
+   * Replace the i-th tableRef with an unmasked and unresolved one if the result is not a
+   * view. See more in comments of reset().
+   */
+  private void unmaskAndUnresolveTableRef(int i) {
+    TableRef origTblRef = get(i);
+    if (!origTblRef.isResolved()
+        || (origTblRef instanceof InlineViewRef && !origTblRef.isTableMaskingView())) {
+      return;
+    }
+    // Unmasked the TableRef if it's an inline view for table masking.
+    if (origTblRef.isTableMaskingView()) {
+      Preconditions.checkState(origTblRef instanceof InlineViewRef);
+      TableRef unMaskedTableRef = ((InlineViewRef) origTblRef).getUnMaskedTableRef();
+      if (unMaskedTableRef instanceof InlineViewRef) return;
+      // Migrate back the properties (e.g. join ops, hints) since we are going to
+      // replace it with an unresolved one.
+      origTblRef.migratePropertiesTo(unMaskedTableRef);
+      origTblRef = unMaskedTableRef;
+    }
+    // Replace the resolved TableRef with an unresolved one if it's not a view.
+    if (!(origTblRef instanceof InlineViewRef)) {
+      TableRef newTblRef = new TableRef(origTblRef);
+      // Use the fully qualified raw path to preserve the original resolution.
+      // Otherwise, non-fully qualified paths might incorrectly match a local view.
+      // TODO(IMPALA-9286): This full qualification preserves analysis state which is
+      // contrary to the intended semantics of reset(). We could address this issue by
+      // changing the WITH-clause analysis to register local views that have
+      // fully-qualified table refs, and then remove the full qualification here.
+      newTblRef.rawPath_ = origTblRef.getResolvedPath().getFullyQualifiedRawPath();
+      set(i, newTblRef);
+    }
+  }
+
   @Override
   public final String toSql() {
     return toSql(DEFAULT);
diff --git a/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java b/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
index 50c70de..43ed5ef 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
@@ -21,11 +21,14 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.impala.authorization.TableMask;
+import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.FeView;
 import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.InternalException;
 import org.apache.impala.rewrite.ExprRewriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +72,9 @@ public class InlineViewRef extends TableRef {
   // Map inline view's output slots to the corresponding baseTblResultExpr of queryStmt.
   protected final ExprSubstitutionMap baseTblSmap_;
 
+  // Whether this is an inline view generated for table masking.
+  private boolean isTableMaskingView_ = false;
+
   // END: Members that need to be reset()
   /////////////////////////////////////////
 
@@ -127,6 +133,45 @@ public class InlineViewRef extends TableRef {
     materializedTupleIds_.addAll(other.materializedTupleIds_);
     smap_ = other.smap_.clone();
     baseTblSmap_ = other.baseTblSmap_.clone();
+    isTableMaskingView_ = other.isTableMaskingView_;
+  }
+
+  /**
+   * Creates an inline-view doing table masking for column masking and row filtering
+   * policies. Callers should replace 'tableRef' with the returned view.
+   *
+   * @param resolvedPath resolved path for the original table/view
+   * @param tableRef original resolved table/view
+   * @param tableMask TableMask providing column masking and row filtering policies
+   */
+  static InlineViewRef createTableMaskView(Path resolvedPath, TableRef tableRef,
+      TableMask tableMask) throws AnalysisException, InternalException {
+    Preconditions.checkNotNull(resolvedPath);
+    Preconditions.checkNotNull(resolvedPath.getRootTable());
+    Preconditions.checkNotNull(tableRef);
+    Preconditions.checkState(tableRef instanceof InlineViewRef
+        || tableRef instanceof BaseTableRef);
+    List<Column> columns = resolvedPath.getRootTable().getColumnsInHiveOrder();
+    List<SelectListItem> items = Lists.newArrayListWithCapacity(columns.size());
+    for (Column col: columns) {
+      // TODO: only add materialized columns to avoid introducing new privilege
+      //  requirements (IMPALA-9223)
+      items.add(new SelectListItem(
+          tableMask.createColumnMask(col.getName(), col.getType()), col.getName()));
+    }
+    SelectList selectList = new SelectList(items);
+    FromClause fromClause = new FromClause(Lists.newArrayList(tableRef));
+    SelectStmt tableMaskStmt = new SelectStmt(selectList, fromClause,
+        null, null, null, null, null);
+    InlineViewRef viewRef = new InlineViewRef(/*alias*/ null, tableMaskStmt,
+        (TableSampleClause) null);
+    tableRef.migratePropertiesTo(viewRef);
+    viewRef.isTableMaskingView_ = true;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Replacing '{}' with subquery: {}", tableRef.toSql(),
+          tableMaskStmt.toSql());
+    }
+    return viewRef;
   }
 
   /**
@@ -310,6 +355,20 @@ public class InlineViewRef extends TableRef {
 
   public FeView getView() { return view_; }
 
+  public boolean isTableMaskingView() { return isTableMaskingView_; }
+
+  /**
+   * Return the unmasked TableRef if this is an inline view for table masking.
+   */
+  public TableRef getUnMaskedTableRef() {
+    Preconditions.checkState(isTableMaskingView_);
+    Preconditions.checkState(queryStmt_ instanceof SelectStmt);
+    SelectStmt selectStmt = (SelectStmt) queryStmt_;
+    Preconditions.checkNotNull(selectStmt.fromClause_);
+    Preconditions.checkState(selectStmt.fromClause_.size() == 1);
+    return selectStmt.fromClause_.get(0);
+  }
+
   @Override
   protected TableRef clone() { return new InlineViewRef(this); }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
index 7dacfcf..0d1e6d7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
@@ -168,6 +168,13 @@ public abstract class QueryStmt extends StatementBase {
   }
 
   /**
+   * Disable table masking when analyzing the FromClauses. Used in CreateView and
+   * AlterView statements since they don't actually read the data.
+   * @param doTableMasking
+   */
+  public void setDoTableMasking(boolean doTableMasking) {}
+
+  /**
    * Returns a list containing all the materialized tuple ids that this stmt is
    * correlated with (i.e., those tuple ids from outer query blocks that TableRefs
    * inside this stmt are rooted at).
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index 26a9422..5b747df 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -177,6 +177,11 @@ public class SelectStmt extends QueryStmt {
     return tableAliasGenerator_;
   }
 
+  @Override
+  public void setDoTableMasking(boolean doTableMasking) {
+    fromClause_.setDoTableMasking(doTableMasking);
+  }
+
   /**
    * Creates resultExprs and baseTblResultExprs.
    */
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableRef.java b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
index f86236b..cd5f51c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.FeFsTable;
@@ -634,4 +635,22 @@ public class TableRef extends StmtNode {
     correlatedTupleIds_.clear();
     desc_ = null;
   }
+
+  public boolean isTableMaskingView() { return false; }
+
+  void migratePropertiesTo(TableRef other) {
+    other.aliases_ = aliases_;
+    other.onClause_ = onClause_;
+    other.usingColNames_ = usingColNames_;
+    other.joinOp_ = joinOp_;
+    other.joinHints_ = joinHints_;
+    other.tableHints_ = tableHints_;
+    // Clear properties. Don't clear aliases_ since it's still used in resolving slots
+    // in the query block of 'other'.
+    onClause_ = null;
+    usingColNames_ = null;
+    joinOp_ = null;
+    joinHints_ = new ArrayList<>();
+    tableHints_ = new ArrayList<>();
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java b/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
index f32fa4b..bf4c0bf 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
@@ -215,6 +215,13 @@ public class UnionStmt extends QueryStmt {
   }
 
   @Override
+  public void setDoTableMasking(boolean doTableMasking) {
+    for (UnionOperand op : operands_) {
+      op.getQueryStmt().setDoTableMasking(doTableMasking);
+    }
+  }
+
+  @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
     if (isAnalyzed()) return;
     super.analyze(analyzer);
diff --git a/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
index e590d91..bb1c587 100644
--- a/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
@@ -23,6 +23,7 @@ import org.apache.impala.common.InternalException;
 import org.apache.impala.thrift.TSessionState;
 import org.apache.impala.util.EventSequence;
 
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 
@@ -82,4 +83,17 @@ public interface AuthorizationChecker {
    * Invalidates an authorization cache.
    */
   void invalidateAuthorizationCache();
+
+  /**
+   * Returns whether the given table needs column masking or row filtering when read by
+   * the given user.
+   */
+  boolean needsMaskingOrFiltering(User user, String dbName, String tableName,
+      List<String> requiredColumns) throws InternalException;
+
+  /**
+   * Returns the column mask string for the given column.
+   */
+  String createColumnMask(User user, String dbName, String tableName, String columnName)
+      throws InternalException;
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/AuthorizationFactory.java b/fe/src/main/java/org/apache/impala/authorization/AuthorizationFactory.java
index bc85254..3a8c9d8 100644
--- a/fe/src/main/java/org/apache/impala/authorization/AuthorizationFactory.java
+++ b/fe/src/main/java/org/apache/impala/authorization/AuthorizationFactory.java
@@ -39,13 +39,12 @@ public interface AuthorizationFactory {
   /**
    * Creates a new instance of {@link AuthorizationChecker}.
    */
-  AuthorizationChecker newAuthorizationChecker(AuthorizationPolicy authzPolicy)
-      throws ImpalaException;
+  AuthorizationChecker newAuthorizationChecker(AuthorizationPolicy authzPolicy);
 
   /**
    * Creates a new instance of {@link AuthorizationChecker}.
    */
-  default AuthorizationChecker newAuthorizationChecker() throws ImpalaException {
+  default AuthorizationChecker newAuthorizationChecker() {
     return newAuthorizationChecker(null);
   }
 
@@ -70,4 +69,10 @@ public interface AuthorizationFactory {
    */
   AuthorizationManager newAuthorizationManager(CatalogServiceCatalog catalog)
       throws ImpalaException;
+
+  /**
+   * Returns whether the authorization implementation supports column masking and row
+   * filtering. Currently, only Ranger implementation supports these.
+   */
+  boolean supportsColumnMasking();
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
index 3920f83..d41b5ed 100644
--- a/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java
@@ -224,10 +224,10 @@ public abstract class BaseAuthorizationChecker implements AuthorizationChecker {
       throws AuthorizationException, InternalException {
     Preconditions.checkArgument(!requests.isEmpty());
     Analyzer analyzer = analysisResult.getAnalyzer();
-    // We need to temporarily deny access when column masking or row filtering feature is
-    // enabled until Impala has full implementation of column masking and row filtering.
+    // We need to temporarily deny access when row filtering feature is enabled until
+    // Impala has full implementation of row filtering.
     // This is to prevent data leak since we do not want Impala to show any information
-    // when Hive has column masking and row filtering enabled.
+    // when Hive has row filtering enabled.
     authorizeRowFilterAndColumnMask(analysisResult.getAnalyzer().getUser(), requests);
 
     boolean hasTableSelectPriv = true;
diff --git a/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java b/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
index 33f2414..f57e943 100644
--- a/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
+++ b/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
@@ -212,6 +212,18 @@ public class NoopAuthorizationFactory implements AuthorizationFactory {
           String sqlStmt, TSessionState sessionState, Optional<EventSequence> timeline) {
         return new AuthorizationContext(timeline);
       }
+
+      @Override
+      public boolean needsMaskingOrFiltering(User user, String dbName,
+          String tableName, List<String> requiredColumns) {
+        return false;
+      }
+
+      @Override
+      public String createColumnMask(User user, String dbName, String tableName,
+          String columnName) {
+        return null;
+      }
     };
   }
 
@@ -225,4 +237,9 @@ public class NoopAuthorizationFactory implements AuthorizationFactory {
   public AuthorizationManager newAuthorizationManager(CatalogServiceCatalog catalog) {
     return new NoopAuthorizationManager();
   }
+
+  @Override
+  public boolean supportsColumnMasking() {
+    return false;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/TableMask.java b/fe/src/main/java/org/apache/impala/authorization/TableMask.java
new file mode 100644
index 0000000..9323b96
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/authorization/TableMask.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.authorization;
+
+import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.Parser;
+import org.apache.impala.analysis.SelectStmt;
+import org.apache.impala.analysis.SlotRef;
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.InternalException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * A TableMask instance contains all the information for generating a subquery for table
+ * masking (column masking / row filtering).
+ */
+public class TableMask {
+  private static final Logger LOG = LoggerFactory.getLogger(TableMask.class);
+
+  private AuthorizationChecker authChecker_;
+  private String dbName_;
+  private String tableName_;
+  private List<String> requiredColumns_;
+  private User user_;
+
+  public TableMask(AuthorizationChecker authzChecker, FeTable table, User user) {
+    this.authChecker_ = authzChecker;
+    this.dbName_ = table.getDb().getName();
+    this.tableName_ = table.getName();
+    // TODO: only require materialize columns to avoid unneccessary masking so we won't
+    //       hit IMPALA-9223
+    this.requiredColumns_ = table.getColumnNames();
+    this.user_ = user;
+  }
+
+  /**
+   * Returns whether the table/view has column masking or row filtering policies.
+   */
+  public boolean needsMaskingOrFiltering() throws InternalException {
+    return authChecker_.needsMaskingOrFiltering(user_, dbName_, tableName_,
+        requiredColumns_);
+  }
+
+  /**
+   * Return the masked Expr of the given column
+   */
+  public Expr createColumnMask(String colName, Type colType)
+      throws InternalException, AnalysisException {
+    String maskedValue = authChecker_.createColumnMask(user_, dbName_, tableName_,
+        colName);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Performing column masking on table {}.{}: {} => {}",
+          dbName_, tableName_, colName, maskedValue);
+    }
+    if (maskedValue == null) return new SlotRef(colName);
+    SelectStmt maskStmt = (SelectStmt) Parser.parse(
+        String.format("SELECT CAST(%s AS %s)", maskedValue, colType));
+    if (maskStmt.getSelectList().getItems().size() != 1 || maskStmt.hasGroupByClause()
+        || maskStmt.hasHavingClause() || maskStmt.hasWhereClause()) {
+      throw new AnalysisException("Illegal column masked value: " + maskedValue);
+    }
+    Expr res = maskStmt.getSelectList().getItems().get(0).getExpr();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Returned Expr: " + res.toSql());
+    }
+    return res;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
index 7d06e29..f77ab9d 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
@@ -19,6 +19,7 @@ package org.apache.impala.authorization.ranger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
 import org.apache.impala.authorization.Authorizable;
@@ -38,6 +39,8 @@ import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.thrift.TSessionState;
 import org.apache.impala.util.EventSequence;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.apache.ranger.plugin.model.RangerServiceDef;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
@@ -47,7 +50,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -185,17 +187,10 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
       List<PrivilegeRequest> privilegeRequests)
       throws AuthorizationException, InternalException {
     for (PrivilegeRequest request : privilegeRequests) {
-      if (request.getAuthorizable().getType() == Type.COLUMN) {
-        authorizeColumnMask(user,
+      if (request.getAuthorizable().getType() == Type.TABLE) {
+        authorizeRowFilter(user,
             request.getAuthorizable().getDbName(),
-            request.getAuthorizable().getTableName(),
-            request.getAuthorizable().getColumnName());
-      } else if (request.getAuthorizable().getType() == Type.TABLE) {
-        if (request.getAuthorizable().getType() == Type.TABLE) {
-          authorizeRowFilter(user,
-              request.getAuthorizable().getDbName(),
-              request.getAuthorizable().getTableName());
-        }
+            request.getAuthorizable().getTableName());
       }
     }
   }
@@ -267,13 +262,64 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
     }
   }
 
+  @Override
+  public boolean needsMaskingOrFiltering(User user, String dbName, String tableName,
+      List<String> requiredColumns) throws InternalException {
+    for (String column: requiredColumns) {
+      if (evalColumnMask(user, dbName, tableName, column).isMaskEnabled()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public String createColumnMask(User user, String dbName, String tableName,
+      String columnName) throws InternalException {
+    RangerAccessResult accessResult = evalColumnMask(user, dbName, tableName,
+        columnName);
+    // No column masking policies, return the original column.
+    if (!accessResult.isMaskEnabled()) return columnName;
+    String maskType = accessResult.getMaskType();
+    RangerServiceDef.RangerDataMaskTypeDef maskTypeDef = accessResult.getMaskTypeDef();
+    Preconditions.checkNotNull(maskType);
+    // The expression used to replace the original column.
+    String maskedColumn = columnName;
+    // The expression of the mask type. Column names are referenced by "{col}".
+    // Transformer examples for the builtin mask types:
+    //   mask type           transformer
+    //   MASK                mask({col})
+    //   MASK_SHOW_LAST_4    mask_show_last_n({col}, 4, 'x', 'x', 'x', -1, '1')
+    //   MASK_SHOW_FIRST_4   mask_show_first_n({col}, 4, 'x', 'x', 'x', -1, '1')
+    //   MASK_HASH           mask_hash({col})
+    //   MASK_DATE_SHOW_YEAR mask({col}, 'x', 'x', 'x', -1, '1', 1, 0, -1)
+    String transformer = null;
+    if (maskTypeDef != null) {
+      transformer = maskTypeDef.getTransformer();
+    }
+    if (StringUtils.equalsIgnoreCase(maskType, RangerPolicy.MASK_TYPE_NULL)) {
+      maskedColumn = "NULL";
+    } else if (StringUtils.equalsIgnoreCase(maskType, RangerPolicy.MASK_TYPE_CUSTOM)) {
+      String maskedValue = accessResult.getMaskedValue();
+      if (maskedValue == null) {
+        maskedColumn = "NULL";
+      } else {
+        maskedColumn = maskedValue.replace("{col}", columnName);
+      }
+    } else if (StringUtils.isNotEmpty(transformer)) {
+      maskedColumn = transformer.replace("{col}", columnName);
+    }
+    LOG.info("dbName: {}, tableName: {}, column: {}, maskType: {}, columnTransformer: {}",
+        dbName, tableName, columnName, maskType, maskedColumn);
+    return maskedColumn;
+  }
+
   /**
-   * This method checks if column mask is enabled on the given columns and deny access
-   * when column mask is enabled by throwing an {@link AuthorizationException}. This is
-   * to prevent data leak when Hive has column mask enabled but not in Impala.
+   * Evaluates column masking policies on the given column and returns the result.
+   * A RangerAccessResult contains the matched policy details and the masked column.
    */
-  private void authorizeColumnMask(User user, String dbName, String tableName,
-      String columnName) throws InternalException, AuthorizationException {
+  private RangerAccessResult evalColumnMask(User user, String dbName,
+      String tableName, String columnName) throws InternalException {
     RangerAccessResourceImpl resource = new RangerImpalaResourceBuilder()
         .database(dbName)
         .table(tableName)
@@ -281,11 +327,7 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
         .build();
     RangerAccessRequest req = new RangerAccessRequestImpl(resource,
         SELECT_ACCESS_TYPE, user.getShortName(), getUserGroups(user));
-    if (plugin_.evalDataMaskPolicies(req, null).isMaskEnabled()) {
-      throw new AuthorizationException(String.format(
-          "Impala does not support column masking yet. Column masking is enabled on " +
-              "column: %s.%s.%s", dbName, tableName, columnName));
-    }
+    return plugin_.evalDataMaskPolicies(req, null);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationFactory.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationFactory.java
index c4711b1..f528daa 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationFactory.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationFactory.java
@@ -99,4 +99,9 @@ public class RangerAuthorizationFactory implements AuthorizationFactory {
     plugin.init();
     return new RangerCatalogdAuthorizationManager(() -> plugin, catalog);
   }
+
+  @Override
+  public boolean supportsColumnMasking() {
+    return true;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizationChecker.java
index cd0f5ff..3f1b0e8 100644
--- a/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizationChecker.java
@@ -90,6 +90,18 @@ public class SentryAuthorizationChecker extends BaseAuthorizationChecker {
   }
 
   @Override
+  public boolean needsMaskingOrFiltering(User user, String dbName, String tableName,
+      List<String> requiredColumns) {
+    return false;
+  }
+
+  @Override
+  public String createColumnMask(User user, String dbName, String tableName,
+      String columnName) {
+    return columnName;
+  }
+
+  @Override
   public AuthorizationContext createAuthorizationContext(boolean doAudits,
       String sqlStmt, TSessionState sessionState, Optional<EventSequence> timeline) {
     return new AuthorizationContext(timeline);
diff --git a/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizationFactory.java b/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizationFactory.java
index 2ad1ec3..70182a6 100644
--- a/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizationFactory.java
+++ b/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizationFactory.java
@@ -101,4 +101,9 @@ public class SentryAuthorizationFactory implements AuthorizationFactory {
     return new SentryCatalogdAuthorizationManager(
         (SentryAuthorizationConfig) getAuthorizationConfig(), catalog);
   }
+
+  @Override
+  public boolean supportsColumnMasking() {
+    return false;
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index d9f9252..7f52760 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -4045,7 +4045,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     // Also check TableRefs.
     testNumberOfMembers(TableRef.class, 21);
     testNumberOfMembers(BaseTableRef.class, 0);
-    testNumberOfMembers(InlineViewRef.class, 8);
+    testNumberOfMembers(InlineViewRef.class, 9);
   }
 
   @SuppressWarnings("rawtypes")
diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
index 401a333..e68f013 100644
--- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
@@ -2857,129 +2857,6 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
   }
 
   @Test
-  public void testColumnMaskEnabled() throws ImpalaException {
-    if (authzProvider_ == AuthorizationProvider.SENTRY) return;
-
-    String policyName = "col_mask";
-    for (String tableName: new String[]{"alltypes", "alltypes_view"}) {
-      String json = String.format("{\n" +
-          "  \"name\": \"%s\",\n" +
-          "  \"policyType\": 1,\n" +
-          "  \"serviceType\": \"%s\",\n" +
-          "  \"service\": \"%s\",\n" +
-          "  \"resources\": {\n" +
-          "    \"database\": {\n" +
-          "      \"values\": [\"functional\"],\n" +
-          "      \"isExcludes\": false,\n" +
-          "      \"isRecursive\": false\n" +
-          "    },\n" +
-          "    \"table\": {\n" +
-          "      \"values\": [\"%s\"],\n" +
-          "      \"isExcludes\": false,\n" +
-          "      \"isRecursive\": false\n" +
-          "    },\n" +
-          "    \"column\": {\n" +
-          "      \"values\": [\"string_col\"],\n" +
-          "      \"isExcludes\": false,\n" +
-          "      \"isRecursive\": false\n" +
-          "    }\n" +
-          "  },\n" +
-          "  \"dataMaskPolicyItems\": [\n" +
-          "    {\n" +
-          "      \"accesses\": [\n" +
-          "        {\n" +
-          "          \"type\": \"select\",\n" +
-          "          \"isAllowed\": true\n" +
-          "        }\n" +
-          "      ],\n" +
-          "      \"users\": [\"%s\"],\n" +
-          "      \"dataMaskInfo\": {\"dataMaskType\": \"MASK\"}\n" +
-          "    }\n" +
-          "  ]\n" +
-          "}", policyName, RANGER_SERVICE_TYPE, RANGER_SERVICE_NAME, tableName,
-          user_.getShortName());
-
-      try {
-        // Clear existing row filter policies, otherwise they will cause different
-        // error message since we check them before any column masking policies.
-        clearRangerRowFilterPolicies("functional", tableName);
-        createRangerPolicy(policyName, json);
-        rangerImpalaPlugin_.refreshPoliciesAndTags();
-
-        // Queries on columns that are not masked should be allowed.
-        authorize("select id from functional.alltypes")
-            .ok(onServer(TPrivilegeLevel.ALL));
-        authorize("select x from functional.alltypes_view_sub")
-            .ok(onServer(TPrivilegeLevel.ALL));
-        authorize("select string_col from functional_kudu.alltypes")
-            .ok(onServer(TPrivilegeLevel.ALL));
-
-        // Normal select.
-        authorize(String.format("select string_col from functional.%s", tableName))
-            .error(columnMaskError(String.format("functional.%s.string_col", tableName)),
-                onServer(TPrivilegeLevel.ALL));
-        // Column within a function.
-        authorize(String.format(
-            "select substr(string_col, 0, 1) from functional.%s", tableName))
-            .error(columnMaskError(String.format("functional.%s.string_col", tableName)),
-                onServer(TPrivilegeLevel.ALL));
-        // Select with *.
-        authorize(String.format("select * from functional.%s", tableName))
-            .error(columnMaskError(String.format("functional.%s.string_col", tableName)),
-                onServer(TPrivilegeLevel.ALL));
-        // Sub-query.
-        authorize(String.format(
-            "select t.string_col from (select * from functional.%s) t", tableName))
-            .error(columnMaskError(String.format("functional.%s.string_col", tableName)),
-                onServer(TPrivilegeLevel.ALL));
-        // CTE.
-        authorize(String.format("with t as (select * from functional.%s) " +
-            "select string_col from t", tableName))
-            .error(columnMaskError(String.format("functional.%s.string_col", tableName)),
-                onServer(TPrivilegeLevel.ALL));
-        // CTAS.
-        authorize(String.format(
-            "create table t as select * from functional.%s", tableName))
-            .error(columnMaskError(String.format("functional.%s.string_col", tableName)),
-                onServer(TPrivilegeLevel.ALL));
-        // Create view.
-        authorize(String.format(
-            "create view v as select * from functional.%s", tableName))
-            .error(columnMaskError(String.format("functional.%s.string_col", tableName)),
-                onServer(TPrivilegeLevel.ALL));
-        // Alter view.
-        authorize(String.format("alter view functional.alltypes_view_sub as " +
-            "select * from functional.%s", tableName))
-            .error(columnMaskError(String.format("functional.%s.string_col", tableName)),
-                onServer(TPrivilegeLevel.ALL));
-        // Union.
-        authorize(String.format(
-            "select string_col from functional.%s union select 'hello'", tableName))
-            .error(columnMaskError(String.format("functional.%s.string_col", tableName)),
-                onServer(TPrivilegeLevel.ALL));
-        // Update.
-        authorize(String.format(
-            "update functional_kudu.alltypes set int_col = 1 where string_col in " +
-            "(select string_col from functional.%s)", tableName))
-            .error(columnMaskError(String.format("functional.%s.string_col", tableName)),
-                onServer(TPrivilegeLevel.ALL));
-        // Delete.
-        authorize(String.format("delete functional_kudu.alltypes where string_col in " +
-            "(select string_col from functional.%s)", tableName))
-            .error(columnMaskError(String.format("functional.%s.string_col", tableName)),
-                onServer(TPrivilegeLevel.ALL));
-        // Copy testcase.
-        authorize(String.format(
-            "copy testcase to '/tmp' select * from functional.%s", tableName))
-            .error(columnMaskError(String.format("functional.%s.string_col", tableName)),
-                onServer(TPrivilegeLevel.ALL));
-      } finally {
-        deleteRangerPolicy(policyName);
-      }
-    }
-  }
-
-  @Test
   public void testRowFilterEnabled() throws ImpalaException {
     if (authzProvider_ == AuthorizationProvider.SENTRY) return;
 
diff --git a/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java b/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java
index df90829..463ee0b 100644
--- a/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java
+++ b/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.authorization.ranger;
 
+import com.google.common.base.Preconditions;
 import org.apache.impala.authorization.AuthorizationChecker;
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.authorization.AuthorizationContext;
@@ -39,7 +40,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class RangerAuditLogTest extends AuthorizationTestBase {
-  private RangerAuthorizationCheckerSpy authzChecker_;
+  private static RangerAuthorizationCheckerSpy authzChecker_ = null;
 
   private static class RangerAuthorizationCheckerSpy extends RangerAuthorizationChecker {
     private AuthorizationContext authzCtx_;
@@ -205,6 +206,8 @@ public class RangerAuditLogTest extends AuthorizationTestBase {
     authorize(stmt).ok(privileges);
     RangerAuthorizationContext rangerCtx =
         (RangerAuthorizationContext) authzChecker_.authzCtx_;
+    Preconditions.checkNotNull(rangerCtx);
+    Preconditions.checkNotNull(rangerCtx.getAuditHandler());
     resultChecker.accept(rangerCtx.getAuditHandler().getAuthzEvents());
   }
 
@@ -213,6 +216,8 @@ public class RangerAuditLogTest extends AuthorizationTestBase {
     authorize(stmt).error("", privileges);
     RangerAuthorizationContext rangerCtx =
         (RangerAuthorizationContext) authzChecker_.authzCtx_;
+    Preconditions.checkNotNull(rangerCtx);
+    Preconditions.checkNotNull(rangerCtx.getAuditHandler());
     resultChecker.accept(rangerCtx.getAuditHandler().getAuthzEvents());
   }
 
@@ -238,7 +243,11 @@ public class RangerAuditLogTest extends AuthorizationTestBase {
       @Override
       public AuthorizationChecker newAuthorizationChecker(
           AuthorizationPolicy authzPolicy) {
-        authzChecker_ = new RangerAuthorizationCheckerSpy(authzConfig_);
+        // Do not create a new instance if we already have one. This is consistent with
+        // RangerAuthorizationFactory#newAuthorizationChecker().
+        if (authzChecker_ == null) {
+          authzChecker_ = new RangerAuthorizationCheckerSpy(authzConfig_);
+        }
         return authzChecker_;
       }
     };
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index 64d7807..a4f43c4 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -385,6 +385,18 @@ public class FrontendTestBase extends AbstractFrontendTest {
           public void invalidateAuthorizationCache() {}
 
           @Override
+          public boolean needsMaskingOrFiltering(User user, String dbName,
+              String tableName, List<String> requiredColumns) {
+            return false;
+          }
+
+          @Override
+          public String createColumnMask(User user, String dbName, String tableName,
+              String columnName) {
+            return null;
+          }
+
+          @Override
           public AuthorizationContext createAuthorizationContext(boolean doAudits,
               String sqlStmt, TSessionState sessionState,
               Optional<EventSequence> timeline) {
@@ -403,6 +415,11 @@ public class FrontendTestBase extends AbstractFrontendTest {
       public AuthorizationManager newAuthorizationManager(CatalogServiceCatalog catalog) {
         return new NoopAuthorizationManager();
       }
+
+      @Override
+      public boolean supportsColumnMasking() {
+        return false;
+      }
     };
   }
 }
diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
new file mode 100644
index 0000000..4c71a80
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
@@ -0,0 +1,400 @@
+====
+---- QUERY
+select id, bool_col, string_col from functional.alltypestiny
+---- RESULTS
+0,NULL,'0aaa'
+100,NULL,'1aaa'
+200,NULL,'0aaa'
+300,NULL,'1aaa'
+400,NULL,'0aaa'
+500,NULL,'1aaa'
+600,NULL,'0aaa'
+700,NULL,'1aaa'
+---- TYPES
+INT,BOOLEAN,STRING
+====
+---- QUERY
+# Test on table alias
+select t.id, bool_col, string_col from functional.alltypestiny t
+---- RESULTS
+0,NULL,'0aaa'
+100,NULL,'1aaa'
+200,NULL,'0aaa'
+300,NULL,'1aaa'
+400,NULL,'0aaa'
+500,NULL,'1aaa'
+600,NULL,'0aaa'
+700,NULL,'1aaa'
+---- TYPES
+INT,BOOLEAN,STRING
+====
+---- QUERY
+# Test on slot path resolution
+select functional.alltypestiny.id, alltypestiny.bool_col, string_col
+from functional.alltypestiny
+---- RESULTS
+0,NULL,'0aaa'
+100,NULL,'1aaa'
+200,NULL,'0aaa'
+300,NULL,'1aaa'
+400,NULL,'0aaa'
+500,NULL,'1aaa'
+600,NULL,'0aaa'
+700,NULL,'1aaa'
+---- TYPES
+INT,BOOLEAN,STRING
+====
+---- QUERY
+# Test on star select item
+select * from functional.alltypestiny
+---- RESULTS
+0,NULL,0,0,0,0,0,0,'01/01/09','0aaa',2009-01-01 00:00:00,2009,1
+100,NULL,1,1,1,10,1.100000023841858,10.1,'01/01/09','1aaa',2009-01-01 00:01:00,2009,1
+200,NULL,0,0,0,0,0,0,'02/01/09','0aaa',2009-02-01 00:00:00,2009,2
+300,NULL,1,1,1,10,1.100000023841858,10.1,'02/01/09','1aaa',2009-02-01 00:01:00,2009,2
+400,NULL,0,0,0,0,0,0,'03/01/09','0aaa',2009-03-01 00:00:00,2009,3
+500,NULL,1,1,1,10,1.100000023841858,10.1,'03/01/09','1aaa',2009-03-01 00:01:00,2009,3
+600,NULL,0,0,0,0,0,0,'04/01/09','0aaa',2009-04-01 00:00:00,2009,4
+700,NULL,1,1,1,10,1.100000023841858,10.1,'04/01/09','1aaa',2009-04-01 00:01:00,2009,4
+---- TYPES
+INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT
+====
+---- QUERY
+# Test on star select item with table alias
+select * from functional.alltypestiny t
+---- RESULTS
+0,NULL,0,0,0,0,0,0,'01/01/09','0aaa',2009-01-01 00:00:00,2009,1
+100,NULL,1,1,1,10,1.100000023841858,10.1,'01/01/09','1aaa',2009-01-01 00:01:00,2009,1
+200,NULL,0,0,0,0,0,0,'02/01/09','0aaa',2009-02-01 00:00:00,2009,2
+300,NULL,1,1,1,10,1.100000023841858,10.1,'02/01/09','1aaa',2009-02-01 00:01:00,2009,2
+400,NULL,0,0,0,0,0,0,'03/01/09','0aaa',2009-03-01 00:00:00,2009,3
+500,NULL,1,1,1,10,1.100000023841858,10.1,'03/01/09','1aaa',2009-03-01 00:01:00,2009,3
+600,NULL,0,0,0,0,0,0,'04/01/09','0aaa',2009-04-01 00:00:00,2009,4
+700,NULL,1,1,1,10,1.100000023841858,10.1,'04/01/09','1aaa',2009-04-01 00:01:00,2009,4
+---- TYPES
+INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT
+====
+---- QUERY
+# Test on predicate. Should evaluate on masked values.
+select * from functional.alltypestiny where id = 1
+---- RESULTS
+====
+---- QUERY
+# Test on predicate. Should evaluate on masked values.
+select * from functional.alltypestiny where id = 100
+---- RESULTS
+100,NULL,1,1,1,10,1.100000023841858,10.1,'01/01/09','1aaa',2009-01-01 00:01:00,2009,1
+---- TYPES
+INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT
+====
+---- QUERY
+# Test on predicate. Should evaluate on masked values.
+select concat(string_col, date_string_col) from (
+  select string_col, date_string_col from functional.alltypestiny where string_col = "1"
+) t
+---- RESULTS
+====
+---- QUERY
+# Test on predicate. Should evaluate on masked values.
+select concat(string_col, date_string_col) from (
+  select string_col, date_string_col from functional.alltypestiny where string_col = "1aaa"
+) t
+---- RESULTS
+'1aaa01/01/09'
+'1aaa02/01/09'
+'1aaa03/01/09'
+'1aaa04/01/09'
+---- TYPES
+STRING
+====
+---- QUERY
+# Test on slot path resolution for multiple tables
+select t.id, s.bool_col, t.bool_col, s.string_col, t.string_col
+from functional.alltypessmall s join functional.alltypestiny t
+on s.id = t.id
+---- RESULTS
+0,true,NULL,'0','0aaa'
+---- TYPES
+INT,BOOLEAN,BOOLEAN,STRING,STRING
+====
+---- QUERY
+# Test on slot path resolution for multiple tables
+select t.id, s.bool_col, t.bool_col, s.string_col, t.string_col
+from functional.alltypessmall s join functional.alltypestiny t using (id)
+---- RESULTS
+0,true,NULL,'0','0aaa'
+---- TYPES
+INT,BOOLEAN,BOOLEAN,STRING,STRING
+====
+---- QUERY
+# Test on slot path resolution for multiple tables
+select a.id, s.bool_col, t.bool_col, s.string_col, t.string_col
+from functional.alltypes a
+  join functional.alltypestiny t using (id)
+  join functional.alltypessmall s on t.id = s.id
+---- RESULTS
+0,true,NULL,'0','0aaa'
+---- TYPES
+INT,BOOLEAN,BOOLEAN,STRING,STRING
+====
+---- QUERY
+select t.* from functional.alltypessmall s join functional.alltypestiny t on s.id = t.id
+---- RESULTS
+0,NULL,0,0,0,0,0,0,'01/01/09','0aaa',2009-01-01 00:00:00,2009,1
+---- TYPES
+INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT
+====
+---- QUERY
+select
+  alltypessmall.id, alltypestiny.id + 1,
+  alltypessmall.bool_col, functional.alltypestiny.bool_col is null,
+  alltypessmall.string_col, concat('xxx', alltypestiny.string_col)
+from functional.alltypessmall join functional.alltypestiny
+on alltypessmall.id = functional.alltypestiny.id
+where alltypestiny.id % 200 = 0
+---- RESULTS
+0,1,true,true,'0','xxx0aaa'
+---- TYPES
+INT,BIGINT,BOOLEAN,BOOLEAN,STRING,STRING
+====
+---- QUERY
+# Test on subqueries
+select id, bool_col, int_col, string_col from (
+  select id, bool_col, int_col, string_col from (
+    select id, bool_col, int_col, upper(string_col) as string_col
+    from functional.alltypestiny
+  ) t1 where id % 200 = 0
+) t0 where id = 400
+---- RESULTS
+400,NULL,0,'0AAA'
+---- TYPES
+INT,BOOLEAN,INT,STRING
+====
+---- QUERY
+# Test on union
+select id, bool_col, string_col from functional.alltypestiny
+union all
+select id, bool_col, string_col from functional.alltypestiny
+---- RESULTS
+0,NULL,'0aaa'
+100,NULL,'1aaa'
+200,NULL,'0aaa'
+300,NULL,'1aaa'
+400,NULL,'0aaa'
+500,NULL,'1aaa'
+600,NULL,'0aaa'
+700,NULL,'1aaa'
+0,NULL,'0aaa'
+100,NULL,'1aaa'
+200,NULL,'0aaa'
+300,NULL,'1aaa'
+400,NULL,'0aaa'
+500,NULL,'1aaa'
+600,NULL,'0aaa'
+700,NULL,'1aaa'
+---- TYPES
+INT,BOOLEAN,STRING
+====
+---- QUERY
+# Test on view and its underlying table. alltypes_view is a view on table alltypes.
+# Both have column masking policies, and both policies are applied.
+select id, bool_col, int_col, string_col from functional.alltypes_view
+order by id limit 10
+---- RESULTS
+0,true,0,'vvv0ttt'
+100,false,1,'vvv1ttt'
+200,true,2,'vvv2ttt'
+300,false,3,'vvv3ttt'
+400,true,4,'vvv4ttt'
+500,false,5,'vvv5ttt'
+600,true,6,'vvv6ttt'
+700,false,7,'vvv7ttt'
+800,true,8,'vvv8ttt'
+900,false,9,'vvv9ttt'
+---- TYPES
+INT,BOOLEAN,INT,STRING
+====
+---- QUERY
+# Test on local view (CTE). Correctly ignore masking on local view names so the result
+# won't be 100 (affected by policy id => id * 100).
+use functional;
+with alltypestiny as (select 1 as id)
+select * from alltypestiny
+---- RESULTS
+1
+====
+---- QUERY
+# Test on local view (CTE). Correctly ignore masking on local view names so the value
+# of local view 'alltypes' won't be 10000 (affected by policy id => id * 100).
+use functional;
+with alltypes as (select 100 as id)
+select alltypes.id from alltypestiny join alltypes using (id)
+---- RESULTS
+100
+====
+---- QUERY
+# Test on local view (CTE). Correctly mask table used in local view.
+use functional;
+with iv as (select id, bool_col, string_col from alltypestiny)
+select * from iv
+---- RESULTS
+0,NULL,'0aaa'
+100,NULL,'1aaa'
+200,NULL,'0aaa'
+300,NULL,'1aaa'
+400,NULL,'0aaa'
+500,NULL,'1aaa'
+600,NULL,'0aaa'
+700,NULL,'1aaa'
+---- TYPES
+INT,BOOLEAN,STRING
+====
+---- QUERY
+# Test on local view (CTE). Correctly mask view used in local view.
+use functional;
+with iv as (select id, bool_col, string_col from alltypes_view order by id limit 10)
+select * from iv
+---- RESULTS
+0,true,'vvv0ttt'
+100,false,'vvv1ttt'
+200,true,'vvv2ttt'
+300,false,'vvv3ttt'
+400,true,'vvv4ttt'
+500,false,'vvv5ttt'
+600,true,'vvv6ttt'
+700,false,'vvv7ttt'
+800,true,'vvv8ttt'
+900,false,'vvv9ttt'
+---- TYPES
+INT,BOOLEAN,STRING
+====
+---- QUERY
+# Test on local view (CTE).
+use functional;
+with iv1 as (select id, bool_col, string_col from alltypestiny),
+     iv2 as (select int_col, count(int_col) as cnt from alltypestiny group by int_col)
+select iv1.*, iv2.*, v.string_col from iv1, iv2, alltypes_view v
+where iv1.id = iv2.int_col and iv1.id = v.id
+---- RESULTS
+0,NULL,'0aaa',0,4,'vvv0ttt'
+---- TYPES
+INT,BOOLEAN,STRING,INT,BIGINT,STRING
+====
+---- QUERY
+# Test on local view (CTE). The join properties are associated at TableRef of
+# alltypes_view. Test those properties are migrated correctly in masking and
+# unmasking. See more in FromClause#reset().
+with iv as (select v.string_col
+  from functional.alltypestiny t
+  join functional.alltypes_view v
+  on t.id = v.id)
+select * from iv;
+---- RESULTS
+'vvv0ttt'
+'vvv1ttt'
+'vvv2ttt'
+'vvv3ttt'
+'vvv4ttt'
+'vvv5ttt'
+'vvv6ttt'
+'vvv7ttt'
+---- TYPES
+STRING
+====
+---- QUERY
+# Test on local view (CTE). The join properties are associated at TableRef of
+# alltypestiny. Test those properties are migrated correctly in masking and
+# unmasking. See more in FromClause#reset().
+with iv as (select v.string_col
+  from functional.alltypes_view v
+  join functional.alltypestiny t
+  on t.id = v.id)
+select * from iv;
+---- RESULTS
+'vvv0ttt'
+'vvv1ttt'
+'vvv2ttt'
+'vvv3ttt'
+'vvv4ttt'
+'vvv5ttt'
+'vvv6ttt'
+'vvv7ttt'
+---- TYPES
+STRING
+====
+---- QUERY
+# Test on masking table inside correlated subquery.
+use functional;
+select id, string_col from alltypes a
+where exists (select id from alltypestiny where id = a.id)
+order by id
+---- RESULTS
+0,'0ttt'
+100,'1ttt'
+200,'2ttt'
+300,'3ttt'
+400,'4ttt'
+500,'5ttt'
+600,'6ttt'
+700,'7ttt'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Test on masking table inside uncorrelated IN subquery.
+use functional;
+select id from alltypessmall
+where id * 10 in (select id from alltypestiny)
+order by id
+---- RESULTS
+0
+10
+20
+30
+40
+50
+60
+70
+---- TYPES
+INT
+====
+---- QUERY
+# Test on masking table inside uncorrelated subquery.
+use functional;
+select id from alltypessmall
+where id = (select count(1) from alltypestiny where id < 500)
+---- RESULTS
+5
+---- TYPES
+INT
+====
+---- QUERY
+# Test on CTAS
+create table $UNIQUE_DB.masked_tbl as select * from alltypestiny;
+select * from $UNIQUE_DB.masked_tbl;
+---- RESULTS
+0,NULL,0,0,0,0,0,0,'01/01/09','0aaa',2009-01-01 00:00:00,2009,1
+100,NULL,1,1,1,10,1.100000023841858,10.1,'01/01/09','1aaa',2009-01-01 00:01:00,2009,1
+200,NULL,0,0,0,0,0,0,'02/01/09','0aaa',2009-02-01 00:00:00,2009,2
+300,NULL,1,1,1,10,1.100000023841858,10.1,'02/01/09','1aaa',2009-02-01 00:01:00,2009,2
+400,NULL,0,0,0,0,0,0,'03/01/09','0aaa',2009-03-01 00:00:00,2009,3
+500,NULL,1,1,1,10,1.100000023841858,10.1,'03/01/09','1aaa',2009-03-01 00:01:00,2009,3
+600,NULL,0,0,0,0,0,0,'04/01/09','0aaa',2009-04-01 00:00:00,2009,4
+700,NULL,1,1,1,10,1.100000023841858,10.1,'04/01/09','1aaa',2009-04-01 00:01:00,2009,4
+---- TYPES
+INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT
+====
+---- QUERY
+# Test on CreateView. Should not mask the columns when used in sql generations.
+create view $UNIQUE_DB.masked_view as select * from alltypestiny;
+show create view $UNIQUE_DB.masked_view;
+---- RESULTS
+'CREATE VIEW $UNIQUE_DB.masked_view AS\nSELECT * FROM functional.alltypestiny'
+====
+---- QUERY
+# Test on AlterView. Should not mask the columns when used in sql generations.
+alter view $UNIQUE_DB.masked_view as select id from alltypestiny;
+show create view $UNIQUE_DB.masked_view;
+---- RESULTS
+'CREATE VIEW $UNIQUE_DB.masked_view AS\nSELECT id FROM functional.alltypestiny'
+====
diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index 6af1199..b5402ce 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -30,6 +30,7 @@ from tests.util.calculation_util import get_random_id
 ADMIN = "admin"
 RANGER_AUTH = ("admin", "admin")
 RANGER_HOST = "http://localhost:6080"
+REST_HEADERS = {"Content-Type": "application/json", "Accept": "application/json"}
 IMPALAD_ARGS = "--server-name=server1 --ranger_service_type=hive " \
                "--ranger_app_id=impala --authorization_provider=ranger"
 CATALOGD_ARGS = "--server-name=server1 --ranger_service_type=hive " \
@@ -45,6 +46,10 @@ class TestRanger(CustomClusterTestSuite):
   Tests for Apache Ranger integration with Apache Impala.
   """
 
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
@@ -488,11 +493,9 @@ class TestRanger(CustomClusterTestSuite):
       "isRecursive": "false",
       "clusterName": "server1"
     }
-
-    headers = {"Content-Type": "application/json", "Accept": "application/json"}
     r = requests.post("{0}/service/plugins/services/grant/test_impala?pluginId=impala"
                       .format(RANGER_HOST),
-                      auth=RANGER_AUTH, json=data, headers=headers)
+                      auth=RANGER_AUTH, json=data, headers=REST_HEADERS)
     assert 200 <= r.status_code < 300
 
   @staticmethod
@@ -510,11 +513,9 @@ class TestRanger(CustomClusterTestSuite):
       "isRecursive": "false",
       "clusterName": "server1"
     }
-
-    headers = {"Content-Type": "application/json", "Accept": "application/json"}
     r = requests.post("{0}/service/plugins/services/revoke/test_impala?pluginId=impala"
                       .format(RANGER_HOST),
-                      auth=RANGER_AUTH, json=data, headers=headers)
+                      auth=RANGER_AUTH, json=data, headers=REST_HEADERS)
     assert 200 <= r.status_code < 300
 
   @staticmethod
@@ -534,19 +535,16 @@ class TestRanger(CustomClusterTestSuite):
 
   @staticmethod
   def _get_ranger_privileges(user):
-    headers = {"Content-Type": "application/json", "Accept": "application/json"}
     r = requests.get("{0}/service/plugins/policies"
                      .format(RANGER_HOST),
-                     auth=RANGER_AUTH, headers=headers)
+                     auth=RANGER_AUTH, headers=REST_HEADERS)
     return json.loads(r.content)["policies"]
 
   def _add_ranger_user(self, user):
     data = {"name": user, "password": "password123", "userRoleList": ["ROLE_USER"]}
-    headers = {"Content-Type": "application/json", "Accept": "application/json"}
-
     r = requests.post("{0}/service/xusers/secure/users".format(RANGER_HOST),
                       auth=RANGER_AUTH,
-                      json=data, headers=headers)
+                      json=data, headers=REST_HEADERS)
     return json.loads(r.content)["id"]
 
   def _remove_ranger_user(self, id):
@@ -555,6 +553,61 @@ class TestRanger(CustomClusterTestSuite):
     assert 300 > r.status_code >= 200
 
   @staticmethod
+  def _add_column_masking_policy(
+      policy_name, user, db, table, column, mask_type, value_expr=None):
+    """ Adds a column masking policy and returns the policy id"""
+    data = {
+      "name": policy_name,
+      "policyType": 1,
+      "serviceType": "hive",
+      "service": "test_impala",
+      "resources": {
+        "database": {
+          "values": [db],
+          "isExcludes": False,
+          "isRecursive": False
+        },
+        "table": {
+          "values": [table],
+          "isExcludes": False,
+          "isRecursive": False
+        },
+        "column": {
+          "values": [column],
+          "isExcludes": False,
+          "isRecursive": False
+        }
+      },
+      "dataMaskPolicyItems": [
+        {
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": True
+            }
+          ],
+          "users": [user],
+          "dataMaskInfo": {
+            "dataMaskType": mask_type,
+            "valueExpr": value_expr
+          }
+        }
+      ]
+    }
+    r = requests.post("{0}/service/public/v2/api/policy".format(RANGER_HOST),
+                      auth=RANGER_AUTH, json=data, headers=REST_HEADERS)
+    assert 300 > r.status_code >= 200, r.content
+    return json.loads(r.content)["id"]
+
+  @staticmethod
+  def _remove_column_masking_policy(policy_name):
+    r = requests.delete(
+        "{0}/service/public/v2/api/policy?servicename=test_impala&policyname={1}".format(
+            RANGER_HOST, policy_name),
+        auth=RANGER_AUTH, headers=REST_HEADERS)
+    assert 300 > r.status_code >= 200, r.content
+
+  @staticmethod
   def _check_privileges(result, expected):
     def columns(row):
       cols = row.split("\t")
@@ -735,3 +788,56 @@ class TestRanger(CustomClusterTestSuite):
         TestRanger._revoke_ranger_privilege("{OWNER}", resource, access)
     finally:
       self._run_query_as_user("drop database {0} cascade".format(test_db), ADMIN, True)
+
+  @CustomClusterTestSuite.with_args(
+    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
+  def test_column_masking(self, vector, unique_name):
+    user = getuser()
+    unique_database = unique_name + '_db'
+    # Create another client for admin user since current user doesn't have privileges to
+    # create/drop databases or refresh authorization.
+    admin_client = self.create_impala_client()
+    admin_client.execute("drop database if exists %s cascade" % unique_database,
+                         user=ADMIN)
+    admin_client.execute("create database %s" % unique_database, user=ADMIN)
+    # Grant CREATE on database to current user for tests on CTAS, CreateView etc.
+    admin_client.execute("grant create on database %s to user %s"
+                         % (unique_database, user))
+    policy_cnt = 0
+    try:
+      TestRanger._add_column_masking_policy(
+        unique_name + str(policy_cnt), user, "functional", "alltypestiny", "id",
+        "CUSTOM", "id * 100")   # use column name 'id' directly
+      policy_cnt += 1
+      TestRanger._add_column_masking_policy(
+        unique_name + str(policy_cnt), user, "functional", "alltypestiny", "bool_col",
+        "MASK_NULL")
+      policy_cnt += 1
+      TestRanger._add_column_masking_policy(
+        unique_name + str(policy_cnt), user, "functional", "alltypestiny", "string_col",
+        "CUSTOM", "concat({col}, 'aaa')")   # use column reference '{col}'
+      policy_cnt += 1
+      # Add policy to a view
+      TestRanger._add_column_masking_policy(
+        unique_name + str(policy_cnt), user, "functional", "alltypes_view", "string_col",
+        "CUSTOM", "concat('vvv', {col})")
+      policy_cnt += 1
+      # Add policy to the table used in the view
+      TestRanger._add_column_masking_policy(
+        unique_name + str(policy_cnt), user, "functional", "alltypes", "id",
+        "CUSTOM", "{col} * 100")
+      policy_cnt += 1
+      TestRanger._add_column_masking_policy(
+        unique_name + str(policy_cnt), user, "functional", "alltypes", "string_col",
+        "CUSTOM", "concat({col}, 'ttt')")
+      policy_cnt += 1
+      self.execute_query_expect_success(admin_client, "refresh authorization",
+                                        user=ADMIN)
+      self.run_test_case("QueryTest/ranger_column_masking", vector,
+                         test_file_vars={'$UNIQUE_DB': unique_database})
+    finally:
+      admin_client.execute("revoke create on database %s from user %s"
+                           % (unique_database, user))
+      admin_client.execute("drop database %s cascade" % unique_database)
+      for i in range(policy_cnt):
+        TestRanger._remove_column_masking_policy(unique_name + str(i))
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 8f5066e..2eaece5 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -682,6 +682,8 @@ class ImpalaTestSuite(BaseTestSuite):
         # ERRORS, TYPES, LABELS, etc. which doesn't make sense if there are two
         # different result sets to consider (IMPALA-4471).
         assert 'DML_RESULTS' not in test_section
+        test_section['RESULTS'] = self.__do_replacements(
+            test_section['RESULTS'], use_db=use_db, extra=test_file_vars)
         self.__verify_results_and_errors(vector, test_section, result, use_db)
       else:
         # TODO: Can't validate errors without expected results for now.