You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/22 23:24:45 UTC

[GitHub] gatorsmile closed pull request #19560: [SPARK-22334][SQL] Check table size from filesystem in case the size in metastore is wrong.

gatorsmile closed pull request #19560: [SPARK-22334][SQL] Check table size from filesystem in case the size in metastore is wrong.
URL: https://github.com/apache/spark/pull/19560
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4cfe53b2c115b..6fa006fa7c9b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -187,6 +187,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val VERIFY_STATS_FROM_FILESYSTEM_WHEN_JOIN =
+    buildConf("spark.sql.statistics.verifyStatsFromFileSystemWhenJoin")
+      .doc("If table size in metastore is below spark.sql.autoBroadcastJoinThreshold, " +
+        "check the real size on file system and set table size to be the bigger one. " +
+        "This is for defense and help avoid OOM caused by broadcast join. " +
+        "It's useful when metastore failed to update the stats of table previously.")
+      .booleanConf
+      .createWithDefault(false)
+
   val DEFAULT_SIZE_IN_BYTES = buildConf("spark.sql.defaultSizeInBytes")
     .internal()
     .doc("The default table size used in query planning. By default, it is set to Long.MaxValue " +
@@ -1104,6 +1113,9 @@ class SQLConf extends Serializable with Logging {
 
   def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)
 
+  // Whether small Hive table sizes from the metastore are re-checked on the file system for joins.
+  def verifyStatsFromFileSystemWhenJoin: Boolean = getConf(VERIFY_STATS_FROM_FILESYSTEM_WHEN_JOIN)
+
   def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
 
   def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 3592b8f4846d1..e19cfdcacabff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.IOException
+import java.net.URI
 import java.util.Locale
 
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -26,8 +27,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan,
-    ScriptTransformation}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
@@ -120,22 +120,53 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
         if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
       val table = relation.tableMeta
       val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-        try {
-          val hadoopConf = session.sessionState.newHadoopConf()
-          val tablePath = new Path(table.location)
-          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
-          fs.getContentSummary(tablePath).getLength
-        } catch {
-          case e: IOException =>
-            logWarning("Failed to get table size from hdfs.", e)
-            session.sessionState.conf.defaultSizeInBytes
-        }
+        getSizeFromFileSystem(table.location)
       } else {
         session.sessionState.conf.defaultSizeInBytes
       }
 
       val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
       relation.copy(tableMeta = withStats)
+
+    case r: Join =>
+      r.transformUp {
+        case relation: HiveTableRelation => verifySize(relation)
+      }
+  }
+
+  /** Bumps a too-small metastore table size up to the size measured on the file system. */
+  private[this] def verifySize(relation: HiveTableRelation): HiveTableRelation = {
+    val meta = relation.tableMeta
+    val conf = session.sessionState.conf
+    meta.stats.map(_.sizeInBytes) match {
+      // Scan the file system only for Hive sizes small enough to allow a broadcast join.
+      case Some(recorded) if DDLUtils.isHiveTable(meta) &&
+          conf.verifyStatsFromFileSystemWhenJoin && recorded < conf.autoBroadcastJoinThreshold =>
+        val onDisk = getSizeFromFileSystem(meta.location)
+        if (onDisk > recorded) {
+          logWarning(s"For hive table ${meta.qualifiedName}, its size" +
+            s" $recorded from metastore is smaller than its" +
+            s" real size $onDisk on file system. Please update stats in metastore accurately.")
+          relation.copy(tableMeta =
+            meta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(onDisk)))))
+        } else {
+          relation
+        }
+      case _ => relation
+    }
+  }
+
+  /** Length from the content summary of `loc`, or the configured default size on I/O failure. */
+  private[this] def getSizeFromFileSystem(loc: URI): Long = {
+    try {
+      val hadoopConf = session.sessionState.newHadoopConf()
+      val root = new Path(loc)
+      root.getFileSystem(hadoopConf).getContentSummary(root).getLength
+    } catch {
+      case e: IOException =>
+        logWarning("Failed to get table size from hdfs.", e)
+        session.sessionState.conf.defaultSizeInBytes
+    }
+  }
 }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index b9a5ad7657134..857b61eddb90c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -79,6 +79,59 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("Verify stats from file system when join.") {
+    // Writes a one-row CSV file into `dir` and hands it back for later length checks.
+    def writeCsv(dir: File, name: String): File = {
+      val file = new File(dir, name)
+      val writer = new PrintWriter(file)
+      writer.write("1,2")
+      writer.close()
+      file
+    }
+
+    withSQLConf(SQLConf.VERIFY_STATS_FROM_FILESYSTEM_WHEN_JOIN.key -> "true") {
+      withTable("csv_table") {
+        withTempDir { tempDir =>
+          val dataFiles = Seq("data1", "data2").map(writeCsv(tempDir, _))
+          // EXTERNAL OpenCSVSerde table pointing to LOCATION, with understated stats.
+          sql(
+            s"""
+               |CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+               |WITH SERDEPROPERTIES (
+               |\"separatorChar\" = \",\",
+               |\"quoteChar\"     = \"\\\"\",
+               |\"escapeChar\"    = \"\\\\\")
+               |LOCATION '${tempDir.toURI}'
+               |tblproperties(
+               |  'numFiles'='1',
+               |  'rawDataSize'='1',
+               |  'totalSize'='1'
+               |)
+               |""".stripMargin)
+          // Read on its own, the table still reports the size recorded in the metastore.
+          val relation = spark.table("csv_table").queryExecution.analyzed.children.head
+            .asInstanceOf[HiveTableRelation]
+          assert(relation.stats.sizeInBytes === 1)
+
+          val df = sql(
+            """
+              |SELECT * FROM
+              |csv_table a LEFT JOIN csv_table b
+              |on a.page_id = b.page_id
+            """.stripMargin)
+          val relations = df.queryExecution.analyzed.collect {
+            case hiveRelation: HiveTableRelation => hiveRelation
+          }
+          assert(relations.length === 2)
+          // Under the join, both sides must report the summed length of the files on disk.
+          val realSize = BigInt(dataFiles.map(_.length()).sum)
+          relations.foreach(r => assert(r.stats.sizeInBytes === realSize))
+        }
+      }
+    }
+  }
+
   test("analyze Hive serde tables") {
     def queryTotalSize(tableName: String): BigInt =
       spark.table(tableName).queryExecution.analyzed.stats.sizeInBytes


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org