Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/27 18:31:00 UTC

spark git commit: [SPARK-14949][SQL] Remove HiveConf dependency from InsertIntoHiveTable

Repository: spark
Updated Branches:
  refs/heads/master 08dc89361 -> ea017b557


[SPARK-14949][SQL] Remove HiveConf dependency from InsertIntoHiveTable

## What changes were proposed in this pull request?
This patch removes the use of HiveConf from InsertIntoHiveTable. I think this is the last major use of HiveConf, and after this we can try to remove the execution HiveConf.
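
For readers following the migration, here is a minimal, self-contained sketch of the lookup pattern the patch moves to: Hive settings are read as plain string keys with explicit defaults (the calls the diff makes on `sessionState.conf`), rather than through `HiveConf.ConfVars`. The object name and the backing map below are hypothetical stand-ins, not part of the patch.

```scala
// Hypothetical stand-in for sessionState.conf: a string-keyed settings map
// exposing a getConfString(key, default) lookup, mirroring what the diff uses.
object ConfLookupSketch {
  private val settings = Map("hive.exec.compress.output" -> "true")

  def getConfString(key: String, default: String): String =
    settings.getOrElse(key, default)

  def main(args: Array[String]): Unit = {
    // The three settings InsertIntoHiveTable now reads by key instead of via ConfVars.
    val isCompressed =
      getConfString("hive.exec.compress.output", "false").toBoolean
    val dynamicPartitioningEnabled =
      getConfString("hive.exec.dynamic.partition", "true").toBoolean
    val strictMode =
      getConfString("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")
    println(s"compressed=$isCompressed dynamic=$dynamicPartitioningEnabled strict=$strictMode")
  }
}
```

The upshot is that the operator no longer needs a HiveConf instance at all; any conf source that answers string-keyed lookups with a default is sufficient.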

## How was this patch tested?
This is an internal refactoring and should be covered by existing tests.

Author: Reynold Xin <rx...@databricks.com>

Closes #12728 from rxin/SPARK-14949.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea017b55
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea017b55
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea017b55

Branch: refs/heads/master
Commit: ea017b5574cd26b90f586a3f5bf8ccab3fe02e4b
Parents: 08dc893
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Apr 27 09:30:57 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Apr 27 09:30:57 2016 -0700

----------------------------------------------------------------------
 .../hive/execution/InsertIntoHiveTable.scala    | 30 +++++++++-----------
 1 file changed, 14 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ea017b55/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3cb6081..cba10ca 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -21,8 +21,6 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
 
@@ -35,7 +33,7 @@ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.SparkException
 import org.apache.spark.util.SerializableJobConf
 
-private[hive]
+
 case class InsertIntoHiveTable(
     table: MetastoreRelation,
     partition: Map[String, Option[String]],
@@ -45,8 +43,6 @@ case class InsertIntoHiveTable(
 
   @transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
   @transient private val client = sessionState.metadataHive
-  @transient private val hiveconf = sessionState.hiveconf
-  @transient private lazy val hiveContext = new Context(hiveconf)
 
   def output: Seq[Attribute] = Seq.empty
 
@@ -70,7 +66,6 @@ case class InsertIntoHiveTable(
     writerContainer.driverSideSetup()
     sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
     writerContainer.commitJob()
-
   }
 
   /**
@@ -85,19 +80,20 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
+    val hadoopConf = sessionState.newHadoopConf()
+    val tmpLocation = new Context(hadoopConf).getExternalTmpPath(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
-    val isCompressed = hiveconf.getBoolean(
-      ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
+    val isCompressed =
+      sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
 
     if (isCompressed) {
       // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
       // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
       // to store compression information.
-      hiveconf.set("mapred.output.compress", "true")
+      hadoopConf.set("mapred.output.compress", "true")
       fileSinkConf.setCompressed(true)
-      fileSinkConf.setCompressCodec(hiveconf.get("mapred.output.compression.codec"))
-      fileSinkConf.setCompressType(hiveconf.get("mapred.output.compression.type"))
+      fileSinkConf.setCompressCodec(hadoopConf.get("mapred.output.compression.codec"))
+      fileSinkConf.setCompressType(hadoopConf.get("mapred.output.compression.type"))
     }
 
     val numDynamicPartitions = partition.values.count(_.isEmpty)
@@ -114,13 +110,15 @@ case class InsertIntoHiveTable(
     // Validate partition spec if there exist any dynamic partitions
     if (numDynamicPartitions > 0) {
       // Report error if dynamic partitioning is not enabled
-      if (!hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
+      if (!sessionState.conf.getConfString("hive.exec.dynamic.partition", "true").toBoolean) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
       }
 
       // Report error if dynamic partition strict mode is on but no static partition is found
-      if (numStaticPartitions == 0 && hiveconf.getVar(
-          HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
+      if (numStaticPartitions == 0 &&
+          sessionState.conf.getConfString(
+            "hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict"))
+      {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
       }
 
@@ -131,7 +129,7 @@ case class InsertIntoHiveTable(
       }
     }
 
-    val jobConf = new JobConf(hiveconf)
+    val jobConf = new JobConf(hadoopConf)
     val jobConfSer = new SerializableJobConf(jobConf)
 
     // When speculation is on and output committer class name contains "Direct", we should warn

