Posted to commits@kyuubi.apache.org by ya...@apache.org on 2023/02/27 06:50:53 UTC

[kyuubi] branch master updated: [KYUUBI #4202] Fix reference resolution when data masking enabled for V2 relations

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5aed270c4 [KYUUBI #4202] Fix reference resolution when data masking enabled for V2 relations
5aed270c4 is described below

commit 5aed270c453516fd3559aa668e56b2e1badf1c93
Author: Kent Yao <ya...@apache.org>
AuthorDate: Mon Feb 27 14:50:39 2023 +0800

    [KYUUBI #4202] Fix reference resolution when data masking enabled for V2 relations
    
    ### _Why are the changes needed?_
    
    A follow-up to close #4304 with a complete solution.
    
    Before this PR, we performed row filtering and data masking in the same analyzer rule, as both added their patterns on top of the scans, which are leaf nodes.
    
    After this PR, we separate them into two different rules: the filtering still adds filter patterns on top of the leaves, while the masking now adds masker expressions on the leaves first and later on the root of the query.
    
    We move the maskers from the leaves to the root to fix attribute resolution errors. The errors reported in #4202 occur when we insert a node with resolved expressions between the root and its child: if the parents were resolved beforehand, the newly added expressions may surface as missing attributes. The new approach avoids this bug.
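    
    To make the failure mode concrete, here is a minimal, hedged sketch (not code from this patch) that reproduces the shape of the problem with plain Catalyst APIs; a bare `Alias` stands in for a Ranger masker expression, and it assumes a local Spark session:
    
    ```scala
    // Sketch only: a plain Alias stands in for a masker expression.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.expressions.Alias
    import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
    
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._
    
    // The analyzed plan is fully resolved: parents reference the leaf's ExprIds.
    val analyzed = Seq((1, "a")).toDF("key", "value").queryExecution.analyzed
    
    // Rewrite only the leaf, as the old rule did: each Alias mints a fresh ExprId.
    val rewritten = analyzed.transformUp {
      case r: LocalRelation => Project(r.output.map(a => Alias(a, a.name)()), r)
    }
    
    // Operators above the leaf still reference the old ExprIds, which no longer
    // appear in their child's output -- the "missing attributes" symptom of #4202.
    println(rewritten.missingInput) // non-empty attribute set
    spark.stop()
    ```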
    
    The full data masking rule consists of two separate stages (a toy sketch of the two-stage mechanism follows this list).
    
     * Step1: RuleApplyDataMaskingStage0
        - look up supported scans across the full plan
        - once found, retrieve the masker configuration from the external admin plugin (Ranger), column by column
        - use the Spark SQL parser to generate an unresolved expression for each masker
        - add a projection with the new output on top of the original scan if the output has changed
        - add a DataMaskingStage0Marker to track each original expression and its masker expression
    
     * Step2: Spark's native rules resolve our newly added maskers
    
     * Step3: RuleApplyDataMaskingStage1
        - fill in the missing attributes with the related masker expressions buffered by DataMaskingStage0Marker
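    
    The sketch below is a self-contained toy model of this two-stage bookkeeping (plain Scala, no Spark dependency; names such as `Stage0Marker` and `Attr` are illustrative stand-ins, with `Attr` ids playing the role of Catalyst `ExprId`s):
    
    ```scala
    object TwoStageMaskingSketch extends App {
      final case class Attr(id: Long, name: String)
    
      sealed trait Plan { def output: Seq[Attr] }
      final case class Scan(output: Seq[Attr]) extends Plan
      final case class ProjectNode(output: Seq[Attr], child: Plan) extends Plan
    
      // The stage-0 marker keeps the original scan so stage 1 can map old ids to
      // maskers, mirroring DataMaskingStage0Marker.exprToMaskers in spirit.
      final case class Stage0Marker(child: Plan, scan: Scan) extends Plan {
        def output: Seq[Attr] = child.output
        def exprToMaskers: Map[Long, Attr] =
          scan.output.map(_.id).zip(child.output).collect {
            case (id, a) if id != a.id => id -> a
          }.toMap
      }
    
      private var nextId = 100L
      private def mask(a: Attr): Attr = { nextId += 1; Attr(nextId, a.name) } // fresh id, like Alias(...)()
    
      // Stage 0: project maskers on top of the scan, keep the scan for bookkeeping.
      def stage0(scan: Scan, maskedCols: Set[String]): Plan = {
        val newOut = scan.output.map(a => if (maskedCols(a.name)) mask(a) else a)
        if (newOut == scan.output) scan else Stage0Marker(ProjectNode(newOut, scan), scan)
      }
    
      // Stage 1: at the root, swap any stale original id for its masker attribute.
      def stage1(root: ProjectNode, marker: Stage0Marker): ProjectNode =
        root.copy(output = root.output.map(a => marker.exprToMaskers.getOrElse(a.id, a)))
    
      val scan = Scan(Seq(Attr(1, "key"), Attr(2, "value")))
      val marker = stage0(scan, Set("value")).asInstanceOf[Stage0Marker]
      val staleRoot = ProjectNode(scan.output, marker) // root still references id 2
      println(stage1(staleRoot, marker).output) // List(Attr(1,key), Attr(101,value))
    }
    ```
    
    Because the marker itself buffers the original-to-masker mapping, stage 1 can run in a later analyzer iteration, after Spark has resolved the maskers, without re-deriving anything.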
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4358 from yaooqinn/PR_4304.
    
    Closes #4202
    
    18d6a9b4a [Kent Yao] nit
    fc11f0250 [liangbowen] nit
    39a72afb6 [Kent Yao] fix 3.0
    ad1209abe [Kent Yao] iceberg catalog 3.1
    c0db77d93 [Kent Yao] jdbc catalog 3.1
    75ee799d4 [Kent Yao] perm view
    7f8b929a7 [liangbowen] unused imports
    a03fc690a [liangbowen] skip cleanup derby test db in iceberg suite
    20ffed95e [liangbowen] sort union result
    d397cc1d6 [Kent Yao] add more tests for data masking
    72143798f [Kent Yao] add more tests for data masking
    96f521574 [Kent Yao] new approach temp commit
    5271f7d0e [Kent Yao] addr
    ce5ac0c32 [Kent Yao] Update extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilter.scala
    7776b807e [Kent Yao] fix
    aecbc0080 [Kent Yao] fix
    0f710a4b2 [Kent Yao] [KYUUBI #4202] Fix Datamasking for V2 relations
    213705145 [Kent Yao] Merge branch 'master' into PR_4304
    138dd0d63 [Kent Yao] [KYUUBI #4202] Fix Datamasking for V2 relations
    42ca7e4d9 [Kent Yao] [KYUUBI #4202] Fix Datamasking for V2 relations
    fd3bec37d [Kent Yao] [KYUUBI #4202] Fix Datamasking for V2 relations
    5880da8b6 [liangbowen] add ut for v2jdbc
    ce49feef5 [liangbowen] add ut of column masking for iceberg
    
    Lead-authored-by: Kent Yao <ya...@apache.org>
    Co-authored-by: liangbowen <li...@gf.com.cn>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .../src/main/resources/table_command_spec.json     |  17 ++
 .../spark/authz/ranger/RangerSparkExtension.scala  |   6 +-
 .../ranger/RuleApplyRowFilterAndDataMasking.scala  |  90 -------
 .../plugin/spark/authz/ranger/RuleHelper.scala     |  52 ++++
 .../authz/ranger/SparkRangerAdminPlugin.scala      |   3 +-
 .../datamasking/DataMaskingStage0Marker.scala}     |  18 +-
 .../datamasking/DataMaskingStage1Marker.scala}     |  10 +-
 .../datamasking/RuleApplyDataMaskingStage0.scala   |  74 ++++++
 .../datamasking/RuleApplyDataMaskingStage1.scala   |  84 +++++++
 .../rowfilter/RowFilterMarker.scala}               |  10 +-
 .../ranger/rowfilter/RuleApplyRowFilter.scala      |  50 ++++
 .../spark/authz/util/RuleEliminateMarker.scala     |  16 +-
 .../src/test/resources/sparkSql_hive_jenkins.json  |   7 +-
 .../plugin/spark/authz/SparkSessionProvider.scala  |  24 ++
 .../plugin/spark/authz/gen/TableCommands.scala     |   8 +
 .../authz/ranger/RangerSparkExtensionSuite.scala   | 159 ------------
 ...JdbcTableCatalogRangerSparkExtensionSuite.scala |   2 +-
 .../DataMaskingForHiveHiveParquetSuite.scala}      |  12 +-
 .../DataMaskingForHiveParquetSuite.scala}          |  12 +-
 .../datamasking/DataMaskingForIcebergSuite.scala   |  64 +++++
 .../DataMaskingForInMemoryParquetSuite.scala}      |  11 +-
 .../datamasking/DataMaskingForJDBCV2Suite.scala    |  65 +++++
 .../ranger/datamasking/DataMaskingTestBase.scala   | 267 +++++++++++++++++++++
 23 files changed, 766 insertions(+), 295 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index d36690bcf..f1c2297b3 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -1176,6 +1176,23 @@
   } ],
   "opType" : "TRUNCATETABLE",
   "queryDescs" : [ ]
+}, {
+  "classname" : "org.apache.spark.sql.execution.datasources.CreateTable",
+  "tableDescs" : [ {
+    "fieldName" : "tableDesc",
+    "fieldExtractor" : "CatalogTableTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : null,
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false
+  } ],
+  "opType" : "CREATETABLE",
+  "queryDescs" : [ {
+    "fieldName" : "query",
+    "fieldExtractor" : "LogicalPlanOptionQueryExtractor"
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.execution.datasources.CreateTempViewUsing",
   "tableDescs" : [ ],
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala
index 5708dfeaf..f8e941d9d 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala
@@ -19,6 +19,8 @@ package org.apache.kyuubi.plugin.spark.authz.ranger
 
 import org.apache.spark.sql.SparkSessionExtensions
 
+import org.apache.kyuubi.plugin.spark.authz.ranger.datamasking.{RuleApplyDataMaskingStage0, RuleApplyDataMaskingStage1}
+import org.apache.kyuubi.plugin.spark.authz.ranger.rowfilter.RuleApplyRowFilter
 import org.apache.kyuubi.plugin.spark.authz.util.{RuleEliminateMarker, RuleEliminateViewMarker}
 
 /**
@@ -42,7 +44,9 @@ class RangerSparkExtension extends (SparkSessionExtensions => Unit) {
     v1.injectCheckRule(AuthzConfigurationChecker)
     v1.injectResolutionRule(_ => new RuleReplaceShowObjectCommands())
     v1.injectResolutionRule(_ => new RuleApplyPermanentViewMarker())
-    v1.injectResolutionRule(new RuleApplyRowFilterAndDataMasking(_))
+    v1.injectResolutionRule(RuleApplyRowFilter)
+    v1.injectResolutionRule(RuleApplyDataMaskingStage0)
+    v1.injectResolutionRule(RuleApplyDataMaskingStage1)
     v1.injectOptimizerRule(_ => new RuleEliminateMarker())
     v1.injectOptimizerRule(new RuleAuthorization(_))
     v1.injectOptimizerRule(_ => new RuleEliminateViewMarker())
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
deleted file mode 100644
index b6961c924..000000000
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.plugin.spark.authz.ranger
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.rules.Rule
-
-import org.apache.kyuubi.plugin.spark.authz.ObjectType
-import org.apache.kyuubi.plugin.spark.authz.serde._
-import org.apache.kyuubi.plugin.spark.authz.util.{PermanentViewMarker, RowFilterAndDataMaskingMarker}
-import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
-
-class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[LogicalPlan] {
-  private def mapChildren(plan: LogicalPlan)(f: LogicalPlan => LogicalPlan): LogicalPlan = {
-    val newChildren = plan match {
-      case cmd if isKnownTableCommand(cmd) =>
-        val tableCommandSpec = getTableCommandSpec(cmd)
-        val queries = tableCommandSpec.queries(cmd)
-        cmd.children.map {
-          case c if queries.contains(c) => f(c)
-          case other => other
-        }
-      case _ =>
-        plan.children.map(f)
-    }
-    plan.withNewChildren(newChildren)
-  }
-
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    mapChildren(plan) {
-      case p: RowFilterAndDataMaskingMarker => p
-      case scan if isKnownScan(scan) && scan.resolved =>
-        val tables = getScanSpec(scan).tables(scan, spark)
-        tables.headOption.map(applyFilterAndMasking(scan, _)).getOrElse(scan)
-      case other => apply(other)
-    }
-  }
-
-  private def applyFilterAndMasking(
-      plan: LogicalPlan,
-      table: Table): LogicalPlan = {
-    val ugi = getAuthzUgi(spark.sparkContext)
-    val opType = operationType(plan)
-    val parse = spark.sessionState.sqlParser.parseExpression _
-    val are = AccessResource(ObjectType.TABLE, table.database.orNull, table.table, null)
-    val art = AccessRequest(are, ugi, opType, AccessType.SELECT)
-    val filterExprStr = SparkRangerAdminPlugin.getFilterExpr(art)
-    val newOutput = plan.output.map { attr =>
-      val are =
-        AccessResource(ObjectType.COLUMN, table.database.orNull, table.table, attr.name)
-      val art = AccessRequest(are, ugi, opType, AccessType.SELECT)
-      val maskExprStr = SparkRangerAdminPlugin.getMaskingExpr(art)
-      if (maskExprStr.isEmpty) {
-        attr
-      } else {
-        val maskExpr = parse(maskExprStr.get)
-        plan match {
-          case _: PermanentViewMarker =>
-            Alias(maskExpr, attr.name)(exprId = attr.exprId)
-          case _ =>
-            Alias(maskExpr, attr.name)()
-        }
-      }
-    }
-
-    if (filterExprStr.isEmpty) {
-      Project(newOutput, RowFilterAndDataMaskingMarker(plan))
-    } else {
-      val filterExpr = parse(filterExprStr.get)
-      Project(newOutput, Filter(filterExpr, RowFilterAndDataMaskingMarker(plan)))
-    }
-  }
-}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleHelper.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleHelper.scala
new file mode 100644
index 000000000..3cfe2b940
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleHelper.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.ranger
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+import org.apache.kyuubi.plugin.spark.authz.serde.{getTableCommandSpec, isKnownTableCommand}
+import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils
+
+trait RuleHelper extends Rule[LogicalPlan] {
+
+  def spark: SparkSession
+
+  final protected val parse: String => Expression = spark.sessionState.sqlParser.parseExpression _
+
+  protected def mapChildren(plan: LogicalPlan)(f: LogicalPlan => LogicalPlan): LogicalPlan = {
+    val newChildren = plan match {
+      case cmd if isKnownTableCommand(cmd) =>
+        val tableCommandSpec = getTableCommandSpec(cmd)
+        val queries = tableCommandSpec.queries(cmd)
+        cmd.children.map {
+          case c if queries.contains(c) => f(c)
+          case other => other
+        }
+      case _ =>
+        plan.children.map(f)
+    }
+    plan.withNewChildren(newChildren)
+  }
+
+  def ugi: UserGroupInformation = AuthZUtils.getAuthzUgi(spark.sparkContext)
+
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala
index 3d46563aa..78e59ff89 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala
@@ -18,8 +18,7 @@
 package org.apache.kyuubi.plugin.spark.authz.ranger
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.LinkedHashMap
+import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
 
 import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.ranger.plugin.policyengine.RangerAccessRequest
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingStage0Marker.scala
similarity index 64%
copy from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala
copy to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingStage0Marker.scala
index 357e9bfc2..b43149383 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingStage0Marker.scala
@@ -15,17 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.plugin.spark.authz.util
+package org.apache.kyuubi.plugin.spark.authz.ranger.datamasking
 
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
 
-case class RowFilterAndDataMaskingMarker(child: LogicalPlan) extends UnaryNode
-  with WithInternalChild {
+import org.apache.kyuubi.plugin.spark.authz.util.WithInternalChild
+case class DataMaskingStage0Marker(child: LogicalPlan, scan: LogicalPlan)
+  extends UnaryNode with WithInternalChild {
+
+  def exprToMaskers(): Map[ExprId, Attribute] = {
+    scan.output.map(_.exprId).zip(child.output).flatMap { case (id, expr) =>
+      if (id == expr.exprId) None else Some(id -> expr)
+    }.toMap
+  }
 
   override def output: Seq[Attribute] = child.output
 
-  override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
-    copy(child = newChild)
+  override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(child = newChild)
 
 }
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingStage1Marker.scala
similarity index 80%
copy from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala
copy to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingStage1Marker.scala
index 357e9bfc2..aed0ac693 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingStage1Marker.scala
@@ -15,17 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.plugin.spark.authz.util
+package org.apache.kyuubi.plugin.spark.authz.ranger.datamasking
 
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
 
-case class RowFilterAndDataMaskingMarker(child: LogicalPlan) extends UnaryNode
-  with WithInternalChild {
+import org.apache.kyuubi.plugin.spark.authz.util.WithInternalChild
+
+case class DataMaskingStage1Marker(child: LogicalPlan) extends UnaryNode with WithInternalChild {
 
   override def output: Seq[Attribute] = child.output
 
-  override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
-    copy(child = newChild)
+  override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(child = newChild)
 
 }
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/RuleApplyDataMaskingStage0.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/RuleApplyDataMaskingStage0.scala
new file mode 100644
index 000000000..de125550a
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/RuleApplyDataMaskingStage0.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.ranger.datamasking
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+
+import org.apache.kyuubi.plugin.spark.authz.ObjectType
+import org.apache.kyuubi.plugin.spark.authz.OperationType.QUERY
+import org.apache.kyuubi.plugin.spark.authz.ranger._
+import org.apache.kyuubi.plugin.spark.authz.serde._
+
+/**
+ * The full data masking rule contains two separate stages.
+ *
+ * Step1: RuleApplyDataMaskingStage0
+ *   - lookup the full plan for supported scans
+ *   - once found, get masker configuration from external column by column
+ *   - use spark sql parser to generate an unresolved expression for each masker
+ *   - add a projection with new output on the right top of the original scan if the output has
+ *     changed
+ *   - Add DataMaskingStage0Marker to track the original expression and its masker expression.
+ *
+ * Step2: Spark native rules will resolve our newly added maskers
+ *
+ * Step3: [[RuleApplyDataMaskingStage1]]
+ */
+case class RuleApplyDataMaskingStage0(spark: SparkSession) extends RuleHelper {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    val newPlan = mapChildren(plan) {
+      case p: DataMaskingStage0Marker => p
+      case p: DataMaskingStage1Marker => p
+      case scan if isKnownScan(scan) && scan.resolved =>
+        val tables = getScanSpec(scan).tables(scan, spark)
+        tables.headOption.map(applyMasking(scan, _)).getOrElse(scan)
+      case other => apply(other)
+    }
+    newPlan
+  }
+
+  private def applyMasking(
+      plan: LogicalPlan,
+      table: Table): LogicalPlan = {
+    val newOutput = plan.output.map { attr =>
+      val are =
+        AccessResource(ObjectType.COLUMN, table.database.orNull, table.table, attr.name)
+      val art = AccessRequest(are, ugi, QUERY, AccessType.SELECT)
+      val maskExprStr = SparkRangerAdminPlugin.getMaskingExpr(art)
+      maskExprStr.map(parse).map(Alias(_, attr.name)()).getOrElse(attr)
+    }
+    if (newOutput == plan.output) {
+      plan
+    } else {
+      DataMaskingStage0Marker(Project(newOutput, plan), plan)
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/RuleApplyDataMaskingStage1.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/RuleApplyDataMaskingStage1.scala
new file mode 100644
index 000000000..9589be2e9
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/RuleApplyDataMaskingStage1.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.ranger.datamasking
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+
+import org.apache.kyuubi.plugin.spark.authz.ranger.RuleHelper
+import org.apache.kyuubi.plugin.spark.authz.serde._
+
+/**
+ * See [[RuleApplyDataMaskingStage0]] also.
+ *
+ * This is the second step for data masking. It will fulfill the missing attributes that
+ * have a related masker expression buffered by DataMaskingStage0Marker.
+ */
+case class RuleApplyDataMaskingStage1(spark: SparkSession) extends RuleHelper {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+
+    plan match {
+      case marker0: DataMaskingStage0Marker => marker0
+      case marker1: DataMaskingStage1Marker => marker1
+      case cmd if isKnownTableCommand(cmd) =>
+        val tableCommandSpec = getTableCommandSpec(cmd)
+        val queries = tableCommandSpec.queries(cmd)
+        cmd.mapChildren {
+          case marker0: DataMaskingStage0Marker => marker0
+          case marker1: DataMaskingStage1Marker => marker1
+          case query if queries.contains(query) && query.resolved =>
+            applyDataMasking(query)
+          case o => o
+        }
+      case cmd: Command if cmd.childrenResolved =>
+        cmd.mapChildren(applyDataMasking)
+      case cmd: Command => cmd
+      case other if other.resolved => applyDataMasking(other)
+      case other => other
+    }
+  }
+
+  private def applyDataMasking(plan: LogicalPlan): LogicalPlan = {
+    assert(plan.resolved, "the current masking approach relies on a resolved plan")
+    def replaceOriginExprWithMasker(plan: LogicalPlan): LogicalPlan = plan match {
+      case m: DataMaskingStage0Marker => m
+      case m: DataMaskingStage1Marker => m
+      case p =>
+        val maskerExprs = p.collect {
+          case marker: DataMaskingStage0Marker if marker.resolved => marker.exprToMaskers()
+        }.flatten.toMap
+        if (maskerExprs.isEmpty) {
+          p
+        } else {
+          val t = p.transformExpressionsUp {
+            case e: NamedExpression => maskerExprs.getOrElse(e.exprId, e)
+          }
+          t.withNewChildren(t.children.map(replaceOriginExprWithMasker))
+        }
+    }
+    val newPlan = replaceOriginExprWithMasker(plan)
+
+    if (newPlan == plan) {
+      plan
+    } else {
+      DataMaskingStage1Marker(newPlan)
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/rowfilter/RowFilterMarker.scala
similarity index 80%
rename from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala
rename to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/rowfilter/RowFilterMarker.scala
index 357e9bfc2..8817958b5 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/rowfilter/RowFilterMarker.scala
@@ -15,17 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.plugin.spark.authz.util
+package org.apache.kyuubi.plugin.spark.authz.ranger.rowfilter
 
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
 
-case class RowFilterAndDataMaskingMarker(child: LogicalPlan) extends UnaryNode
-  with WithInternalChild {
+import org.apache.kyuubi.plugin.spark.authz.util.WithInternalChild
+
+case class RowFilterMarker(child: LogicalPlan) extends UnaryNode with WithInternalChild {
 
   override def output: Seq[Attribute] = child.output
 
-  override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
-    copy(child = newChild)
+  override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(child = newChild)
 
 }
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/rowfilter/RuleApplyRowFilter.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/rowfilter/RuleApplyRowFilter.scala
new file mode 100644
index 000000000..22bcfae49
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/rowfilter/RuleApplyRowFilter.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.ranger.rowfilter
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+
+import org.apache.kyuubi.plugin.spark.authz.ObjectType
+import org.apache.kyuubi.plugin.spark.authz.OperationType.QUERY
+import org.apache.kyuubi.plugin.spark.authz.ranger._
+import org.apache.kyuubi.plugin.spark.authz.serde._
+
+case class RuleApplyRowFilter(spark: SparkSession) extends RuleHelper {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    val newPlan = mapChildren(plan) {
+      case p: RowFilterMarker => p
+      case scan if isKnownScan(scan) && scan.resolved =>
+        val tables = getScanSpec(scan).tables(scan, spark)
+        tables.headOption.map(applyFilter(scan, _)).getOrElse(scan)
+      case other => apply(other)
+    }
+    newPlan
+  }
+
+  private def applyFilter(
+      plan: LogicalPlan,
+      table: Table): LogicalPlan = {
+    val are = AccessResource(ObjectType.TABLE, table.database.orNull, table.table, null)
+    val art = AccessRequest(are, ugi, QUERY, AccessType.SELECT)
+    val filterExpr = SparkRangerAdminPlugin.getFilterExpr(art).map(parse)
+    val filtered = filterExpr.foldLeft(plan)((p, expr) => Filter(expr, RowFilterMarker(p)))
+    filtered
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
index d2da72570..448439b84 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
@@ -17,11 +17,25 @@
 
 package org.apache.kyuubi.plugin.spark.authz.util
 
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 
+import org.apache.kyuubi.plugin.spark.authz.ranger.datamasking.{DataMaskingStage0Marker, DataMaskingStage1Marker}
+import org.apache.kyuubi.plugin.spark.authz.ranger.rowfilter.RowFilterMarker
+
 class RuleEliminateMarker extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformUp { case rf: RowFilterAndDataMaskingMarker => rf.child }
+    plan.transformUp { case p =>
+      p.transformExpressionsUp {
+        case p: SubqueryExpression =>
+          p.withNewPlan(apply(p.plan))
+      } match {
+        case marker0: DataMaskingStage0Marker => marker0.child
+        case marker1: DataMaskingStage1Marker => marker1.child
+        case rf: RowFilterMarker => rf.child
+        case other => other
+      }
+    }
   }
 }
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
index b5b069c46..84b0e30eb 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
+++ b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
@@ -280,7 +280,8 @@
           "values": [
             "default",
             "spark_catalog",
-            "iceberg_ns"
+            "iceberg_ns",
+            "ns1"
           ],
           "isExcludes": false,
           "isRecursive": false
@@ -900,7 +901,9 @@
         "database": {
           "values": [
             "default",
-            "spark_catalog"
+            "spark_catalog",
+            "iceberg_ns",
+            "ns1"
           ],
           "isExcludes": false,
           "isRecursive": false
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala
index 0ab88917b..a1f2d7197 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala
@@ -71,4 +71,28 @@ trait SparkSessionProvider {
 
   protected val sql: String => DataFrame = spark.sql
 
+  protected def doAs[T](user: String, f: => T): T = {
+    UserGroupInformation.createRemoteUser(user).doAs[T](
+      new PrivilegedExceptionAction[T] {
+        override def run(): T = f
+      })
+  }
+  protected def withCleanTmpResources[T](res: Seq[(String, String)])(f: => T): T = {
+    try {
+      f
+    } finally {
+      res.foreach {
+        case (t, "table") => doAs("admin", sql(s"DROP TABLE IF EXISTS $t"))
+        case (db, "database") => doAs("admin", sql(s"DROP DATABASE IF EXISTS $db"))
+        case (fn, "function") => doAs("admin", sql(s"DROP FUNCTION IF EXISTS $fn"))
+        case (view, "view") => doAs("admin", sql(s"DROP VIEW IF EXISTS $view"))
+        case (cacheTable, "cache") => if (isSparkV32OrGreater) {
+            doAs("admin", sql(s"UNCACHE TABLE IF EXISTS $cacheTable"))
+          }
+        case (_, e) =>
+          throw new RuntimeException(s"the resource whose resource type is $e cannot be cleared")
+      }
+    }
+  }
+
 }
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
index d24583e76..a8b8121e2 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
@@ -346,6 +346,13 @@ object TableCommands {
     TableCommandSpec(cmd, Nil, CREATEVIEW)
   }
 
+  val CreateTable = {
+    val cmd = "org.apache.spark.sql.execution.datasources.CreateTable"
+    val tableDesc = TableDesc("tableDesc", classOf[CatalogTableTableExtractor])
+    val queryDesc = QueryDesc("query", "LogicalPlanOptionQueryExtractor")
+    TableCommandSpec(cmd, Seq(tableDesc), CREATETABLE, queryDescs = Seq(queryDesc))
+  }
+
   val CreateDataSourceTable = {
     val cmd = "org.apache.spark.sql.execution.command.CreateDataSourceTableCommand"
     val tableDesc = TableDesc("table", classOf[CatalogTableTableExtractor])
@@ -607,6 +614,7 @@ object TableCommands {
     CreateHiveTableAsSelect,
     CreateHiveTableAsSelect.copy(classname =
       "org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand"),
+    CreateTable,
     CreateTableLike,
     CreateTableV2,
     CreateTableV2.copy(classname =
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
index 8f95a3f9f..48f374255 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
@@ -17,12 +17,8 @@
 
 package org.apache.kyuubi.plugin.spark.authz.ranger
 
-import java.security.PrivilegedExceptionAction
-import java.sql.Timestamp
-
 import scala.util.Try
 
-import org.apache.commons.codec.digest.DigestUtils
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.spark.sql.{Row, SparkSessionExtensions}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
@@ -43,13 +39,6 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite
   // scalastyle:on
   override protected val extension: SparkSessionExtensions => Unit = new RangerSparkExtension
 
-  protected def doAs[T](user: String, f: => T): T = {
-    UserGroupInformation.createRemoteUser(user).doAs[T](
-      new PrivilegedExceptionAction[T] {
-        override def run(): T = f
-      })
-  }
-
   override def afterAll(): Unit = {
     spark.stop()
     super.afterAll()
@@ -62,24 +51,6 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite
     s"Permission denied: user [$user] does not have [$privilege] privilege on [$resource]"
   }
 
-  protected def withCleanTmpResources[T](res: Seq[(String, String)])(f: => T): T = {
-    try {
-      f
-    } finally {
-      res.foreach {
-        case (t, "table") => doAs("admin", sql(s"DROP TABLE IF EXISTS $t"))
-        case (db, "database") => doAs("admin", sql(s"DROP DATABASE IF EXISTS $db"))
-        case (fn, "function") => doAs("admin", sql(s"DROP FUNCTION IF EXISTS $fn"))
-        case (view, "view") => doAs("admin", sql(s"DROP VIEW IF EXISTS $view"))
-        case (cacheTable, "cache") => if (isSparkV32OrGreater) {
-            doAs("admin", sql(s"UNCACHE TABLE IF EXISTS $cacheTable"))
-          }
-        case (_, e) =>
-          throw new RuntimeException(s"the resource whose resource type is $e cannot be cleared")
-      }
-    }
-  }
-
   /**
    * Drops temporary view `viewNames` after calling `f`.
    */
@@ -324,135 +295,6 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite
     }
   }
 
-  test("data masking") {
-    val db = "default"
-    val table = "src"
-    val col = "key"
-    val create =
-      s"CREATE TABLE IF NOT EXISTS $db.$table" +
-        s" ($col int, value1 int, value2 string, value3 string, value4 timestamp, value5 string)" +
-        s" USING $format"
-
-    withCleanTmpResources(Seq(
-      (s"$db.${table}2", "table"),
-      (s"$db.$table", "table"))) {
-      doAs("admin", assert(Try { sql(create) }.isSuccess))
-      doAs(
-        "admin",
-        sql(
-          s"INSERT INTO $db.$table SELECT 1, 1, 'hello', 'world', " +
-            s"timestamp'2018-11-17 12:34:56', 'World'"))
-      doAs(
-        "admin",
-        sql(
-          s"INSERT INTO $db.$table SELECT 20, 2, 'kyuubi', 'y', " +
-            s"timestamp'2018-11-17 12:34:56', 'world'"))
-      doAs(
-        "admin",
-        sql(
-          s"INSERT INTO $db.$table SELECT 30, 3, 'spark', 'a'," +
-            s" timestamp'2018-11-17 12:34:56', 'world'"))
-
-      doAs(
-        "kent",
-        assert(sql(s"SELECT key FROM $db.$table order by key").collect() ===
-          Seq(Row(1), Row(20), Row(30))))
-
-      Seq(
-        s"SELECT value1, value2, value3, value4, value5 FROM $db.$table",
-        s"SELECT value1 as key, value2, value3, value4, value5 FROM $db.$table",
-        s"SELECT max(value1), max(value2), max(value3), max(value4), max(value5) FROM $db.$table",
-        s"SELECT coalesce(max(value1), 1), coalesce(max(value2), 1), coalesce(max(value3), 1), " +
-          s"coalesce(max(value4), timestamp '2018-01-01 22:33:44'), coalesce(max(value5), 1) " +
-          s"FROM $db.$table",
-        s"SELECT value1, value2, value3, value4, value5 FROM $db.$table WHERE value2 in" +
-          s" (SELECT value2 as key FROM $db.$table)")
-        .foreach { q =>
-          doAs(
-            "bob", {
-              withClue(q) {
-                assert(sql(q).collect() ===
-                  Seq(
-                    Row(
-                      DigestUtils.md5Hex("1"),
-                      "xxxxx",
-                      "worlx",
-                      Timestamp.valueOf("2018-01-01 00:00:00"),
-                      "Xorld")))
-              }
-            })
-        }
-      doAs(
-        "bob", {
-          sql(s"CREATE TABLE $db.src2 using $format AS SELECT value1 FROM $db.$table")
-          assert(sql(s"SELECT value1 FROM $db.${table}2").collect() ===
-            Seq(Row(DigestUtils.md5Hex("1"))))
-        })
-    }
-  }
-
-  test("[KYUUBI #3581]: data masking on permanent view") {
-    assume(isSparkV31OrGreater)
-
-    val db = "default"
-    val table = "src"
-    val permView = "perm_view"
-    val col = "key"
-    val create =
-      s"CREATE TABLE IF NOT EXISTS $db.$table" +
-        s" ($col int, value1 int, value2 string)" +
-        s" USING $format"
-
-    val createView =
-      s"CREATE OR REPLACE VIEW $db.$permView" +
-        s" AS SELECT * FROM $db.$table"
-
-    withCleanTmpResources(Seq(
-      (s"$db.$table", "table"),
-      (s"$db.$permView", "view"))) {
-      doAs("admin", assert(Try { sql(create) }.isSuccess))
-      doAs("admin", assert(Try { sql(createView) }.isSuccess))
-      doAs(
-        "admin",
-        sql(
-          s"INSERT INTO $db.$table SELECT 1, 1, 'hello'"))
-
-      Seq(
-        s"SELECT value1, value2 FROM $db.$permView")
-        .foreach { q =>
-          doAs(
-            "perm_view_user", {
-              withClue(q) {
-                assert(sql(q).collect() ===
-                  Seq(
-                    Row(
-                      DigestUtils.md5Hex("1"),
-                      "hello")))
-              }
-            })
-        }
-    }
-  }
-
-  test("KYUUBI #2390: RuleEliminateMarker stays in analyze phase for data masking") {
-    val db = "default"
-    val table = "src"
-    val create =
-      s"CREATE TABLE IF NOT EXISTS $db.$table (key int, value1 int) USING $format"
-
-    withCleanTmpResources(Seq((s"$db.$table", "table"))) {
-      doAs("admin", sql(create))
-      doAs("admin", sql(s"INSERT INTO $db.$table SELECT 1, 1"))
-      // scalastyle: off
-      doAs(
-        "bob", {
-          assert(sql(s"select * from $db.$table").collect() ===
-            Seq(Row(1, DigestUtils.md5Hex("1"))))
-          assert(Try(sql(s"select * from $db.$table").show(1)).isSuccess)
-        })
-    }
-  }
-
   test("show tables") {
     val db = "default2"
     val table = "src"
@@ -680,7 +522,6 @@ class InMemoryCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
 
 class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
   override protected val catalogImpl: String = "hive"
-
   test("table stats must be specified") {
     val table = "hive_src"
     withCleanTmpResources(Seq((table, "table"))) {
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala
index 9f980c27a..73a13bc1c 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala
@@ -104,7 +104,7 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSu
     val e1 = intercept[AccessControlException](
       doAs("someone", sql(s"select city, id from $catalogV2.$namespace1.$table1").explain()))
     assert(e1.getMessage.contains(s"does not have [select] privilege" +
-      s" on [$namespace1/$table1/id]"))
+      s" on [$namespace1/$table1/city]"))
   }
 
   test("[KYUUBI #4255] DESCRIBE TABLE") {
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingForHiveHiveParquetSuite.scala
similarity index 69%
copy from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
copy to extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingForHiveHiveParquetSuite.scala
index d2da72570..ccc694f9b 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingForHiveHiveParquetSuite.scala
@@ -15,13 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.plugin.spark.authz.util
+package org.apache.kyuubi.plugin.spark.authz.ranger.datamasking
 
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
-
-class RuleEliminateMarker extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformUp { case rf: RowFilterAndDataMaskingMarker => rf.child }
-  }
+class DataMaskingForHiveHiveParquetSuite extends DataMaskingTestBase {
+  override protected val catalogImpl: String = "hive"
+  override protected def format: String = "USING hive OPTIONS(fileFormat='parquet')"
 }
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingForHiveParquetSuite.scala
similarity index 69%
copy from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
copy to extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingForHiveParquetSuite.scala
index d2da72570..ba254abbd 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingForHiveParquetSuite.scala
@@ -15,13 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.plugin.spark.authz.util
+package org.apache.kyuubi.plugin.spark.authz.ranger.datamasking
 
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
-
-class RuleEliminateMarker extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformUp { case rf: RowFilterAndDataMaskingMarker => rf.child }
-  }
+class DataMaskingForHiveParquetSuite extends DataMaskingTestBase {
+  override protected val catalogImpl: String = "hive"
+  override protected def format: String = "USING parquet"
 }
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingForIcebergSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingForIcebergSuite.scala
new file mode 100644
index 000000000..99b7eb973
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingForIcebergSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.ranger.datamasking
+
+import org.apache.spark.SparkConf
+import org.scalatest.Outcome
+
+import org.apache.kyuubi.Utils
+
+class DataMaskingForIcebergSuite extends DataMaskingTestBase {
+  override protected val extraSparkConf: SparkConf = {
+    val conf = new SparkConf()
+
+    if (isSparkV31OrGreater) {
+      conf
+        .set("spark.sql.defaultCatalog", "testcat")
+        .set(
+          "spark.sql.catalog.testcat",
+          "org.apache.iceberg.spark.SparkCatalog")
+        .set(s"spark.sql.catalog.testcat.type", "hadoop")
+        .set(
+          "spark.sql.catalog.testcat.warehouse",
+          Utils.createTempDir("iceberg-hadoop").toString)
+    }
+    conf
+
+  }
+
+  override protected val catalogImpl: String = "in-memory"
+
+  override protected def format: String = "USING iceberg"
+
+  override def beforeAll(): Unit = {
+    if (isSparkV31OrGreater) {
+      super.beforeAll()
+    }
+  }
+
+  override def afterAll(): Unit = {
+    if (isSparkV31OrGreater) {
+      super.afterAll()
+    }
+  }
+
+  override def withFixture(test: NoArgTest): Outcome = {
+    assume(isSparkV31OrGreater)
+    test()
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingForInMemoryParquetSuite.scala
similarity index 69%
copy from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
copy to extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingForInMemoryParquetSuite.scala
index d2da72570..1bfb71e79 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingForInMemoryParquetSuite.scala
@@ -15,13 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.plugin.spark.authz.util
+package org.apache.kyuubi.plugin.spark.authz.ranger.datamasking
 
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
+class DataMaskingForInMemoryParquetSuite extends DataMaskingTestBase {
 
-class RuleEliminateMarker extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformUp { case rf: RowFilterAndDataMaskingMarker => rf.child }
-  }
+  override protected val catalogImpl: String = "in-memory"
+  override protected def format: String = "USING parquet"
 }
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingForJDBCV2Suite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingForJDBCV2Suite.scala
new file mode 100644
index 000000000..894daeaf7
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingForJDBCV2Suite.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.ranger.datamasking
+import java.sql.DriverManager
+
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.scalatest.Outcome
+
+class DataMaskingForJDBCV2Suite extends DataMaskingTestBase {
+  override protected val extraSparkConf: SparkConf = {
+    val conf = new SparkConf()
+    if (isSparkV31OrGreater) {
+      conf
+        .set("spark.sql.defaultCatalog", "testcat")
+        .set(
+          "spark.sql.catalog.testcat",
+          "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
+        .set(s"spark.sql.catalog.testcat.url", "jdbc:derby:memory:testcat;create=true")
+        .set(
+          s"spark.sql.catalog.testcat.driver",
+          "org.apache.derby.jdbc.AutoloadedDriver")
+    }
+    conf
+  }
+
+  override protected val catalogImpl: String = "in-memory"
+
+  override protected def format: String = ""
+
+  override def beforeAll(): Unit = {
+    if (isSparkV31OrGreater) super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    if (isSparkV31OrGreater) {
+      super.afterAll()
+      // cleanup db
+      Try {
+        DriverManager.getConnection(s"jdbc:derby:memory:testcat;shutdown=true")
+      }
+    }
+  }
+
+  override def withFixture(test: NoArgTest): Outcome = {
+    assume(isSparkV31OrGreater)
+    test()
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingTestBase.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingTestBase.scala
new file mode 100644
index 000000000..c13362617
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/datamasking/DataMaskingTestBase.scala
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.ranger.datamasking
+
+// scalastyle:off
+import java.sql.Timestamp
+
+import scala.util.Try
+
+import org.apache.commons.codec.digest.DigestUtils.md5Hex
+import org.apache.spark.sql.{Row, SparkSessionExtensions}
+import org.scalatest.{Assertion, BeforeAndAfterAll}
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.kyuubi.plugin.spark.authz.SparkSessionProvider
+import org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension
+
+/**
+ * Base trait for data masking tests. Derivative classes shall name themselves following:
+ *  DataMaskingFor<CatalogImpl?><FileFormat?><Additions?>Suite (the '?' parts are optional),
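+ *  e.g., DataMaskingForJDBCV2Suite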
+ */
+trait DataMaskingTestBase extends AnyFunSuite with SparkSessionProvider with BeforeAndAfterAll {
+// scalastyle:on
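+  // inject the Ranger-based row filtering and data masking rules under test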
+  override protected val extension: SparkSessionExtensions => Unit = new RangerSparkExtension
+
+  private def setup(): Unit = {
+    sql(s"CREATE TABLE IF NOT EXISTS default.src" +
+      "(key int," +
+      " value1 int," +
+      " value2 string," +
+      " value3 string," +
+      " value4 timestamp," +
+      " value5 string)" +
+      s" $format")
+
+    // NOTICE: `bob` has a row filter `key < 20`
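+    // Inferred from the expected answers below, `bob`'s maskers are roughly:
+    //   value1 -> MASK_HASH (md5), value2 -> MASK, value3 -> MASK_SHOW_FIRST_4,
+    //   value4 -> MASK_DATE_SHOW_YEAR, value5 -> MASK_SHOW_LAST_4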
+    sql("INSERT INTO default.src " +
+      "SELECT 1, 1, 'hello', 'world', timestamp'2018-11-17 12:34:56', 'World'")
+    sql("INSERT INTO default.src " +
+      "SELECT 20, 2, 'kyuubi', 'y', timestamp'2018-11-17 12:34:56', 'world'")
+    sql("INSERT INTO default.src " +
+      "SELECT 30, 3, 'spark', 'a', timestamp'2018-11-17 12:34:56', 'world'")
+    sql(s"CREATE TABLE default.unmasked $format AS SELECT * FROM default.src")
+  }
+
+  private def cleanup(): Unit = {
+    sql("DROP TABLE IF EXISTS default.src")
+    sql("DROP TABLE IF EXISTS default.unmasked")
+  }
+
+  override def beforeAll(): Unit = {
+    doAs("admin", setup())
+    super.beforeAll()
+  }
+  override def afterAll(): Unit = {
+    doAs("admin", cleanup())
+    spark.stop()
+    super.afterAll()
+  }
+
+  protected def checkAnswer(user: String, query: String, result: Seq[Row]): Assertion = {
+    doAs(user, assert(sql(query).collect() === result))
+  }
+
+  test("simple query with a user doesn't have mask rules") {
+    checkAnswer("kent", "SELECT key FROM default.src order by key", Seq(Row(1), Row(20), Row(30)))
+  }
+
+  test("simple query with a user has mask rules") {
+    val result =
+      Seq(Row(md5Hex("1"), "xxxxx", "worlx", Timestamp.valueOf("2018-01-01 00:00:00"), "Xorld"))
+    checkAnswer("bob", "SELECT value1, value2, value3, value4, value5 FROM default.src", result)
+    checkAnswer(
+      "bob",
+      "SELECT value1 as key, value2, value3, value4, value5 FROM default.src",
+      result)
+  }
+
+  test("star") {
+    val result =
+      Seq(Row(1, md5Hex("1"), "xxxxx", "worlx", Timestamp.valueOf("2018-01-01 00:00:00"), "Xorld"))
+    checkAnswer("bob", "SELECT * FROM default.src", result)
+  }
+
+  test("simple udf") {
+    val result =
+      Seq(Row(md5Hex("1"), "xxxxx", "worlx", Timestamp.valueOf("2018-01-01 00:00:00"), "Xorld"))
+    checkAnswer(
+      "bob",
+      "SELECT max(value1), max(value2), max(value3), max(value4), max(value5) FROM default.src",
+      result)
+  }
+
+  test("complex udf") {
+    val result =
+      Seq(Row(md5Hex("1"), "xxxxx", "worlx", Timestamp.valueOf("2018-01-01 00:00:00"), "Xorld"))
+    checkAnswer(
+      "bob",
+      "SELECT coalesce(max(value1), 1), coalesce(max(value2), 1), coalesce(max(value3), 1), " +
+        "coalesce(max(value4), timestamp '2018-01-01 22:33:44'), coalesce(max(value5), 1) " +
+        "FROM default.src",
+      result)
+  }
+
+  test("in subquery") {
+    val result =
+      Seq(Row(md5Hex("1"), "xxxxx", "worlx", Timestamp.valueOf("2018-01-01 00:00:00"), "Xorld"))
+    checkAnswer(
+      "bob",
+      "SELECT value1, value2, value3, value4, value5 FROM default.src WHERE value2 in " +
+        "(SELECT value2 as key FROM default.src)",
+      result)
+  }
+
+  test("create a unmasked table as select from a masked one") {
+    withCleanTmpResources(Seq(("default.src2", "table"))) {
+      doAs("bob", sql(s"CREATE TABLE default.src2 $format AS SELECT value1 FROM default.src"))
+      checkAnswer("bob", "SELECT value1 FROM default.src2", Seq(Row(md5Hex("1"))))
+    }
+  }
+
+  test("insert into a unmasked table from a masked one") {
+    withCleanTmpResources(Seq(("default.src2", "table"), ("default.src3", "table"))) {
+      doAs("bob", sql(s"CREATE TABLE default.src2 (value1 string) $format"))
+      doAs("bob", sql(s"INSERT INTO default.src2 SELECT value1 from default.src"))
+      doAs("bob", sql(s"INSERT INTO default.src2 SELECT value1 as v from default.src"))
+      checkAnswer("bob", "SELECT value1 FROM default.src2", Seq(Row(md5Hex("1")), Row(md5Hex("1"))))
+      doAs("bob", sql(s"CREATE TABLE default.src3 (k int, value string) $format"))
+      doAs("bob", sql(s"INSERT INTO default.src3 SELECT key, value1 from default.src"))
+      doAs("bob", sql(s"INSERT INTO default.src3 SELECT key, value1 as v from default.src"))
+      checkAnswer("bob", "SELECT value FROM default.src3", Seq(Row(md5Hex("1")), Row(md5Hex("1"))))
+    }
+  }
+
+  test("join on an unmasked table") {
+    val s = "SELECT a.value1, b.value1 FROM default.src a" +
+      " join default.unmasked b on a.value1=b.value1"
+    checkAnswer("bob", s, Nil)
+    checkAnswer("bob", s, Nil) // just for testing query multiple times, don't delete it
+  }
+
+  test("self join on a masked table") {
+    val s = "SELECT a.value1, b.value1 FROM default.src a" +
+      " join default.src b on a.value1=b.value1"
+    checkAnswer("bob", s, Seq(Row(md5Hex("1"), md5Hex("1"))))
+    // intentionally query multiple times, don't delete it
+    checkAnswer("bob", s, Seq(Row(md5Hex("1"), md5Hex("1"))))
+  }
+
+  test("self join on a masked table and filter the masked column with original value") {
+    val s = "SELECT a.value1, b.value1 FROM default.src a" +
+      " join default.src b on a.value1=b.value1" +
+      " where a.value1='1' and b.value1='1'"
+    checkAnswer("bob", s, Nil)
+    checkAnswer("bob", s, Nil) // just for testing query multiple times, don't delete it
+  }
+
+  test("self join on a masked table and filter the masked column with masked value") {
+    // scalastyle:off
+    val s = "SELECT a.value1, b.value1 FROM default.src a" +
+      " join default.src b on a.value1=b.value1" +
+      s" where a.value1='${md5Hex("1")}' and b.value1='${md5Hex("1")}'"
+    // TODO: The v1 and v2 relations generate different implicit type cast rules for filters,
+    // so the test below fails in derivative classes that use v2 data sources, e.g.,
+    // DataMaskingForIcebergSuite: the v2 plan casts both sides of the equality to int, and
+    // casting an md5 hex string to int yields null, so the filter matches no rows.
+    // For the issue itself, we might need to check the Spark logic first.
+    // The v2 (first) and v1 (second) resolved plans for reference:
+    // DataMaskingStage1Marker Project [value1#178, value1#183]
+    // +- Project [value1#178, value1#183]
+    //   +- Filter ((cast(value1#178 as int) = cast(c4ca4238a0b923820dcc509a6f75849b as int)) AND (cast(value1#183 as int) = cast(c4ca4238a0b923820dcc509a6f75849b as int)))
+    //      +- Join Inner, (value1#178 = value1#183)
+    //         :- SubqueryAlias a
+    //         :  +- SubqueryAlias testcat.default.src
+    //         :     +- Filter (key#166 < 20)
+    //         :        +- RowFilterMarker
+    //         :           +- DataMaskingStage0Marker RelationV2[key#166, value1#167, value2#168, value3#169, value4#170, value5#171] default.src
+    //         :              +- Project [key#166, md5(cast(cast(value1#167 as string) as binary)) AS value1#178, regexp_replace(regexp_replace(regexp_replace(value2#168, [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1) AS value2#179, regexp_replace(regexp_replace(regexp_replace(value3#169, [A-Z], X, 5), [a-z], x, 5), [0-9], n, 5) AS value3#180, date_trunc(YEAR, value4#170, Some(Asia/Shanghai)) AS value4#181, concat(regexp_replace(regexp_replace(regexp_replace(left(value5#171, (length(value5#171 [...]
+    //         :                 +- RelationV2[key#166, value1#167, value2#168, value3#169, value4#170, value5#171] default.src
+    //         +- SubqueryAlias b
+    //            +- SubqueryAlias testcat.default.src
+    //               +- Filter (key#172 < 20)
+    //                  +- RowFilterMarker
+    //                     +- DataMaskingStage0Marker RelationV2[key#172, value1#173, value2#174, value3#175, value4#176, value5#177] default.src
+    //                        +- Project [key#172, md5(cast(cast(value1#173 as string) as binary)) AS value1#183, regexp_replace(regexp_replace(regexp_replace(value2#174, [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1) AS value2#184, regexp_replace(regexp_replace(regexp_replace(value3#175, [A-Z], X, 5), [a-z], x, 5), [0-9], n, 5) AS value3#185, date_trunc(YEAR, value4#176, Some(Asia/Shanghai)) AS value4#186, concat(regexp_replace(regexp_replace(regexp_replace(left(value5#177, (length(value5#177 [...]
+    //                           +- RelationV2[key#172, value1#173, value2#174, value3#175, value4#176, value5#177] default.src
+    //
+    //
+    // Project [value1#143, value1#148]
+    // +- Filter ((value1#143 = c4ca4238a0b923820dcc509a6f75849b) AND (value1#148 = c4ca4238a0b923820dcc509a6f75849b))
+    //   +- Join Inner, (value1#143 = value1#148)
+    //      :- SubqueryAlias a
+    //      :  +- SubqueryAlias spark_catalog.default.src
+    //      :     +- Filter (key#60 < 20)
+    //      :        +- RowFilterMarker
+    //      :           +- DataMaskingStage0Marker Relation default.src[key#60,value1#61,value2#62,value3#63,value4#64,value5#65] parquet
+    //      :              +- Project [key#60, md5(cast(cast(value1#61 as string) as binary)) AS value1#143, regexp_replace(regexp_replace(regexp_replace(value2#62, [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1) AS value2#144, regexp_replace(regexp_replace(regexp_replace(value3#63, [A-Z], X, 5), [a-z], x, 5), [0-9], n, 5) AS value3#145, date_trunc(YEAR, value4#64, Some(Asia/Shanghai)) AS value4#146, concat(regexp_replace(regexp_replace(regexp_replace(left(value5#65, (length(value5#65) - 4)), [ [...]
+    //      :                 +- Relation default.src[key#60,value1#61,value2#62,value3#63,value4#64,value5#65] parquet
+    //      +- SubqueryAlias b
+    //         +- SubqueryAlias spark_catalog.default.src
+    //            +- Filter (key#153 < 20)
+    //               +- RowFilterMarker
+    //                  +- DataMaskingStage0Marker Relation default.src[key#60,value1#61,value2#62,value3#63,value4#64,value5#65] parquet
+    //                     +- Project [key#153, md5(cast(cast(value1#154 as string) as binary)) AS value1#148, regexp_replace(regexp_replace(regexp_replace(value2#155, [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1) AS value2#149, regexp_replace(regexp_replace(regexp_replace(value3#156, [A-Z], X, 5), [a-z], x, 5), [0-9], n, 5) AS value3#150, date_trunc(YEAR, value4#157, Some(Asia/Shanghai)) AS value4#151, concat(regexp_replace(regexp_replace(regexp_replace(left(value5#158, (length(value5#158) - [...]
+    //                        +- Relation default.src[key#153,value1#154,value2#155,value3#156,value4#157,value5#158] parquet
+    // checkAnswer("bob", s, Seq(Row(md5Hex("1"), md5Hex("1"))))
+    //
+    //
+    // scalastyle:on
+
+    // So here we use value2 instead to avoid the implicit type casting
+    val s2 = "SELECT a.value1, b.value1 FROM default.src a" +
+      " join default.src b on a.value1=b.value1" +
+      s" where a.value2='xxxxx' and b.value2='xxxxx'"
+    checkAnswer("bob", s2, Seq(Row(md5Hex("1"), md5Hex("1"))))
+    // intentionally query multiple times, don't delete it
+    checkAnswer("bob", s2, Seq(Row(md5Hex("1"), md5Hex("1"))))
+  }
+
+  test("union an unmasked table") {
+    val s = """
+      SELECT value1 from (
+           SELECT a.value1 FROM default.src a
+           union
+          (SELECT b.value1 FROM default.unmasked b)
+      ) c order by value1
+      """
+    checkAnswer("bob", s, Seq(Row("1"), Row("2"), Row("3"), Row(md5Hex("1"))))
+  }
+
+  test("union a masked table") {
+    val s = "SELECT a.value1 FROM default.src a union" +
+      " (SELECT b.value1 FROM default.src b)"
+    checkAnswer("bob", s, Seq(Row(md5Hex("1"))))
+  }
+
+  test("KYUUBI #3581: permanent view should lookup rule on itself not the   ") {
+    assume(isSparkV31OrGreater)
+    val supported = doAs(
+      "perm_view_user",
+      Try(sql("CREATE OR REPLACE VIEW default.perm_view AS SELECT * FROM default.src")).isSuccess)
+    assume(supported, s"view support for '$format' has not been implemented yet")
+
+    withCleanTmpResources(Seq(("default.perm_view", "view"))) {
+      checkAnswer(
+        "perm_view_user",
+        "SELECT value1, value2 FROM default.src where key < 20",
+        Seq(Row(1, "hello")))
+      checkAnswer(
+        "perm_view_user",
+        "SELECT value1, value2 FROM default.perm_view where key < 20",
+        Seq(Row(md5Hex("1"), "hello")))
+    }
+  }
+}