You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by GitBox <gi...@apache.org> on 2019/01/16 06:10:21 UTC
[spark] Diff for: [GitHub] HyukjinKwon closed pull request #23506:
[SPARK-26577][SQL] Add input optimizer when reading Hive table by SparkSQL
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 597eef129f63e..688376e634f9d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -128,6 +128,12 @@ private[spark] object HiveUtils extends Logging {
.toSequence
.createWithDefault(jdbcPrefixes)
+  // Opt-in switch consulted by HadoopTableReader when it resolves the input format class of a
+  // Hive table or partition. Disabled by default.
+  val HIVE_INPUT_FORMAT_OPTIMIZER_ENABLED =
+    buildConf("spark.sql.hive.inputFormat.optimizer.enabled")
+      .doc("When true, Spark SQL reads Hive tables declared with `TextInputFormat` through " +
+        "the corresponding `CombineTextInputFormat`. Tables that use any other input format " +
+        "are not affected.")
+      .booleanConf
+      .createWithDefault(false)
+
private def jdbcPrefixes = Seq(
"com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 536bc4a3f4ec4..ffc76bffa5bcd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -123,8 +123,7 @@ class HadoopTableReader(
val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
// logDebug("Table input: %s".format(tablePath))
- val ifc = hiveTable.getInputFormatClass
- .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+ val ifc = getAndOptimizeInput(hiveTable.getInputFormatClass.getName)
val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)
val attrsWithIndex = attributes.zipWithIndex
@@ -164,7 +163,7 @@ class HadoopTableReader(
def verifyPartitionPath(
partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
Map[HivePartition, Class[_ <: Deserializer]] = {
- if (!sparkSession.sessionState.conf.verifyPartitionPath) {
+ if (!conf.verifyPartitionPath) {
partitionToDeserializer
} else {
val existPathSet = collection.mutable.Set[String]()
@@ -202,8 +201,7 @@ class HadoopTableReader(
val partDesc = Utilities.getPartitionDesc(partition)
val partPath = partition.getDataLocation
val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
- val ifc = partDesc.getInputFileFormatClass
- .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+ val ifc = getAndOptimizeInput(partDesc.getInputFileFormatClassName)
// Get partition field info
val partSpec = partDesc.getPartSpec
val partProps = partDesc.getProperties
@@ -311,6 +309,32 @@ class HadoopTableReader(
// Only take the value (skip the key) because Hive works only with values.
rdd.map(_._2)
}
+
+  /**
+   * Resolves the Hadoop `InputFormat` class used to read a Hive table or partition.
+   *
+   * When `spark.sql.hive.inputFormat.optimizer.enabled`
+   * ([[HiveUtils.HIVE_INPUT_FORMAT_OPTIMIZER_ENABLED]]) is true, the two plain-text input
+   * formats are replaced by their `CombineTextInputFormat` counterparts. Any other input
+   * format, and every input format while the flag is false, is loaded unchanged. Only the
+   * class is substituted here; no split-size setting is read or written by this method.
+   *
+   * @param inputClassName fully qualified class name of the input format declared for the
+   *                       table or partition
+   * @return the (possibly substituted) input format class, typed as the callers expect it
+   */
+  private def getAndOptimizeInput(
+      inputClassName: String): Class[InputFormat[Writable, Writable]] = {
+    val optimizerEnabled = conf.getConf(HiveUtils.HIVE_INPUT_FORMAT_OPTIMIZER_ENABLED)
+    val effectiveClassName = inputClassName match {
+      // NOTE(review): this substitutes a new-API (mapreduce) class, yet the result is typed as
+      // the `InputFormat` imported by this file. Confirm the reader can instantiate it.
+      case "org.apache.hadoop.mapreduce.lib.input.TextInputFormat" if optimizerEnabled =>
+        "org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat"
+      case "org.apache.hadoop.mapred.TextInputFormat" if optimizerEnabled =>
+        "org.apache.hadoop.mapred.lib.CombineTextInputFormat"
+      case unchanged => unchanged
+    }
+    // Load exactly one class. The cast is erased at runtime and only fixes the static type.
+    Utils.classForName(effectiveClassName)
+      .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+  }
}
private[hive] object HiveTableUtil {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index 3f9bb8de42e09..3371bce7087e5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -192,4 +192,47 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
case p: HiveTableScanExec => p
}.get
}
-}
+
+  // Scans text-format Hive tables (old and new Hadoop API input formats, partitioned and
+  // unpartitioned) with the input-format optimizer switched on.
+  test("Test the InputFormat optimizer") {
+    val tables = Seq("table_old", "table_pt_old", "table_new", "table_pt_new")
+    // The flag name has to match the key that HadoopTableReader reads; with any other key the
+    // optimizer stays off and only the default read path is exercised. withSQLConf restores
+    // the previous values afterwards so the settings do not leak into other tests.
+    withSQLConf(
+      "spark.sql.hive.inputFormat.optimizer.enabled" -> "true",
+      // NOTE(review): the two split-size keys are carried over from the original test;
+      // confirm that something actually reads them.
+      "spark.sql.hive.fileInputFormat.split.maxsize" -> "134217728",
+      "spark.sql.hive.fileInputFormat.split.minsize" -> "134217728") {
+      withTable(tables: _*) {
+        sql(
+          s"""
+            |CREATE TABLE table_old (id int)
+            |STORED AS
+            |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+            |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+          """.stripMargin)
+        sql(
+          s"""
+            |CREATE TABLE table_pt_old (id int)
+            |PARTITIONED BY (a int, b int)
+            |STORED AS
+            |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+            |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+          """.stripMargin)
+        sql(
+          s"""
+            |CREATE TABLE table_new (id int)
+            |STORED AS
+            |INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat'
+            |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+          """.stripMargin)
+        sql(
+          s"""
+            |CREATE TABLE table_pt_new (id int)
+            |PARTITIONED BY (a int, b int)
+            |STORED AS
+            |INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat'
+            |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+          """.stripMargin)
+
+        // The tables were just created and hold no rows, so each scan must complete and
+        // report a count of zero.
+        tables.foreach { table =>
+          val counts = sql(s"SELECT count(1) FROM $table").collect().map(_.getLong(0))
+          assert(counts === Array(0L), s"unexpected row count for $table")
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org