Posted to commits@spark.apache.org by do...@apache.org on 2019/07/28 22:41:36 UTC

[spark] branch master updated: [SPARK-25474][SQL] Support `spark.sql.statistics.fallBackToHdfs` in data source tables

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 485ae6d  [SPARK-25474][SQL] Support `spark.sql.statistics.fallBackToHdfs` in data source tables
485ae6d is described below

commit 485ae6d1818e8756a86da38d6aefc8f1dbde49c2
Author: shahid <sh...@gmail.com>
AuthorDate: Sun Jul 28 15:35:01 2019 -0700

    [SPARK-25474][SQL] Support `spark.sql.statistics.fallBackToHdfs` in data source tables
    
    For a data source table backed by a CatalogFileIndex, sizeInBytes always reports the default size, 8.0 EiB, even when the user sets fallBackToHdfsForStatsEnabled=true. As a result, such tables always end up with SortMergeJoin instead of BroadcastHashJoin, even when their actual size is below the broadcast join threshold.
    With this PR, if fallBackToHdfsForStatsEnabled=true is set for a CatalogFileIndex table, sizeInBytes is computed from HDFS, so we get the actual size of the table. Hence, during join planning, when the table size is below the broadcast threshold, BroadcastHashJoin is preferred over SortMergeJoin.
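    
    A minimal sketch of the user-facing effect (illustrative only; t1 and t2 are assumed to be small partitioned Parquet data source tables, as created in the new test below, and are not part of this patch's code). With the flag enabled, the planner can now see the real HDFS size and pick a broadcast join:
    
        // spark-shell sketch; assumes t1 and t2 exist and their files are well
        // below spark.sql.autoBroadcastJoinThreshold (10 MB by default).
        spark.conf.set("spark.sql.statistics.fallBackToHdfs", "true")
        spark.sql("EXPLAIN SELECT * FROM t1 JOIN t2 ON t1.id = t2.id")
          .show(truncate = false)
        // The physical plan should now show BroadcastHashJoin rather than
        // SortMergeJoin.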
    
    Added UT
    
    Closes #22502 from shahidki31/SPARK-25474.
    
    Authored-by: shahid <sh...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/execution/command/CommandUtils.scala | 11 ++++++
 .../execution/datasources/HadoopFsRelation.scala   | 13 ++++---
 .../org/apache/spark/sql/hive/HiveStrategies.scala | 14 ++------
 .../apache/spark/sql/hive/StatisticsSuite.scala    | 40 ++++++++++++++++++++++
 4 files changed, 63 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index b644e6d..9a9d66b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -344,4 +344,15 @@ object CommandUtils extends Logging {
   private def isDataPath(path: Path, stagingDir: String): Boolean = {
     !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
   }
+
+  def getSizeInBytesFallBackToHdfs(session: SparkSession, path: Path, defaultSize: Long): Long = {
+    try {
+      val hadoopConf = session.sessionState.newHadoopConf()
+      path.getFileSystem(hadoopConf).getContentSummary(path).getLength
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Failed to get table size from hdfs. Using the default size, $defaultSize.", e)
+        defaultSize
+    }
+  }
 }
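
The new helper centralizes the "fall back to HDFS" size lookup so the Hive and data source paths can share it. A minimal usage sketch (the SparkSession value `spark` and the table path below are illustrative assumptions, not part of the patch):

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.command.CommandUtils

    // Hypothetical location; in the patch the path comes from the table's
    // catalog metadata (cfi.table.location / table.location).
    val tablePath = new Path("hdfs://nn:8020/warehouse/db.db/t1")
    val sizeInBytes = CommandUtils.getSizeInBytesFallBackToHdfs(
      spark,                                       // an active SparkSession
      tablePath,
      spark.sessionState.conf.defaultSizeInBytes)  // returned if the lookup fails
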
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index d278802..f7d2315 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -17,13 +17,12 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.util.Locale
-
-import scala.collection.mutable
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.FileRelation
+import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
 import org.apache.spark.sql.types.{StructField, StructType}
 
@@ -71,7 +70,13 @@ case class HadoopFsRelation(
 
   override def sizeInBytes: Long = {
     val compressionFactor = sqlContext.conf.fileCompressionFactor
-    (location.sizeInBytes * compressionFactor).toLong
+    val defaultSize = (location.sizeInBytes * compressionFactor).toLong
+    location match {
+      case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled =>
+        CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, new Path(cfi.table.location),
+          defaultSize)
+      case _ => defaultSize
+    }
   }
 
 
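With this change, a HadoopFsRelation backed by a CatalogFileIndex reports its real size when the fallback flag is on; other FileIndex implementations keep the previous behavior. A hedged spark-shell sketch of observing it (t1 is assumed to be a partitioned Parquet data source table without precomputed statistics, as in the new test below):

    spark.conf.set("spark.sql.statistics.fallBackToHdfs", "true")
    val plan = spark.sql("SELECT * FROM t1").queryExecution.optimizedPlan
    // Before this change this reported the 8.0 EiB default for a
    // CatalogFileIndex-backed table; with it, the actual size on HDFS.
    println(plan.stats.sizeInBytes)
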
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 7b28e4f..d09c0ab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab
     ScriptTransformation}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
+import org.apache.spark.sql.execution.command.{CommandUtils, CreateTableCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -118,16 +118,8 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
         if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
       val table = relation.tableMeta
       val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-        try {
-          val hadoopConf = session.sessionState.newHadoopConf()
-          val tablePath = new Path(table.location)
-          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
-          fs.getContentSummary(tablePath).getLength
-        } catch {
-          case e: IOException =>
-            logWarning("Failed to get table size from hdfs.", e)
-            session.sessionState.conf.defaultSizeInBytes
-        }
+        CommandUtils.getSizeInBytesFallBackToHdfs(session, new Path(table.location),
+          session.sessionState.conf.defaultSizeInBytes)
       } else {
         session.sessionState.conf.defaultSizeInBytes
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index b4e5058..e20099a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -1484,4 +1484,44 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       }
     }
   }
+
+  test("SPARK-25474: test sizeInBytes for CatalogFileIndex dataSourceTable") {
+    withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
+      withTable("t1", "t2") {
+        sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
+        sql("INSERT INTO t1 VALUES (1, 'a')")
+        checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), "sizeInBytes=8.0 EiB")
+        sql("CREATE TABLE t2 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
+        sql("INSERT INTO t2 VALUES (1, 'a')")
+        checkKeywordsExist(sql("EXPLAIN SELECT * FROM t1, t2 WHERE t1.id=t2.id"),
+          "BroadcastHashJoin")
+      }
+    }
+  }
+
+  test("SPARK-25474: should not fall back to hdfs when table statistics exists" +
+    " for CatalogFileIndex dataSourceTable") {
+
+    var sizeInBytesDisabledFallBack, sizeInBytesEnabledFallBack = 0L
+    Seq(true, false).foreach { fallBackToHdfs =>
+      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> fallBackToHdfs.toString) {
+        withTable("t1") {
+          sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
+          sql("INSERT INTO t1 VALUES (1, 'a')")
+          // Analyze command updates the statistics of table `t1`
+          sql("ANALYZE TABLE t1 COMPUTE STATISTICS")
+          val catalogTable = getCatalogTable("t1")
+          assert(catalogTable.stats.isDefined)
+
+          if (!fallBackToHdfs) {
+            sizeInBytesDisabledFallBack = catalogTable.stats.get.sizeInBytes.toLong
+          } else {
+            sizeInBytesEnabledFallBack = catalogTable.stats.get.sizeInBytes.toLong
+          }
+          checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), "sizeInBytes=8.0 EiB")
+        }
+      }
+    }
+    assert(sizeInBytesEnabledFallBack === sizeInBytesDisabledFallBack)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org