Posted to reviews@spark.apache.org by wangyum <gi...@git.apache.org> on 2017/12/02 00:39:45 UTC
[GitHub] spark pull request #19841: [SPARK-22642][SQL] the createdTempDir will not be...
Github user wangyum commented on a diff in the pull request:
https://github.com/apache/spark/pull/19841#discussion_r154480032
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
@@ -104,147 +105,153 @@ case class InsertIntoHiveTable(
     val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
     val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)
-    // By this time, the partition map must match the table's partition columns
-    if (partitionColumnNames.toSet != partition.keySet) {
-      throw new SparkException(
-        s"""Requested partitioning does not match the ${table.identifier.table} table:
-           |Requested partitions: ${partition.keys.mkString(",")}
-           |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
-    }
-
-    // Validate partition spec if there exist any dynamic partitions
-    if (numDynamicPartitions > 0) {
-      // Report error if dynamic partitioning is not enabled
-      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
-        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
+    def processInsert = {
+      // By this time, the partition map must match the table's partition columns
+      if (partitionColumnNames.toSet != partition.keySet) {
+        throw new SparkException(
+          s"""Requested partitioning does not match the ${table.identifier.table} table:
+             |Requested partitions: ${partition.keys.mkString(",")}
+             |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
       }
-      // Report error if dynamic partition strict mode is on but no static partition is found
-      if (numStaticPartitions == 0 &&
-        hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
-        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
-      }
+      // Validate partition spec if there exist any dynamic partitions
+      if (numDynamicPartitions > 0) {
+        // Report error if dynamic partitioning is not enabled
+        if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
+          throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
+        }
+
+        // Report error if dynamic partition strict mode is on but no static partition is found
+        if (numStaticPartitions == 0 &&
+          hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
+          throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
+        }
-      // Report error if any static partition appears after a dynamic partition
-      val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
-      if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
-        throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
+        // Report error if any static partition appears after a dynamic partition
+        val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
+        if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
+          throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
+        }
       }
-    }
-    table.bucketSpec match {
-      case Some(bucketSpec) =>
-        // Writes to bucketed hive tables are allowed only if user does not care about maintaining
-        // table's bucketing ie. both "hive.enforce.bucketing" and "hive.enforce.sorting" are
-        // set to false
-        val enforceBucketingConfig = "hive.enforce.bucketing"
-        val enforceSortingConfig = "hive.enforce.sorting"
+      table.bucketSpec match {
+        case Some(bucketSpec) =>
+          // Writes to bucketed hive tables are allowed only if user does not care about maintaining
+          // table's bucketing ie. both "hive.enforce.bucketing" and "hive.enforce.sorting" are
+          // set to false
+          val enforceBucketingConfig = "hive.enforce.bucketing"
+          val enforceSortingConfig = "hive.enforce.sorting"
-        val message = s"Output Hive table ${table.identifier} is bucketed but Spark" +
-          "currently does NOT populate bucketed output which is compatible with Hive."
+          val message = s"Output Hive table ${table.identifier} is bucketed but Spark" +
+            "currently does NOT populate bucketed output which is compatible with Hive."
-        if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean ||
-          hadoopConf.get(enforceSortingConfig, "true").toBoolean) {
-          throw new AnalysisException(message)
-        } else {
-          logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and " +
-            s"$enforceSortingConfig are set to false.")
-        }
-      case _ => // do nothing since table has no bucketing
-    }
+          if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean ||
+            hadoopConf.get(enforceSortingConfig, "true").toBoolean) {
+            throw new AnalysisException(message)
+          } else {
+            logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and "
+              + s"$enforceSortingConfig are set to false.")
+          }
+        case _ => // do nothing since table has no bucketing
+      }
-    val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
-      query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
-        throw new AnalysisException(
-          s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
-      }.asInstanceOf[Attribute]
-    }
+      val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
+        query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
+          throw new AnalysisException(
+            s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
+        }.asInstanceOf[Attribute]
+      }
-    saveAsHiveFile(
-      sparkSession = sparkSession,
-      queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
-      hadoopConf = hadoopConf,
-      fileSinkConf = fileSinkConf,
-      outputLocation = tmpLocation.toString,
-      partitionAttributes = partitionAttributes)
+      saveAsHiveFile(
+        sparkSession = sparkSession,
+        queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
+        hadoopConf = hadoopConf,
+        fileSinkConf = fileSinkConf,
+        outputLocation = tmpLocation.toString,
+        partitionAttributes = partitionAttributes)
-    if (partition.nonEmpty) {
-      if (numDynamicPartitions > 0) {
-        externalCatalog.loadDynamicPartitions(
-          db = table.database,
-          table = table.identifier.table,
-          tmpLocation.toString,
-          partitionSpec,
-          overwrite,
-          numDynamicPartitions)
-      } else {
-        // scalastyle:off
-        // ifNotExists is only valid with static partition, refer to
-        // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
-        // scalastyle:on
-        val oldPart =
+      if (partition.nonEmpty) {
+        if (numDynamicPartitions > 0) {
+          externalCatalog.loadDynamicPartitions(
+            db = table.database,
+            table = table.identifier.table,
+            tmpLocation.toString,
+            partitionSpec,
+            overwrite,
+            numDynamicPartitions)
+        } else {
+          // scalastyle:off
+          // ifNotExists is only valid with static partition, refer to
+          // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
+          // scalastyle:on
+          val oldPart =
           externalCatalog.getPartitionOption(
             table.database,
             table.identifier.table,
             partitionSpec)
-        var doHiveOverwrite = overwrite
-
-        if (oldPart.isEmpty || !ifPartitionNotExists) {
-          // SPARK-18107: Insert overwrite runs much slower than hive-client.
-          // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
-          // version and we may not want to catch up new Hive version every time. We delete the
-          // Hive partition first and then load data file into the Hive partition.
-          if (oldPart.nonEmpty && overwrite) {
-            oldPart.get.storage.locationUri.foreach { uri =>
-              val partitionPath = new Path(uri)
-              val fs = partitionPath.getFileSystem(hadoopConf)
-              if (fs.exists(partitionPath)) {
-                if (!fs.delete(partitionPath, true)) {
-                  throw new RuntimeException(
-                    "Cannot remove partition directory '" + partitionPath.toString)
+          var doHiveOverwrite = overwrite
+
+          if (oldPart.isEmpty || !ifPartitionNotExists) {
+            // SPARK-18107: Insert overwrite runs much slower than hive-client.
+            // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
+            // version and we may not want to catch up new Hive version every time. We delete the
+            // Hive partition first and then load data file into the Hive partition.
+            if (oldPart.nonEmpty && overwrite) {
+              oldPart.get.storage.locationUri.foreach { uri =>
+                val partitionPath = new Path(uri)
+                val fs = partitionPath.getFileSystem(hadoopConf)
+                if (fs.exists(partitionPath)) {
+                  if (!fs.delete(partitionPath, true)) {
+                    throw new RuntimeException(
+                      "Cannot remove partition directory '" + partitionPath.toString)
+                  }
+                  // Don't let Hive do overwrite operation since it is slower.
+                  doHiveOverwrite = false
                 }
-                // Don't let Hive do overwrite operation since it is slower.
-                doHiveOverwrite = false
               }
             }
-          }
-          // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
-          // which is currently considered as a Hive native command.
-          val inheritTableSpecs = true
-          externalCatalog.loadPartition(
-            table.database,
-            table.identifier.table,
-            tmpLocation.toString,
-            partitionSpec,
-            isOverwrite = doHiveOverwrite,
-            inheritTableSpecs = inheritTableSpecs,
-            isSrcLocal = false)
+            // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
--- End diff ---
The number of spaces is different
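
To make the spacing point concrete: the patch shifts the whole block two spaces to the right so it fits inside `def processInsert`, but a few lines end up with an inconsistent number of leading spaces, e.g. the continuation lines under `val oldPart =` appear to keep their old offset, and the `logWarning` call switches where the `+` concatenation operator sits. The two continuation styles now mixed in this file (both taken from the diff above):

    // Before the patch: `+` kept at the end of the line, continuation indented two spaces.
    logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and " +
      s"$enforceSortingConfig are set to false.")

    // After the patch: `+` moved to the start of the continuation line,
    // giving that line a different number of leading spaces.
    logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and "
      + s"$enforceSortingConfig are set to false.")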
---
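
Going by the PR title ([SPARK-22642] the createdTempDir will not be deleted if an exception occurs), the local `def processInsert` is presumably introduced so the caller can wrap the whole insert in a try-finally that always removes the staging directory. A minimal sketch of that shape, not the PR's actual code; the `deleteExternalTmpPath(hadoopConf)` cleanup helper is an assumption, not something shown in this diff:

    try {
      // Runs the partition validations, writes the data files and loads
      // the partitions into the catalog (the body shown in the diff above).
      processInsert
    } finally {
      // Always attempt to delete the staging directory, even when the insert
      // throws; otherwise the temp files linger until JVM exit.
      deleteExternalTmpPath(hadoopConf)  // assumed cleanup helper
    }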