You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/09/20 16:53:31 UTC

spark git commit: [SPARK-17051][SQL] we should use hadoopConf in InsertIntoHiveTable

Repository: spark
Updated Branches:
  refs/heads/master d5ec5dbb0 -> eb004c662


[SPARK-17051][SQL] we should use hadoopConf in InsertIntoHiveTable

## What changes were proposed in this pull request?

Hive confs in hive-site.xml are loaded into `hadoopConf`, so `InsertIntoHiveTable` should read them from `hadoopConf` instead of `SessionState.conf`.

## How was this patch tested?

N/A

Author: Wenchen Fan <we...@databricks.com>

Closes #14634 from cloud-fan/bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb004c66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb004c66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb004c66

Branch: refs/heads/master
Commit: eb004c66200da7df36dd0a9a11999fc352197916
Parents: d5ec5db
Author: Wenchen Fan <we...@databricks.com>
Authored: Tue Sep 20 09:53:28 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Sep 20 09:53:28 2016 -0700

----------------------------------------------------------------------
 .../hive/execution/InsertIntoHiveTable.scala    |  9 ++----
 .../sql/hive/execution/HiveQuerySuite.scala     | 32 +++++++++++++++++---
 2 files changed, 31 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eb004c66/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 7eec9c7..53bb3b9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -147,8 +147,7 @@ case class InsertIntoHiveTable(
     val hadoopConf = sessionState.newHadoopConf()
     val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
-    val isCompressed =
-      sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
+    val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
 
     if (isCompressed) {
       // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
@@ -182,15 +181,13 @@ case class InsertIntoHiveTable(
     // Validate partition spec if there exist any dynamic partitions
     if (numDynamicPartitions > 0) {
       // Report error if dynamic partitioning is not enabled
-      if (!sessionState.conf.getConfString("hive.exec.dynamic.partition", "true").toBoolean) {
+      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
       }
 
       // Report error if dynamic partition strict mode is on but no static partition is found
       if (numStaticPartitions == 0 &&
-          sessionState.conf.getConfString(
-            "hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict"))
-      {
+        hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/eb004c66/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 1d1a958..2b945db 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -26,16 +26,17 @@ import scala.util.Try
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkException, SparkFiles}
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.SparkFiles
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
 import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
 
 case class TestData(a: Int, b: String)
 
@@ -43,7 +44,7 @@ case class TestData(a: Int, b: String)
  * A set of test cases expressed in Hive QL that are not covered by the tests
  * included in the hive distribution.
  */
-class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
+class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAndAfter {
   private val originalTimeZone = TimeZone.getDefault
   private val originalLocale = Locale.getDefault
 
@@ -51,6 +52,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
 
   private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
 
+  def spark: SparkSession = sparkSession
+
   override def beforeAll() {
     super.beforeAll()
     TestHive.setCacheTables(true)
@@ -1199,6 +1202,27 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     }
     assertUnsupportedFeature { sql("DROP TEMPORARY MACRO SIGMOID") }
   }
+
+  test("dynamic partitioning is allowed when hive.exec.dynamic.partition.mode is nonstrict") {
+    val modeConfKey = "hive.exec.dynamic.partition.mode"
+    withTable("with_parts") {
+      sql("CREATE TABLE with_parts(key INT) PARTITIONED BY (p INT)")
+
+      withSQLConf(modeConfKey -> "nonstrict") {
+        sql("INSERT OVERWRITE TABLE with_parts partition(p) select 1, 2")
+        assert(spark.table("with_parts").filter($"p" === 2).collect().head == Row(1, 2))
+      }
+
+      val originalValue = spark.sparkContext.hadoopConfiguration.get(modeConfKey, "nonstrict")
+      try {
+        spark.sparkContext.hadoopConfiguration.set(modeConfKey, "nonstrict")
+        sql("INSERT OVERWRITE TABLE with_parts partition(p) select 3, 4")
+        assert(spark.table("with_parts").filter($"p" === 4).collect().head == Row(3, 4))
+      } finally {
+        spark.sparkContext.hadoopConfiguration.set(modeConfKey, originalValue)
+      }
+    }
+  }
 }
 
 // for SPARK-2180 test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org