Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/01 07:24:12 UTC
spark git commit: [SPARK-18107][SQL] Insert overwrite statement runs much slower in spark-sql than it does in hive-client
Repository: spark
Updated Branches:
refs/heads/master d9d146500 -> dd85eb544
[SPARK-18107][SQL] Insert overwrite statement runs much slower in spark-sql than it does in hive-client
## What changes were proposed in this pull request?
As reported on the JIRA, an insert overwrite statement runs much slower in Spark than in hive-client.
It seems there is a patch [HIVE-11940](https://github.com/apache/hive/commit/ba21806b77287e237e1aa68fa169d2a81e07346d) that largely improves insert overwrite performance in Hive. HIVE-11940 was merged after Hive 2.0.0.
Because Spark SQL uses an older Hive library, we cannot benefit from this improvement.
The reporter verified that there is also a big performance gap between Hive 1.2.1 (520.037 secs) and Hive 2.0.1 (35.975 secs) for insert overwrite execution.
Instead of upgrading Spark SQL to Hive 2.0, which might not be a trivial task, this patch deletes the partition directory before asking Hive to load data files into the partition.
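The approach can be sketched roughly as follows. This is a minimal sketch of the idea, not the actual patch: `overwritePartition` is a hypothetical helper, and the real logic lives inside `InsertIntoHiveTable`, interleaved with the existing `loadPartition` call.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper illustrating the delete-then-load idea: remove the old
// partition directory ourselves, then tell Hive not to run its own (slower)
// overwrite logic. Returns the value to pass as isOverwrite to loadPartition.
def overwritePartition(partitionUri: String, hadoopConf: Configuration): Boolean = {
  val partitionPath = new Path(partitionUri)
  val fs = partitionPath.getFileSystem(hadoopConf)
  var doHiveOverwrite = true
  if (fs.exists(partitionPath)) {
    if (!fs.delete(partitionPath, true)) {
      throw new RuntimeException(
        "Cannot remove partition directory '" + partitionPath.toString + "'")
    }
    // Spark has already cleared the partition, so Hive can load the new
    // files without doing its expensive overwrite pass.
    doHiveOverwrite = false
  }
  doHiveOverwrite
}
```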
Note: The case reported on the JIRA is insert overwrite to a partition. Since `Hive.loadTable` also uses the same function to replace files, insert overwrite to a table should have the same issue, and we can take the same approach of deleting the table directory first. I will update this patch to cover that case.
## How was this patch tested?
Jenkins tests.
There are existing tests using insert overwrite statements; those should still pass. I added a new test specifically for insert overwrite into a partition.
As for the performance issue, I don't have a Hive 2.0 environment, so this needs the reporter to verify. Please refer to the JIRA.
Author: Liang-Chi Hsieh <vi...@gmail.com>
Closes #15667 from viirya/improve-hive-insertoverwrite.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd85eb54
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd85eb54
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd85eb54
Branch: refs/heads/master
Commit: dd85eb5448c8f2672260b57e94c0da0eaac12616
Parents: d9d1465
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Tue Nov 1 00:24:08 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Nov 1 00:24:08 2016 -0700
----------------------------------------------------------------------
.../hive/execution/InsertIntoHiveTable.scala | 24 +++++++++++++-
.../sql/hive/execution/SQLQuerySuite.scala | 33 ++++++++++++++++++++
2 files changed, 56 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/dd85eb54/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index c3c4e29..2843100 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.SparkException
@@ -257,7 +258,28 @@ case class InsertIntoHiveTable(
table.catalogTable.identifier.table,
partitionSpec)
+ var doHiveOverwrite = overwrite
+
if (oldPart.isEmpty || !ifNotExists) {
+ // SPARK-18107: Insert overwrite runs much slower than hive-client.
+ // Newer Hive largely improves insert overwrite performance. Since Spark uses an older
+ // Hive version and may not want to catch up with every new Hive release, we delete the
+ // Hive partition first and then load the data files into the partition.
+ if (oldPart.nonEmpty && overwrite) {
+ oldPart.get.storage.locationUri.map { uri =>
+ val partitionPath = new Path(uri)
+ val fs = partitionPath.getFileSystem(hadoopConf)
+ if (fs.exists(partitionPath)) {
+ if (!fs.delete(partitionPath, true)) {
+ throw new RuntimeException(
+ "Cannot remove partition directory '" + partitionPath.toString + "'")
+ }
+ // Don't let Hive do overwrite operation since it is slower.
+ doHiveOverwrite = false
+ }
+ }
+ }
+
// inheritTableSpecs is set to true. It should be set to false for an IMPORT query
// which is currently considered as a Hive native command.
val inheritTableSpecs = true
@@ -266,7 +288,7 @@ case class InsertIntoHiveTable(
table.catalogTable.identifier.table,
outputPath.toString,
partitionSpec,
- isOverwrite = overwrite,
+ isOverwrite = doHiveOverwrite,
holdDDLTime = holdDDLTime,
inheritTableSpecs = inheritTableSpecs)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/dd85eb54/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f64010a..8b91693 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1973,6 +1973,39 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
+ test("Insert overwrite with partition") {
+ withTable("tableWithPartition") {
+ sql(
+ """
+ |CREATE TABLE tableWithPartition (key int, value STRING)
+ |PARTITIONED BY (part STRING)
+ """.stripMargin)
+ sql(
+ """
+ |INSERT OVERWRITE TABLE tableWithPartition PARTITION (part = '1')
+ |SELECT * FROM default.src
+ """.stripMargin)
+ checkAnswer(
+ sql("SELECT part, key, value FROM tableWithPartition"),
+ sql("SELECT '1' AS part, key, value FROM default.src")
+ )
+
+ sql(
+ """
+ |INSERT OVERWRITE TABLE tableWithPartition PARTITION (part = '1')
+ |SELECT * FROM VALUES (1, "one"), (2, "two"), (3, null) AS data(key, value)
+ """.stripMargin)
+ checkAnswer(
+ sql("SELECT part, key, value FROM tableWithPartition"),
+ sql(
+ """
+ |SELECT '1' AS part, key, value FROM VALUES
+ |(1, "one"), (2, "two"), (3, null) AS data(key, value)
+ """.stripMargin)
+ )
+ }
+ }
+
def testCommandAvailable(command: String): Boolean = {
val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
attempt.isSuccess && attempt.get == 0