Posted to reviews@spark.apache.org by wangyum <gi...@git.apache.org> on 2017/12/02 00:39:45 UTC

[GitHub] spark pull request #19841: [SPARK-22642][SQL] the createdTempDir will not be...

Github user wangyum commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19841#discussion_r154480032
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -104,147 +105,153 @@ case class InsertIntoHiveTable(
         val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
         val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)
     
    -    // By this time, the partition map must match the table's partition columns
    -    if (partitionColumnNames.toSet != partition.keySet) {
    -      throw new SparkException(
    -        s"""Requested partitioning does not match the ${table.identifier.table} table:
    -           |Requested partitions: ${partition.keys.mkString(",")}
    -           |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
    -    }
    -
    -    // Validate partition spec if there exist any dynamic partitions
    -    if (numDynamicPartitions > 0) {
    -      // Report error if dynamic partitioning is not enabled
    -      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
    -        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
    +    def processInsert = {
    +      // By this time, the partition map must match the table's partition columns
    +      if (partitionColumnNames.toSet != partition.keySet) {
    +        throw new SparkException(
    +          s"""Requested partitioning does not match the ${table.identifier.table} table:
    +             |Requested partitions: ${partition.keys.mkString(",")}
    +             |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
           }
     
    -      // Report error if dynamic partition strict mode is on but no static partition is found
    -      if (numStaticPartitions == 0 &&
    -        hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
    -        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
    -      }
    +      // Validate partition spec if there exist any dynamic partitions
    +      if (numDynamicPartitions > 0) {
    +        // Report error if dynamic partitioning is not enabled
    +        if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
    +          throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
    +        }
    +
    +        // Report error if dynamic partition strict mode is on but no static partition is found
    +        if (numStaticPartitions == 0 &&
    +          hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
    +          throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
    +        }
     
    -      // Report error if any static partition appears after a dynamic partition
    -      val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
    -      if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
    -        throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
    +        // Report error if any static partition appears after a dynamic partition
    +        val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
    +        if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
    +          throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
    +        }
           }
    -    }
     
    -    table.bucketSpec match {
    -      case Some(bucketSpec) =>
    -        // Writes to bucketed hive tables are allowed only if user does not care about maintaining
    -        // table's bucketing ie. both "hive.enforce.bucketing" and "hive.enforce.sorting" are
    -        // set to false
    -        val enforceBucketingConfig = "hive.enforce.bucketing"
    -        val enforceSortingConfig = "hive.enforce.sorting"
    +      table.bucketSpec match {
    +        case Some(bucketSpec) =>
    +          // Writes to bucketed hive tables are allowed only if user does not care about maintaining
    +          // table's bucketing ie. both "hive.enforce.bucketing" and "hive.enforce.sorting" are
    +          // set to false
    +          val enforceBucketingConfig = "hive.enforce.bucketing"
    +          val enforceSortingConfig = "hive.enforce.sorting"
     
    -        val message = s"Output Hive table ${table.identifier} is bucketed but Spark" +
    -          "currently does NOT populate bucketed output which is compatible with Hive."
    +          val message = s"Output Hive table ${table.identifier} is bucketed but Spark" +
    +            "currently does NOT populate bucketed output which is compatible with Hive."
     
    -        if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean ||
    -          hadoopConf.get(enforceSortingConfig, "true").toBoolean) {
    -          throw new AnalysisException(message)
    -        } else {
    -          logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and " +
    -            s"$enforceSortingConfig are set to false.")
    -        }
    -      case _ => // do nothing since table has no bucketing
    -    }
    +          if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean ||
    +            hadoopConf.get(enforceSortingConfig, "true").toBoolean) {
    +            throw new AnalysisException(message)
    +          } else {
    +            logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and "
    +              + s"$enforceSortingConfig are set to false.")
    +          }
    +        case _ => // do nothing since table has no bucketing
    +      }
     
    -    val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
    -      query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
    -        throw new AnalysisException(
    -          s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
    -      }.asInstanceOf[Attribute]
    -    }
    +      val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
    +        query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
    +          throw new AnalysisException(
    +            s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
    +        }.asInstanceOf[Attribute]
    +      }
     
    -    saveAsHiveFile(
    -      sparkSession = sparkSession,
    -      queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
    -      hadoopConf = hadoopConf,
    -      fileSinkConf = fileSinkConf,
    -      outputLocation = tmpLocation.toString,
    -      partitionAttributes = partitionAttributes)
    +      saveAsHiveFile(
    +        sparkSession = sparkSession,
    +        queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
    +        hadoopConf = hadoopConf,
    +        fileSinkConf = fileSinkConf,
    +        outputLocation = tmpLocation.toString,
    +        partitionAttributes = partitionAttributes)
     
    -    if (partition.nonEmpty) {
    -      if (numDynamicPartitions > 0) {
    -        externalCatalog.loadDynamicPartitions(
    -          db = table.database,
    -          table = table.identifier.table,
    -          tmpLocation.toString,
    -          partitionSpec,
    -          overwrite,
    -          numDynamicPartitions)
    -      } else {
    -        // scalastyle:off
    -        // ifNotExists is only valid with static partition, refer to
    -        // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
    -        // scalastyle:on
    -        val oldPart =
    +      if (partition.nonEmpty) {
    +        if (numDynamicPartitions > 0) {
    +          externalCatalog.loadDynamicPartitions(
    +            db = table.database,
    +            table = table.identifier.table,
    +            tmpLocation.toString,
    +            partitionSpec,
    +            overwrite,
    +            numDynamicPartitions)
    +        } else {
    +          // scalastyle:off
    +          // ifNotExists is only valid with static partition, refer to
    +          // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
    +          // scalastyle:on
    +          val oldPart =
               externalCatalog.getPartitionOption(
                 table.database,
                 table.identifier.table,
                 partitionSpec)
     
    -        var doHiveOverwrite = overwrite
    -
    -        if (oldPart.isEmpty || !ifPartitionNotExists) {
    -          // SPARK-18107: Insert overwrite runs much slower than hive-client.
    -          // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
    -          // version and we may not want to catch up new Hive version every time. We delete the
    -          // Hive partition first and then load data file into the Hive partition.
    -          if (oldPart.nonEmpty && overwrite) {
    -            oldPart.get.storage.locationUri.foreach { uri =>
    -              val partitionPath = new Path(uri)
    -              val fs = partitionPath.getFileSystem(hadoopConf)
    -              if (fs.exists(partitionPath)) {
    -                if (!fs.delete(partitionPath, true)) {
    -                  throw new RuntimeException(
    -                    "Cannot remove partition directory '" + partitionPath.toString)
    +          var doHiveOverwrite = overwrite
    +
    +          if (oldPart.isEmpty || !ifPartitionNotExists) {
    +            // SPARK-18107: Insert overwrite runs much slower than hive-client.
    +            // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
    +            // version and we may not want to catch up new Hive version every time. We delete the
    +            // Hive partition first and then load data file into the Hive partition.
    +            if (oldPart.nonEmpty && overwrite) {
    +              oldPart.get.storage.locationUri.foreach { uri =>
    +                val partitionPath = new Path(uri)
    +                val fs = partitionPath.getFileSystem(hadoopConf)
    +                if (fs.exists(partitionPath)) {
    +                  if (!fs.delete(partitionPath, true)) {
    +                    throw new RuntimeException(
    +                      "Cannot remove partition directory '" + partitionPath.toString)
    +                  }
    +                  // Don't let Hive do overwrite operation since it is slower.
    +                  doHiveOverwrite = false
                     }
    -                // Don't let Hive do overwrite operation since it is slower.
    -                doHiveOverwrite = false
                   }
                 }
    -          }
     
    -          // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
    -          // which is currently considered as a Hive native command.
    -          val inheritTableSpecs = true
    -          externalCatalog.loadPartition(
    -            table.database,
    -            table.identifier.table,
    -            tmpLocation.toString,
    -            partitionSpec,
    -            isOverwrite = doHiveOverwrite,
    -            inheritTableSpecs = inheritTableSpecs,
    -            isSrcLocal = false)
    +            // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
    --- End diff --
    
    The number of spaces (indentation) is different here.
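    For context on why the diff wraps the whole body in a local `processInsert` method: per the PR title (SPARK-22642, the created temp dir is not deleted when an exception occurs), factoring the insert logic out lets the caller run it inside try/finally and remove the staging directory on every code path. A minimal sketch of that pattern follows; the object name, `runInsert`, and `deleteTmpPath` are illustrative placeholders, not Spark's actual code.

        import java.io.IOException

        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.fs.Path

        object InsertCleanupSketch {
          // Run the (elided) insert logic and always remove the staging directory,
          // even when processInsert() throws.
          def runInsert(tmpLocation: Path, hadoopConf: Configuration): Unit = {
            def processInsert(): Unit = {
              // ... validate the partition spec, write files, load them into the metastore ...
            }

            try {
              processInsert()
            } finally {
              deleteTmpPath(tmpLocation, hadoopConf)
            }
          }

          // Best-effort cleanup: a failure to delete must not mask the original error.
          private def deleteTmpPath(tmpLocation: Path, hadoopConf: Configuration): Unit = {
            try {
              val fs = tmpLocation.getFileSystem(hadoopConf)
              fs.delete(tmpLocation, true) // recursive delete of the temp dir
            } catch {
              case _: IOException => () // ignore; cleanup is best effort
            }
          }
        }

    With this shape, the body of processInsert can stay essentially identical to the old code (hence the mostly indentation-only diff above), while the cleanup happens in exactly one place.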


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org