Posted to commits@kyuubi.apache.org by ul...@apache.org on 2023/04/23 09:51:57 UTC

[kyuubi] branch master updated: [KYUUBI #4641] Add MaxFileSizeStrategy to limit max scan file size

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 19d5a9a37 [KYUUBI #4641] Add MaxFileSizeStrategy to limit max scan file size
19d5a9a37 is described below

commit 19d5a9a371bf17a0b9d5145e5fb2793baa034456
Author: wforget <64...@qq.com>
AuthorDate: Sun Apr 23 17:51:44 2023 +0800

    [KYUUBI #4641] Add MaxFileSizeStrategy to limit max scan file size
    
    ### _Why are the changes needed?_
    
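    Add a max scan file size check, enabled by setting `spark.sql.watchdog.maxFileSize` (in bytes).
    `MaxPartitionStrategy` is replaced by `MaxScanStrategy`, which enforces both the max partition and the max file size limits.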
    close #4641
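    
    For illustration only: the size check in `MaxScanStrategy` uses the guard `maxFileSizeOpt.exists(_ < scanFileSize)`, so an unset `spark.sql.watchdog.maxFileSize` never rejects a scan. Below is a minimal, Spark-free sketch of that guard. The names `MaxFileSizeCheckSketch`, `MaxFileSizeExceeded`, `exceeds`, and `check` are hypothetical and do not exist in Kyuubi.
    
    ```scala
    // Hypothetical model of the guard; not the Kyuubi implementation.
    object MaxFileSizeCheckSketch {
      // Stand-in for MaxFileSizeExceedException (assumed name, illustration only).
      final class MaxFileSizeExceeded(msg: String) extends RuntimeException(msg)
    
      // Mirrors `maxFileSizeOpt.exists(_ < scanFileSize)`:
      // None means no limit is configured, so the result is always false.
      def exceeds(maxFileSizeOpt: Option[Long], scanFileSize: BigInt): Boolean =
        maxFileSizeOpt.exists(limit => BigInt(limit) < scanFileSize)
    
      // Sums per-partition sizes in bytes and throws once the limit is exceeded.
      def check(maxFileSizeOpt: Option[Long], partitionSizes: Seq[BigInt]): Unit = {
        val scanFileSize = partitionSizes.sum
        if (exceeds(maxFileSizeOpt, scanFileSize)) {
          throw new MaxFileSizeExceeded(
            s"scan file size in bytes: $scanFileSize exceeds maxFileSize ${maxFileSizeOpt.get}")
        }
      }
    
      def main(args: Array[String]): Unit = {
        val sizes = Seq(BigInt(500), BigInt(700))
        check(None, sizes)        // no limit configured: passes
        check(Some(2000L), sizes) // total is within the limit: passes
        try check(Some(1000L), sizes) // total is above the limit: throws
        catch { case e: MaxFileSizeExceeded => println(e.getMessage) }
      }
    }
    ```
    
    The comparison is strict, so a scan whose size equals the limit is still allowed.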
    
    ### _How was this patch tested?_
    - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4642 from wForget/KYUUBI-4641.
    
    Closes #4641
    
    14a680f8e [wforget] comment
    d2a393d97 [wforget] comment
    b1ef4c52c [wforget] fix
    d9e94bd8e [wforget] fix style
    8a9121131 [wforget] use optional value
    094eb61e3 [wforget] combine
    89e2cb4d0 [wforget] [KYUUBI-4641] Add MaxFileSizeStrategy to limit max scan file size
    
    Authored-by: wforget <64...@qq.com>
    Signed-off-by: ulyssesyou <ul...@apache.org>
---
 docs/extensions/engines/spark/rules.md             |   3 +-
 .../kyuubi/sql/KyuubiSparkSQLExtension.scala       |   4 +-
 .../kyuubi/sql/KyuubiSparkSQLExtension.scala       |   4 +-
 .../kyuubi/sql/KyuubiSparkSQLExtension.scala       |   4 +-
 .../org/apache/kyuubi/sql/KyuubiSQLConf.scala      |  13 +-
 .../sql/watchdog/KyuubiWatchDogException.scala     |   5 +
 .../kyuubi/sql/watchdog/MaxPartitionStrategy.scala | 185 -------------
 .../kyuubi/sql/watchdog/MaxScanStrategy.scala      | 303 +++++++++++++++++++++
 .../org/apache/spark/sql/WatchDogSuiteBase.scala   | 123 ++++++++-
 9 files changed, 450 insertions(+), 194 deletions(-)

diff --git a/docs/extensions/engines/spark/rules.md b/docs/extensions/engines/spark/rules.md
index 8fa636c31..50bf087a8 100644
--- a/docs/extensions/engines/spark/rules.md
+++ b/docs/extensions/engines/spark/rules.md
@@ -73,7 +73,8 @@ Kyuubi provides some configs to make these feature easy to use.
 | spark.sql.analyzer.classification.enabled                           | false                                  | When true, allows Kyuubi engine to judge this SQL's classification and set `spark.sql.analyzer.classification` back into sessionConf. Through this configuration item, Spark can optimizing configuration dynamic.                                                                                                                                   | 1.4.0 |
 | spark.sql.optimizer.insertZorderBeforeWriting.enabled               | true                                   | When true, we will follow target table properties to insert zorder or not. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by these cols.                                                                    | 1.4.0 |
 | spark.sql.optimizer.zorderGlobalSort.enabled                        | true                                   | When true, we do a global sort using zorder. Note that, it can cause data skew issue if the zorder columns have less cardinality. When false, we only do local sort using zorder.                                                                                                                                                                    | 1.4.0 |
-| spark.sql.watchdog.maxPartitions                                    | none                                   | Set the max partition number when spark scans a data source. Enable MaxPartitionStrategy by specifying this configuration. Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined                                                                                                 | 1.4.0 |
+| spark.sql.watchdog.maxPartitions                                    | none                                   | Set the max partition number when spark scans a data source. Enable maxPartition Strategy by specifying this configuration. Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined                                                                                                | 1.4.0 |
+| spark.sql.watchdog.maxFileSize                                      | none                                   | Set the maximum size in bytes of files when spark scans a data source. Enable maxFileSize Strategy by specifying this configuration. Add maxFileSize Strategy to avoid scan excessive size of files, it's optional that works with defined                                                                                                           | 1.8.0 |
 | spark.sql.optimizer.dropIgnoreNonExistent                           | false                                  | When true, do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies a non-existent database/table/view/function/partition                                                                                                                                                                                                     | 1.5.0 |
 | spark.sql.optimizer.rebalanceBeforeZorder.enabled                   | false                                  | When true, we do a rebalance before zorder in case data skew. Note that, if the insertion is dynamic partition we will use the partition columns to rebalance. Note that, this config only affects with Spark 3.3.x.                                                                                                                                 | 1.6.0 |
 | spark.sql.optimizer.rebalanceZorderColumns.enabled                  | false                                  | When true and `spark.sql.optimizer.rebalanceBeforeZorder.enabled` is true, we do rebalance before Z-Order. If it's dynamic partition insert, the rebalance expression will include both partition columns and Z-Order columns. Note that, this config only affects with Spark 3.3.x.                                                                 | 1.6.0 |
diff --git a/extensions/spark/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index cd312de95..f952b56f3 100644
--- a/extensions/spark/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.sql
 import org.apache.spark.sql.SparkSessionExtensions
 
 import org.apache.kyuubi.sql.sqlclassification.KyuubiSqlClassification
-import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxPartitionStrategy}
+import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxScanStrategy}
 
 // scalastyle:off line.size.limit
 /**
@@ -40,6 +40,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
 
     // watchdog extension
     extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
-    extensions.injectPlannerStrategy(MaxPartitionStrategy)
+    extensions.injectPlannerStrategy(MaxScanStrategy)
   }
 }
diff --git a/extensions/spark/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index ef9da41be..97e777042 100644
--- a/extensions/spark/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-2/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
 
 import org.apache.spark.sql.SparkSessionExtensions
 
-import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxPartitionStrategy}
+import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxScanStrategy}
 
 // scalastyle:off line.size.limit
 /**
@@ -38,6 +38,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
 
     // watchdog extension
     extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
-    extensions.injectPlannerStrategy(MaxPartitionStrategy)
+    extensions.injectPlannerStrategy(MaxScanStrategy)
   }
 }
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index 0db9b3ab8..5d3464228 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
 
 import org.apache.spark.sql.{FinalStageResourceManager, InjectCustomResourceProfile, SparkSessionExtensions}
 
-import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxPartitionStrategy}
+import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxScanStrategy}
 
 // scalastyle:off line.size.limit
 /**
@@ -38,7 +38,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
 
     // watchdog extension
     extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
-    extensions.injectPlannerStrategy(MaxPartitionStrategy)
+    extensions.injectPlannerStrategy(MaxScanStrategy)
 
     extensions.injectQueryStagePrepRule(FinalStageResourceManager)
     extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index aeee45869..cb2f1130e 100644
--- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.sql
 
+import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf._
 
@@ -138,13 +139,23 @@ object KyuubiSQLConf {
   val WATCHDOG_MAX_PARTITIONS =
     buildConf("spark.sql.watchdog.maxPartitions")
       .doc("Set the max partition number when spark scans a data source. " +
-        "Enable MaxPartitionStrategy by specifying this configuration. " +
+        "Enable maxPartitions Strategy by specifying this configuration. " +
         "Add maxPartitions Strategy to avoid scan excessive partitions " +
         "on partitioned table, it's optional that works with defined")
       .version("1.4.0")
       .intConf
       .createOptional
 
+  val WATCHDOG_MAX_FILE_SIZE =
+    buildConf("spark.sql.watchdog.maxFileSize")
+      .doc("Set the maximum size in bytes of files when spark scans a data source. " +
+        "Enable maxFileSize Strategy by specifying this configuration. " +
+        "Add maxFileSize Strategy to avoid scan excessive size of files," +
+        " it's optional that works with defined")
+      .version("1.8.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createOptional
+
   val WATCHDOG_FORCED_MAXOUTPUTROWS =
     buildConf("spark.sql.watchdog.forcedMaxOutputRows")
       .doc("Add ForcedMaxOutputRows rule to avoid huge output rows of non-limit query " +
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala
index b3c58afdf..e44309192 100644
--- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala
+++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala
@@ -23,3 +23,8 @@ final class MaxPartitionExceedException(
     private val reason: String = "",
     private val cause: Throwable = None.orNull)
   extends KyuubiSQLExtensionException(reason, cause)
+
+final class MaxFileSizeExceedException(
+    private val reason: String = "",
+    private val cause: Throwable = None.orNull)
+  extends KyuubiSQLExtensionException(reason, cause)
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxPartitionStrategy.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxPartitionStrategy.scala
deleted file mode 100644
index 61ab07adf..000000000
--- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxPartitionStrategy.scala
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.watchdog
-
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{PruneFileSourcePartitionHelper, SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
-import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
-import org.apache.spark.sql.types.StructType
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-/**
- * Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table
- * 1 Check if scan exceed maxPartition
- * 2 Check if Using partitionFilter on partitioned table
- * This Strategy Add Planner Strategy after LogicalOptimizer
- */
-case class MaxPartitionStrategy(session: SparkSession)
-  extends Strategy
-  with SQLConfHelper
-  with PruneFileSourcePartitionHelper {
-  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
-    val maxScanPartitionsOpt = conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS)
-
-    if (maxScanPartitionsOpt.isDefined) {
-      checkRelationMaxPartitions(plan, maxScanPartitionsOpt.get)
-    }
-    Nil
-  }
-
-  private def checkRelationMaxPartitions(
-      plan: LogicalPlan,
-      maxScanPartitions: Int): Unit = {
-    plan match {
-      case ScanOperation(_, _, relation: HiveTableRelation) if relation.isPartitioned =>
-        relation.prunedPartitions match {
-          case Some(prunedPartitions) =>
-            if (prunedPartitions.size > maxScanPartitions) {
-              throw new MaxPartitionExceedException(
-                s"""
-                   |SQL job scan hive partition: ${prunedPartitions.size}
-                   |exceed restrict of hive scan maxPartition $maxScanPartitions
-                   |You should optimize your SQL logical according partition structure
-                   |or shorten query scope such as p_date, detail as below:
-                   |Table: ${relation.tableMeta.qualifiedName}
-                   |Owner: ${relation.tableMeta.owner}
-                   |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
-                   |""".stripMargin)
-            }
-          case _ =>
-            val totalPartitions = session
-              .sessionState.catalog.externalCatalog.listPartitionNames(
-                relation.tableMeta.database,
-                relation.tableMeta.identifier.table)
-            if (totalPartitions.size > maxScanPartitions) {
-              throw new MaxPartitionExceedException(
-                s"""
-                   |Your SQL job scan a whole huge table without any partition filter,
-                   |You should optimize your SQL logical according partition structure
-                   |or shorten query scope such as p_date, detail as below:
-                   |Table: ${relation.tableMeta.qualifiedName}
-                   |Owner: ${relation.tableMeta.owner}
-                   |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
-                   |""".stripMargin)
-            }
-        }
-      case ScanOperation(
-            _,
-            filters,
-            relation @ LogicalRelation(
-              fsRelation @ HadoopFsRelation(
-                fileIndex: InMemoryFileIndex,
-                partitionSchema,
-                _,
-                _,
-                _,
-                _),
-              _,
-              _,
-              _)) if fsRelation.partitionSchema.nonEmpty =>
-        val (partitionKeyFilters, dataFilter) =
-          getPartitionKeyFiltersAndDataFilters(
-            fsRelation.sparkSession,
-            relation,
-            partitionSchema,
-            filters,
-            relation.output)
-        val prunedPartitionSize = fileIndex.listFiles(
-          partitionKeyFilters.toSeq,
-          dataFilter)
-          .size
-        if (prunedPartitionSize > maxScanPartitions) {
-          throw maxPartitionExceedError(
-            prunedPartitionSize,
-            maxScanPartitions,
-            relation.catalogTable,
-            fileIndex.rootPaths,
-            fsRelation.partitionSchema)
-        }
-      case ScanOperation(
-            _,
-            filters,
-            logicalRelation @ LogicalRelation(
-              fsRelation @ HadoopFsRelation(
-                catalogFileIndex: CatalogFileIndex,
-                partitionSchema,
-                _,
-                _,
-                _,
-                _),
-              _,
-              _,
-              _)) if fsRelation.partitionSchema.nonEmpty =>
-        val (partitionKeyFilters, _) =
-          getPartitionKeyFiltersAndDataFilters(
-            fsRelation.sparkSession,
-            logicalRelation,
-            partitionSchema,
-            filters,
-            logicalRelation.output)
-
-        val prunedPartitionSize =
-          catalogFileIndex.filterPartitions(
-            partitionKeyFilters.toSeq)
-            .partitionSpec()
-            .partitions
-            .size
-        if (prunedPartitionSize > maxScanPartitions) {
-          throw maxPartitionExceedError(
-            prunedPartitionSize,
-            maxScanPartitions,
-            logicalRelation.catalogTable,
-            catalogFileIndex.rootPaths,
-            fsRelation.partitionSchema)
-        }
-      case _ =>
-    }
-  }
-
-  def maxPartitionExceedError(
-      prunedPartitionSize: Int,
-      maxPartitionSize: Int,
-      tableMeta: Option[CatalogTable],
-      rootPaths: Seq[Path],
-      partitionSchema: StructType): Throwable = {
-    val truncatedPaths =
-      if (rootPaths.length > 5) {
-        rootPaths.slice(0, 5).mkString(",") + """... """ + (rootPaths.length - 5) + " more paths"
-      } else {
-        rootPaths.mkString(",")
-      }
-
-    new MaxPartitionExceedException(
-      s"""
-         |SQL job scan data source partition: $prunedPartitionSize
-         |exceed restrict of data source scan maxPartition $maxPartitionSize
-         |You should optimize your SQL logical according partition structure
-         |or shorten query scope such as p_date, detail as below:
-         |Table: ${tableMeta.map(_.qualifiedName).getOrElse("")}
-         |Owner: ${tableMeta.map(_.owner).getOrElse("")}
-         |RootPaths: $truncatedPaths
-         |Partition Structure: ${partitionSchema.map(_.name).mkString(", ")}
-         |""".stripMargin)
-  }
-}
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
new file mode 100644
index 000000000..0ee693fcb
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{PruneFileSourcePartitionHelper, SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+/**
+ * Add MaxScanStrategy to avoid scan excessive partitions or files
+ * 1. Check if scan exceed maxPartition of partitioned table
+ * 2. Check if scan exceed maxFileSize (calculated by hive table and partition statistics)
+ * This Strategy Add Planner Strategy after LogicalOptimizer
+ * @param session
+ */
+case class MaxScanStrategy(session: SparkSession)
+  extends Strategy
+  with SQLConfHelper
+  with PruneFileSourcePartitionHelper {
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    val maxScanPartitionsOpt = conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS)
+    val maxFileSizeOpt = conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE)
+    if (maxScanPartitionsOpt.isDefined || maxFileSizeOpt.isDefined) {
+      checkScan(plan, maxScanPartitionsOpt, maxFileSizeOpt)
+    }
+    Nil
+  }
+
+  private def checkScan(
+      plan: LogicalPlan,
+      maxScanPartitionsOpt: Option[Int],
+      maxFileSizeOpt: Option[Long]): Unit = {
+    plan match {
+      case ScanOperation(_, _, relation: HiveTableRelation) =>
+        if (relation.isPartitioned) {
+          relation.prunedPartitions match {
+            case Some(prunedPartitions) =>
+              if (maxScanPartitionsOpt.exists(_ < prunedPartitions.size)) {
+                throw new MaxPartitionExceedException(
+                  s"""
+                     |SQL job scan hive partition: ${prunedPartitions.size}
+                     |exceed restrict of hive scan maxPartition ${maxScanPartitionsOpt.get}
+                     |You should optimize your SQL logical according partition structure
+                     |or shorten query scope such as p_date, detail as below:
+                     |Table: ${relation.tableMeta.qualifiedName}
+                     |Owner: ${relation.tableMeta.owner}
+                     |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
+                     |""".stripMargin)
+              }
+              lazy val scanFileSize = prunedPartitions.flatMap(_.stats).map(_.sizeInBytes).sum
+              if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+                throw partTableMaxFileExceedError(
+                  scanFileSize,
+                  maxFileSizeOpt.get,
+                  Some(relation.tableMeta),
+                  prunedPartitions.flatMap(_.storage.locationUri).map(_.toString),
+                  relation.partitionCols.map(_.name))
+              }
+            case _ =>
+              lazy val scanPartitions: Int = session
+                .sessionState.catalog.externalCatalog.listPartitionNames(
+                  relation.tableMeta.database,
+                  relation.tableMeta.identifier.table).size
+              if (maxScanPartitionsOpt.exists(_ < scanPartitions)) {
+                throw new MaxPartitionExceedException(
+                  s"""
+                     |Your SQL job scan a whole huge table without any partition filter,
+                     |You should optimize your SQL logical according partition structure
+                     |or shorten query scope such as p_date, detail as below:
+                     |Table: ${relation.tableMeta.qualifiedName}
+                     |Owner: ${relation.tableMeta.owner}
+                     |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
+                     |""".stripMargin)
+              }
+
+              lazy val scanFileSize: BigInt =
+                relation.tableMeta.stats.map(_.sizeInBytes).getOrElse {
+                  session
+                    .sessionState.catalog.externalCatalog.listPartitions(
+                      relation.tableMeta.database,
+                      relation.tableMeta.identifier.table).flatMap(_.stats).map(_.sizeInBytes).sum
+                }
+              if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+                throw new MaxFileSizeExceedException(
+                  s"""
+                     |Your SQL job scan a whole huge table without any partition filter,
+                     |You should optimize your SQL logical according partition structure
+                     |or shorten query scope such as p_date, detail as below:
+                     |Table: ${relation.tableMeta.qualifiedName}
+                     |Owner: ${relation.tableMeta.owner}
+                     |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
+                     |""".stripMargin)
+              }
+          }
+        } else {
+          lazy val scanFileSize = relation.tableMeta.stats.map(_.sizeInBytes).sum
+          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+            throw nonPartTableMaxFileExceedError(
+              scanFileSize,
+              maxFileSizeOpt.get,
+              Some(relation.tableMeta))
+          }
+        }
+      case ScanOperation(
+            _,
+            filters,
+            relation @ LogicalRelation(
+              fsRelation @ HadoopFsRelation(
+                fileIndex: InMemoryFileIndex,
+                partitionSchema,
+                _,
+                _,
+                _,
+                _),
+              _,
+              _,
+              _)) =>
+        if (fsRelation.partitionSchema.nonEmpty) {
+          val (partitionKeyFilters, dataFilter) =
+            getPartitionKeyFiltersAndDataFilters(
+              fsRelation.sparkSession,
+              relation,
+              partitionSchema,
+              filters,
+              relation.output)
+          val prunedPartitions = fileIndex.listFiles(
+            partitionKeyFilters.toSeq,
+            dataFilter)
+          if (maxScanPartitionsOpt.exists(_ < prunedPartitions.size)) {
+            throw maxPartitionExceedError(
+              prunedPartitions.size,
+              maxScanPartitionsOpt.get,
+              relation.catalogTable,
+              fileIndex.rootPaths,
+              fsRelation.partitionSchema)
+          }
+          lazy val scanFileSize = prunedPartitions.flatMap(_.files).map(_.getLen).sum
+          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+            throw partTableMaxFileExceedError(
+              scanFileSize,
+              maxFileSizeOpt.get,
+              relation.catalogTable,
+              fileIndex.rootPaths.map(_.toString),
+              fsRelation.partitionSchema.map(_.name))
+          }
+        } else {
+          lazy val scanFileSize = fileIndex.sizeInBytes
+          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+            throw nonPartTableMaxFileExceedError(
+              scanFileSize,
+              maxFileSizeOpt.get,
+              relation.catalogTable)
+          }
+        }
+      case ScanOperation(
+            _,
+            filters,
+            logicalRelation @ LogicalRelation(
+              fsRelation @ HadoopFsRelation(
+                catalogFileIndex: CatalogFileIndex,
+                partitionSchema,
+                _,
+                _,
+                _,
+                _),
+              _,
+              _,
+              _)) =>
+        if (fsRelation.partitionSchema.nonEmpty) {
+          val (partitionKeyFilters, _) =
+            getPartitionKeyFiltersAndDataFilters(
+              fsRelation.sparkSession,
+              logicalRelation,
+              partitionSchema,
+              filters,
+              logicalRelation.output)
+
+          val fileIndex = catalogFileIndex.filterPartitions(
+            partitionKeyFilters.toSeq)
+
+          lazy val prunedPartitionSize = fileIndex.partitionSpec().partitions.size
+          if (maxScanPartitionsOpt.exists(_ < prunedPartitionSize)) {
+            throw maxPartitionExceedError(
+              prunedPartitionSize,
+              maxScanPartitionsOpt.get,
+              logicalRelation.catalogTable,
+              catalogFileIndex.rootPaths,
+              fsRelation.partitionSchema)
+          }
+
+          lazy val scanFileSize = fileIndex
+            .listFiles(Nil, Nil).flatMap(_.files).map(_.getLen).sum
+          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+            throw partTableMaxFileExceedError(
+              scanFileSize,
+              maxFileSizeOpt.get,
+              logicalRelation.catalogTable,
+              catalogFileIndex.rootPaths.map(_.toString),
+              fsRelation.partitionSchema.map(_.name))
+          }
+        } else {
+          lazy val scanFileSize = catalogFileIndex.sizeInBytes
+          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+            throw nonPartTableMaxFileExceedError(
+              scanFileSize,
+              maxFileSizeOpt.get,
+              logicalRelation.catalogTable)
+          }
+        }
+      case _ =>
+    }
+  }
+
+  def maxPartitionExceedError(
+      prunedPartitionSize: Int,
+      maxPartitionSize: Int,
+      tableMeta: Option[CatalogTable],
+      rootPaths: Seq[Path],
+      partitionSchema: StructType): Throwable = {
+    val truncatedPaths =
+      if (rootPaths.length > 5) {
+        rootPaths.slice(0, 5).mkString(",") + """... """ + (rootPaths.length - 5) + " more paths"
+      } else {
+        rootPaths.mkString(",")
+      }
+
+    new MaxPartitionExceedException(
+      s"""
+         |SQL job scan data source partition: $prunedPartitionSize
+         |exceeds the limit of data source scan maxPartition $maxPartitionSize.
+         |You should optimize your SQL logic according to the partition structure
+         |or narrow the query scope (e.g. by p_date). Details are as below:
+         |Table: ${tableMeta.map(_.qualifiedName).getOrElse("")}
+         |Owner: ${tableMeta.map(_.owner).getOrElse("")}
+         |RootPaths: $truncatedPaths
+         |Partition Structure: ${partitionSchema.map(_.name).mkString(", ")}
+         |""".stripMargin)
+  }
+
+  private def partTableMaxFileExceedError(
+      scanFileSize: Number,
+      maxFileSize: Long,
+      tableMeta: Option[CatalogTable],
+      rootPaths: Seq[String],
+      partitions: Seq[String]): Throwable = {
+    val truncatedPaths =
+      if (rootPaths.length > 5) {
+        rootPaths.slice(0, 5).mkString(",") + """... """ + (rootPaths.length - 5) + " more paths"
+      } else {
+        rootPaths.mkString(",")
+      }
+
+    new MaxFileSizeExceedException(
+      s"""
+         |SQL job scan file size in bytes: $scanFileSize
+         |exceeds the limit of table scan maxFileSize $maxFileSize.
+         |You should optimize your SQL logic according to the partition structure
+         |or narrow the query scope (e.g. by p_date). Details are as below:
+         |Table: ${tableMeta.map(_.qualifiedName).getOrElse("")}
+         |Owner: ${tableMeta.map(_.owner).getOrElse("")}
+         |RootPaths: $truncatedPaths
+         |Partition Structure: ${partitions.mkString(", ")}
+         |""".stripMargin)
+  }
+
+  private def nonPartTableMaxFileExceedError(
+      scanFileSize: Number,
+      maxFileSize: Long,
+      tableMeta: Option[CatalogTable]): Throwable = {
+    new MaxFileSizeExceedException(
+      s"""
+         |SQL job scan file size in bytes: $scanFileSize
+         |exceeds the limit of table scan maxFileSize $maxFileSize.
+         |Details are as below:
+         |Table: ${tableMeta.map(_.qualifiedName).getOrElse("")}
+         |Owner: ${tableMeta.map(_.owner).getOrElse("")}
+         |Location: ${tableMeta.map(_.location).getOrElse("")}
+         |""".stripMargin)
+  }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala b/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
index e6ecd28c9..6254829f2 100644
--- a/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
@@ -17,10 +17,15 @@
 
 package org.apache.spark.sql
 
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
 
 import org.apache.kyuubi.sql.KyuubiSQLConf
-import org.apache.kyuubi.sql.watchdog.MaxPartitionExceedException
+import org.apache.kyuubi.sql.watchdog.{MaxFileSizeExceedException, MaxPartitionExceedException}
 
 trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest {
   override protected def beforeAll(): Unit = {
@@ -477,4 +482,120 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest {
       }
     }
   }
+
+  private def checkMaxFileSize(tableSize: Long, nonPartTableSize: Long): Unit = {
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> tableSize.toString) {
+      checkAnswer(sql("SELECT count(distinct(p)) FROM test"), Row(10) :: Nil)
+    }
+
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (tableSize / 2).toString) {
+      sql("SELECT * FROM test where p=1").queryExecution.sparkPlan
+
+      sql(s"SELECT * FROM test WHERE p in (${Range(0, 3).toList.mkString(",")})")
+        .queryExecution.sparkPlan
+
+      intercept[MaxFileSizeExceedException](
+        sql("SELECT * FROM test where p != 1").queryExecution.sparkPlan)
+
+      intercept[MaxFileSizeExceedException](
+        sql("SELECT * FROM test").queryExecution.sparkPlan)
+
+      intercept[MaxFileSizeExceedException](sql(
+        s"SELECT * FROM test WHERE p in (${Range(0, 6).toList.mkString(",")})")
+        .queryExecution.sparkPlan)
+    }
+
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> nonPartTableSize.toString) {
+      checkAnswer(sql("SELECT count(*) FROM test_non_part"), Row(10000) :: Nil)
+    }
+
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (nonPartTableSize - 1).toString) {
+      intercept[MaxFileSizeExceedException](
+        sql("SELECT * FROM test_non_part").queryExecution.sparkPlan)
+    }
+  }
+
+  test("watchdog with scan maxFileSize -- hive") {
+    Seq(false).foreach { convertMetastoreParquet =>
+      withTable("test", "test_non_part", "temp") {
+        spark.range(10000).selectExpr("id as col")
+          .createOrReplaceTempView("temp")
+
+        // partitioned table
+        sql(
+          s"""
+             |CREATE TABLE test(i int)
+             |PARTITIONED BY (p int)
+             |STORED AS parquet""".stripMargin)
+        for (part <- Range(0, 10)) {
+          sql(
+            s"""
+               |INSERT OVERWRITE TABLE test PARTITION (p='$part')
+               |select col from temp""".stripMargin)
+        }
+
+        val tablePath = new File(spark.sessionState.catalog.externalCatalog
+          .getTable("default", "test").location)
+        val tableSize = FileUtils.listFiles(tablePath, Array("parquet"), true).asScala
+          .map(_.length()).sum
+        assert(tableSize > 0)
+
+        // non-partitioned table
+        sql(
+          s"""
+             |CREATE TABLE test_non_part(i int)
+             |STORED AS parquet""".stripMargin)
+        sql(
+          s"""
+             |INSERT OVERWRITE TABLE test_non_part
+             |select col from temp""".stripMargin)
+        sql("ANALYZE TABLE test_non_part COMPUTE STATISTICS")
+
+        val nonPartTablePath = new File(spark.sessionState.catalog.externalCatalog
+          .getTable("default", "test_non_part").location)
+        val nonPartTableSize = FileUtils.listFiles(nonPartTablePath, Array("parquet"), true).asScala
+          .map(_.length()).sum
+        assert(nonPartTableSize > 0)
+
+        // check
+        withSQLConf("spark.sql.hive.convertMetastoreParquet" -> convertMetastoreParquet.toString) {
+          checkMaxFileSize(tableSize, nonPartTableSize)
+        }
+      }
+    }
+  }
+
+  test("watchdog with scan maxFileSize -- data source") {
+    withTempDir { dir =>
+      withTempView("test", "test_non_part") {
+        // partitioned table
+        val tablePath = new File(dir, "test")
+        spark.range(10).selectExpr("id", "id as p")
+          .write
+          .partitionBy("p")
+          .mode("overwrite")
+          .parquet(tablePath.getCanonicalPath)
+        spark.read.load(tablePath.getCanonicalPath).createOrReplaceTempView("test")
+
+        val tableSize = FileUtils.listFiles(tablePath, Array("parquet"), true).asScala
+          .map(_.length()).sum
+        assert(tableSize > 0)
+
+        // non-partitioned table
+        val nonPartTablePath = new File(dir, "test_non_part")
+        spark.range(10000).selectExpr("id", "id as p")
+          .write
+          .mode("overwrite")
+          .parquet(nonPartTablePath.getCanonicalPath)
+        spark.read.load(nonPartTablePath.getCanonicalPath).createOrReplaceTempView("test_non_part")
+
+        val nonPartTableSize = FileUtils.listFiles(nonPartTablePath, Array("parquet"), true).asScala
+          .map(_.length()).sum
+        assert(nonPartTableSize > 0)
+
+        // check
+        checkMaxFileSize(tableSize, nonPartTableSize)
+      }
+    }
+  }
 }