You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/09/20 16:53:31 UTC
spark git commit: [SPARK-17051][SQL] we should use hadoopConf in
InsertIntoHiveTable
Repository: spark
Updated Branches:
refs/heads/master d5ec5dbb0 -> eb004c662
[SPARK-17051][SQL] we should use hadoopConf in InsertIntoHiveTable
## What changes were proposed in this pull request?
Hive configurations from hive-site.xml are loaded into `hadoopConf`, so we should use `hadoopConf` in `InsertIntoHiveTable` instead of `SessionState.conf`
## How was this patch tested?
N/A
Author: Wenchen Fan <we...@databricks.com>
Closes #14634 from cloud-fan/bug.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb004c66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb004c66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb004c66
Branch: refs/heads/master
Commit: eb004c66200da7df36dd0a9a11999fc352197916
Parents: d5ec5db
Author: Wenchen Fan <we...@databricks.com>
Authored: Tue Sep 20 09:53:28 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Sep 20 09:53:28 2016 -0700
----------------------------------------------------------------------
.../hive/execution/InsertIntoHiveTable.scala | 9 ++----
.../sql/hive/execution/HiveQuerySuite.scala | 32 +++++++++++++++++---
2 files changed, 31 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/eb004c66/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 7eec9c7..53bb3b9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -147,8 +147,7 @@ case class InsertIntoHiveTable(
val hadoopConf = sessionState.newHadoopConf()
val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
- val isCompressed =
- sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
+ val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
if (isCompressed) {
// Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
@@ -182,15 +181,13 @@ case class InsertIntoHiveTable(
// Validate partition spec if there exist any dynamic partitions
if (numDynamicPartitions > 0) {
// Report error if dynamic partitioning is not enabled
- if (!sessionState.conf.getConfString("hive.exec.dynamic.partition", "true").toBoolean) {
+ if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
}
// Report error if dynamic partition strict mode is on but no static partition is found
if (numStaticPartitions == 0 &&
- sessionState.conf.getConfString(
- "hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict"))
- {
+ hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/eb004c66/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 1d1a958..2b945db 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -26,16 +26,17 @@ import scala.util.Try
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.BeforeAndAfter
-import org.apache.spark.{SparkException, SparkFiles}
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.SparkFiles
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
+import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
case class TestData(a: Int, b: String)
@@ -43,7 +44,7 @@ case class TestData(a: Int, b: String)
* A set of test cases expressed in Hive QL that are not covered by the tests
* included in the hive distribution.
*/
-class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
+class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAndAfter {
private val originalTimeZone = TimeZone.getDefault
private val originalLocale = Locale.getDefault
@@ -51,6 +52,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
+ def spark: SparkSession = sparkSession
+
override def beforeAll() {
super.beforeAll()
TestHive.setCacheTables(true)
@@ -1199,6 +1202,27 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
}
assertUnsupportedFeature { sql("DROP TEMPORARY MACRO SIGMOID") }
}
+
+ test("dynamic partitioning is allowed when hive.exec.dynamic.partition.mode is nonstrict") {
+ val modeConfKey = "hive.exec.dynamic.partition.mode"
+ withTable("with_parts") {
+ sql("CREATE TABLE with_parts(key INT) PARTITIONED BY (p INT)")
+
+ withSQLConf(modeConfKey -> "nonstrict") {
+ sql("INSERT OVERWRITE TABLE with_parts partition(p) select 1, 2")
+ assert(spark.table("with_parts").filter($"p" === 2).collect().head == Row(1, 2))
+ }
+
+ val originalValue = spark.sparkContext.hadoopConfiguration.get(modeConfKey, "nonstrict")
+ try {
+ spark.sparkContext.hadoopConfiguration.set(modeConfKey, "nonstrict")
+ sql("INSERT OVERWRITE TABLE with_parts partition(p) select 3, 4")
+ assert(spark.table("with_parts").filter($"p" === 4).collect().head == Row(3, 4))
+ } finally {
+ spark.sparkContext.hadoopConfiguration.set(modeConfKey, originalValue)
+ }
+ }
+ }
}
// for SPARK-2180 test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org