You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/27 18:31:00 UTC
spark git commit: [SPARK-14949][SQL] Remove HiveConf dependency from InsertIntoHiveTable
Repository: spark
Updated Branches:
refs/heads/master 08dc89361 -> ea017b557
[SPARK-14949][SQL] Remove HiveConf dependency from InsertIntoHiveTable
## What changes were proposed in this pull request?
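This patch removes the use of HiveConf from InsertIntoHiveTable. Hive settings are now read from the session's SQL configuration, and the Hadoop configuration is obtained from `sessionState.newHadoopConf()`. I think this is the last major use of HiveConf, and after this we can try to remove the execution HiveConf.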
## How was this patch tested?
This is an internal refactoring and should be covered by existing tests.
Author: Reynold Xin <rx...@databricks.com>
Closes #12728 from rxin/SPARK-14949.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea017b55
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea017b55
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea017b55
Branch: refs/heads/master
Commit: ea017b5574cd26b90f586a3f5bf8ccab3fe02e4b
Parents: 08dc893
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Apr 27 09:30:57 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Apr 27 09:30:57 2016 -0700
----------------------------------------------------------------------
.../hive/execution/InsertIntoHiveTable.scala | 30 +++++++++-----------
1 file changed, 14 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ea017b55/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3cb6081..cba10ca 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -21,8 +21,6 @@ import java.util
import scala.collection.JavaConverters._
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
@@ -35,7 +33,7 @@ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.SparkException
import org.apache.spark.util.SerializableJobConf
-private[hive]
+
case class InsertIntoHiveTable(
table: MetastoreRelation,
partition: Map[String, Option[String]],
@@ -45,8 +43,6 @@ case class InsertIntoHiveTable(
@transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
@transient private val client = sessionState.metadataHive
- @transient private val hiveconf = sessionState.hiveconf
- @transient private lazy val hiveContext = new Context(hiveconf)
def output: Seq[Attribute] = Seq.empty
@@ -70,7 +66,6 @@ case class InsertIntoHiveTable(
writerContainer.driverSideSetup()
sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
writerContainer.commitJob()
-
}
/**
@@ -85,19 +80,20 @@ case class InsertIntoHiveTable(
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
- val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
+ val hadoopConf = sessionState.newHadoopConf()
+ val tmpLocation = new Context(hadoopConf).getExternalTmpPath(tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
- val isCompressed = hiveconf.getBoolean(
- ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
+ val isCompressed =
+ sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
if (isCompressed) {
// Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
// and "mapred.output.compression.type" have no impact on ORC because it uses table properties
// to store compression information.
- hiveconf.set("mapred.output.compress", "true")
+ hadoopConf.set("mapred.output.compress", "true")
fileSinkConf.setCompressed(true)
- fileSinkConf.setCompressCodec(hiveconf.get("mapred.output.compression.codec"))
- fileSinkConf.setCompressType(hiveconf.get("mapred.output.compression.type"))
+ fileSinkConf.setCompressCodec(hadoopConf.get("mapred.output.compression.codec"))
+ fileSinkConf.setCompressType(hadoopConf.get("mapred.output.compression.type"))
}
val numDynamicPartitions = partition.values.count(_.isEmpty)
@@ -114,13 +110,15 @@ case class InsertIntoHiveTable(
// Validate partition spec if there exist any dynamic partitions
if (numDynamicPartitions > 0) {
// Report error if dynamic partitioning is not enabled
- if (!hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
+ if (!sessionState.conf.getConfString("hive.exec.dynamic.partition", "true").toBoolean) {
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
}
// Report error if dynamic partition strict mode is on but no static partition is found
- if (numStaticPartitions == 0 && hiveconf.getVar(
- HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
+ if (numStaticPartitions == 0 &&
+ sessionState.conf.getConfString(
+ "hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict"))
+ {
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
}
@@ -131,7 +129,7 @@ case class InsertIntoHiveTable(
}
}
- val jobConf = new JobConf(hiveconf)
+ val jobConf = new JobConf(hadoopConf)
val jobConfSer = new SerializableJobConf(jobConf)
// When speculation is on and output committer class name contains "Direct", we should warn
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org