You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by GitBox <gi...@apache.org> on 2019/01/16 06:10:21 UTC

[spark] Diff for: [GitHub] HyukjinKwon closed pull request #23506: [SPARK-26577][SQL] Add input optimizer when reading Hive table by SparkSQL

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 597eef129f63e..688376e634f9d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -128,6 +128,12 @@ private[spark] object HiveUtils extends Logging {
     .toSequence
     .createWithDefault(jdbcPrefixes)
 
+  val HIVE_INPUT_FORMAT_OPTIMIZER_ENABLED =
+    buildConf("spark.sql.hive.inputFormat.optimizer.enabled")
+      .doc("When true, read Hive `TextInputFormat` tables via `CombineTextInputFormat`.")
+      .booleanConf
+      .createWithDefault(false)
+
   private def jdbcPrefixes = Seq(
     "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc")
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 536bc4a3f4ec4..ffc76bffa5bcd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -123,8 +123,7 @@ class HadoopTableReader(
     val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
 
     // logDebug("Table input: %s".format(tablePath))
-    val ifc = hiveTable.getInputFormatClass
-      .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+    val ifc = getAndOptimizeInput(hiveTable.getInputFormatClass.getName)
     val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)
 
     val attrsWithIndex = attributes.zipWithIndex
@@ -164,7 +163,7 @@ class HadoopTableReader(
     def verifyPartitionPath(
         partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
         Map[HivePartition, Class[_ <: Deserializer]] = {
-      if (!sparkSession.sessionState.conf.verifyPartitionPath) {
+      if (!conf.verifyPartitionPath) {
         partitionToDeserializer
       } else {
         val existPathSet = collection.mutable.Set[String]()
@@ -202,8 +201,7 @@ class HadoopTableReader(
       val partDesc = Utilities.getPartitionDesc(partition)
       val partPath = partition.getDataLocation
       val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
-      val ifc = partDesc.getInputFileFormatClass
-        .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+      val ifc = getAndOptimizeInput(partDesc.getInputFileFormatClassName)
       // Get partition field info
       val partSpec = partDesc.getPartSpec
       val partProps = partDesc.getProperties
@@ -311,6 +309,32 @@ class HadoopTableReader(
     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
   }
+
+  /**
+   * Resolves the input format class for a Hive table or partition. When
+   * `spark.sql.hive.inputFormat.optimizer.enabled` is true, the two plain-text input formats are
+   * swapped for their `CombineTextInputFormat` counterparts; other names are loaded unchanged.
+   *
+   * @param inputClassName fully qualified class name of the declared input format
+   * @return the loaded class, cast to `Class[InputFormat[Writable, Writable]]`
+   */
+  private def getAndOptimizeInput(
+      inputClassName: String): Class[InputFormat[Writable, Writable]] = {
+    val optimizerEnabled = conf.getConf(HiveUtils.HIVE_INPUT_FORMAT_OPTIMIZER_ENABLED)
+    // The two replacements are mutually exclusive, so a single match picks the class name.
+    val resolvedClassName = inputClassName match {
+      case "org.apache.hadoop.mapreduce.lib.input.TextInputFormat" if optimizerEnabled =>
+        "org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat"
+      case "org.apache.hadoop.mapred.TextInputFormat" if optimizerEnabled =>
+        "org.apache.hadoop.mapred.lib.CombineTextInputFormat"
+      case other =>
+        other
+    }
+    // NOTE(review): the cast is unchecked; nothing here verifies that the loaded class
+    // implements the `InputFormat` in scope here -- confirm for the mapreduce class names.
+    Utils.classForName(resolvedClassName)
+      .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+  }
 }
 
 private[hive] object HiveTableUtil {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index 3f9bb8de42e09..3371bce7087e5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -192,4 +192,47 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
       case p: HiveTableScanExec => p
     }.get
   }
-}
+
+  test("Test the InputFormat optimizer") {
+    withTable("table_old", "table_pt_old", "table_new", "table_pt_new") {
+      // Enable the optimizer via the key HiveUtils declares; withSQLConf undoes the settings.
+      withSQLConf("spark.sql.hive.inputFormat.optimizer.enabled" -> "true",
+        "spark.sql.hive.fileInputFormat.split.maxsize" -> "134217728",
+        "spark.sql.hive.fileInputFormat.split.minsize" -> "134217728") {
+        sql(
+          """
+            |CREATE TABLE table_old (id int)
+            |STORED AS
+            |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+            |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+          """.stripMargin)
+        sql(
+          """
+            |CREATE TABLE table_pt_old (id int)
+            |PARTITIONED BY (a int, b int)
+            |STORED AS
+            |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+            |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+          """.stripMargin)
+        sql(
+          """
+            |CREATE TABLE table_new (id int)
+            |STORED AS
+            |INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat'
+            |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+          """.stripMargin)
+        sql(
+          """
+            |CREATE TABLE table_pt_new (id int)
+            |PARTITIONED BY (a int, b int)
+            |STORED AS
+            |INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat'
+            |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+          """.stripMargin)
+        Seq("table_old", "table_pt_old", "table_new", "table_pt_new").foreach { table =>
+          sql(s"SELECT count(1) FROM $table").show()
+        }
+      }
+    }
+  }
+}
\ No newline at end of file


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org