You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/12/04 12:44:10 UTC

spark git commit: [SPARK-18661][SQL] Creating a partitioned datasource table should not scan all files for table

Repository: spark
Updated Branches:
  refs/heads/master e463678b1 -> d9eb4c721


[SPARK-18661][SQL] Creating a partitioned datasource table should not scan all files for table

## What changes were proposed in this pull request?

Even though in 2.1 creating a partitioned datasource table will not populate the partition data by default (until the user issues MSCK REPAIR TABLE), it seems we still scan the filesystem for no good reason.

We should avoid doing this when the user specifies a schema.

## How was this patch tested?

Perf stat tests.

Author: Eric Liang <ek...@databricks.com>

Closes #16090 from ericl/spark-18661.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9eb4c72
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9eb4c72
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9eb4c72

Branch: refs/heads/master
Commit: d9eb4c7215f26dd05527c0b9980af35087ab9d64
Parents: e463678
Author: Eric Liang <ek...@databricks.com>
Authored: Sun Dec 4 20:44:04 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sun Dec 4 20:44:04 2016 +0800

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        | 10 +++-
 .../sql/execution/datasources/DataSource.scala  |  2 +-
 .../spark/sql/execution/command/DDLSuite.scala  | 11 ++++-
 .../hive/PartitionedTablePerfStatsSuite.scala   | 51 ++++++++++++++++++--
 4 files changed, 66 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d9eb4c72/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 422700c..193a2a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -58,13 +58,21 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     // Create the relation to validate the arguments before writing the metadata to the metastore,
     // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
     val pathOption = table.storage.locationUri.map("path" -> _)
+    // Fill in some default table options from the session conf
+    val tableWithDefaultOptions = table.copy(
+      identifier = table.identifier.copy(
+        database = Some(
+          table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase))),
+      tracksPartitionsInCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions)
     val dataSource: BaseRelation =
       DataSource(
         sparkSession = sparkSession,
         userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
+        partitionColumns = table.partitionColumnNames,
         className = table.provider.get,
         bucketSpec = table.bucketSpec,
-        options = table.storage.properties ++ pathOption).resolveRelation()
+        options = table.storage.properties ++ pathOption,
+        catalogTable = Some(tableWithDefaultOptions)).resolveRelation()
 
     dataSource match {
       case fs: HadoopFsRelation =>

http://git-wip-us.apache.org/repos/asf/spark/blob/d9eb4c72/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ccfc759..f47eb84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -132,7 +132,7 @@ case class DataSource(
       }.toArray
       new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
     }
-    val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
+    val partitionSchema = if (partitionColumns.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
       // columns properly unless it is a Hive DataSource
       val resolved = tempFileIndex.partitionSchema.map { partitionField =>

http://git-wip-us.apache.org/repos/asf/spark/blob/d9eb4c72/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 2a004ba..e61beb4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -312,7 +312,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
           pathToNonPartitionedTable,
           userSpecifiedSchema = Option("num int, str string"),
           userSpecifiedPartitionCols = partitionCols,
-          expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+          expectedSchema = if (partitionCols.isDefined) {
+            // we skipped inference, so the partition col is ordered at the end
+            new StructType().add("str", StringType).add("num", IntegerType)
+          } else {
+            // no inferred partitioning, so schema is in original order
+            new StructType().add("num", IntegerType).add("str", StringType)
+          },
           expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String]))
       }
     }
@@ -565,7 +571,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val table = catalog.getTableMetadata(TableIdentifier("tbl"))
       assert(table.tableType == CatalogTableType.MANAGED)
       assert(table.provider == Some("parquet"))
-      assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType))
+      // a is ordered last since it is a user-specified partitioning column
+      assert(table.schema == new StructType().add("b", IntegerType).add("a", IntegerType))
       assert(table.partitionColumnNames == Seq("a"))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d9eb4c72/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 9838b9a..65c02d4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -60,36 +60,52 @@ class PartitionedTablePerfStatsSuite
     setupPartitionedHiveTable(tableName, dir, 5)
   }
 
-  private def setupPartitionedHiveTable(tableName: String, dir: File, scale: Int): Unit = {
+  private def setupPartitionedHiveTable(
+      tableName: String, dir: File, scale: Int,
+      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
 
+    if (clearMetricsBeforeCreate) {
+      HiveCatalogMetrics.reset()
+    }
+
     spark.sql(s"""
       |create external table $tableName (fieldOne long)
       |partitioned by (partCol1 int, partCol2 int)
       |stored as parquet
       |location "${dir.getAbsolutePath}"""".stripMargin)
-    spark.sql(s"msck repair table $tableName")
+    if (repair) {
+      spark.sql(s"msck repair table $tableName")
+    }
   }
 
   private def setupPartitionedDatasourceTable(tableName: String, dir: File): Unit = {
     setupPartitionedDatasourceTable(tableName, dir, 5)
   }
 
-  private def setupPartitionedDatasourceTable(tableName: String, dir: File, scale: Int): Unit = {
+  private def setupPartitionedDatasourceTable(
+      tableName: String, dir: File, scale: Int,
+      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
 
+    if (clearMetricsBeforeCreate) {
+      HiveCatalogMetrics.reset()
+    }
+
     spark.sql(s"""
       |create table $tableName (fieldOne long, partCol1 int, partCol2 int)
       |using parquet
       |options (path "${dir.getAbsolutePath}")
       |partitioned by (partCol1, partCol2)""".stripMargin)
-    spark.sql(s"msck repair table $tableName")
+    if (repair) {
+      spark.sql(s"msck repair table $tableName")
+    }
   }
 
   genericTest("partitioned pruned table reports only selected files") { spec =>
@@ -250,6 +266,33 @@ class PartitionedTablePerfStatsSuite
     }
   }
 
+  test("datasource table: table setup does not scan filesystem") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedDatasourceTable(
+            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+        }
+      }
+    }
+  }
+
+  test("hive table: table setup does not scan filesystem") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+      withTable("test") {
+        withTempDir { dir =>
+          HiveCatalogMetrics.reset()
+          setupPartitionedHiveTable(
+            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+        }
+      }
+    }
+  }
+
   test("hive table: num hive client calls does not scale with partition count") {
     withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
       withTable("test") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org