Posted to reviews@spark.apache.org by zuotingbing <gi...@git.apache.org> on 2017/11/29 06:25:19 UTC

[GitHub] spark pull request #19841: [SPARK-22642][SQL] the createdTempDir will not be...

GitHub user zuotingbing opened a pull request:

    https://github.com/apache/spark/pull/19841

    [SPARK-22642][SQL] the createdTempDir will not be deleted if an exception occurs

    
    ## What changes were proposed in this pull request?
    
    We found that staging directories are sometimes not dropped in our production environment.
    The createdTempDir will not be deleted if an exception occurs, so we should delete createdTempDir in a finally block.
    Refer to SPARK-18703.
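
    A minimal, self-contained sketch of the intended pattern (illustrative only; names such as `writeWithStaging` are made up for this example and are not the actual InsertIntoHiveTable code):

    ```Scala
    import java.io.File
    import java.nio.file.Files

    object StagingCleanupSketch {
      // Delete a directory tree; plain JDK so the sketch has no extra dependencies.
      private def deleteRecursively(f: File): Unit = {
        if (f.isDirectory) Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
        f.delete()
      }

      // Without the finally block, an exception thrown by doWrite would leave the
      // staging directory behind; with it, the directory is removed either way.
      def writeWithStaging(doWrite: File => Unit): Unit = {
        val createdTempDir = Files.createTempDirectory("hive-staging").toFile
        try {
          doWrite(createdTempDir) // may throw, e.g. when a task fails
        } finally {
          deleteRecursively(createdTempDir)
        }
      }
    }
    ```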
    
    ## How was this patch tested?
    
    Existing tests.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zuotingbing/spark SPARK-stagedir

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19841.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19841
    
----
commit 99d1fd74b7f25a7ddbda4dd01a8b4c03b3da778a
Author: zuotingbing <zu...@zte.com.cn>
Date:   2017-11-29T06:22:20Z

    [SPARK-22642][SQL] the createdTempDir will not be deleted if an exception occurs

----


---



[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...

Posted by zuotingbing <gi...@git.apache.org>.
Github user zuotingbing commented on the issue:

    https://github.com/apache/spark/pull/19841
  
    I extracted a separate function, but it has too many parameters. Could I extract it into several smaller functions instead?


---



[GitHub] spark pull request #19841: [SPARK-22642][SQL] the createdTempDir will not be...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19841


---



[GitHub] spark pull request #19841: [SPARK-22642][SQL] the createdTempDir will not be...

Posted by wangyum <gi...@git.apache.org>.
Github user wangyum commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19841#discussion_r154480032
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -104,147 +105,153 @@ case class InsertIntoHiveTable(
         val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
         val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)
     
    -    // By this time, the partition map must match the table's partition columns
    -    if (partitionColumnNames.toSet != partition.keySet) {
    -      throw new SparkException(
    -        s"""Requested partitioning does not match the ${table.identifier.table} table:
    -           |Requested partitions: ${partition.keys.mkString(",")}
    -           |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
    -    }
    -
    -    // Validate partition spec if there exist any dynamic partitions
    -    if (numDynamicPartitions > 0) {
    -      // Report error if dynamic partitioning is not enabled
    -      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
    -        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
    +    def processInsert = {
    +      // By this time, the partition map must match the table's partition columns
    +      if (partitionColumnNames.toSet != partition.keySet) {
    +        throw new SparkException(
    +          s"""Requested partitioning does not match the ${table.identifier.table} table:
    +             |Requested partitions: ${partition.keys.mkString(",")}
    +             |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
           }
     
    -      // Report error if dynamic partition strict mode is on but no static partition is found
    -      if (numStaticPartitions == 0 &&
    -        hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
    -        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
    -      }
    +      // Validate partition spec if there exist any dynamic partitions
    +      if (numDynamicPartitions > 0) {
    +        // Report error if dynamic partitioning is not enabled
    +        if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
    +          throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
    +        }
    +
    +        // Report error if dynamic partition strict mode is on but no static partition is found
    +        if (numStaticPartitions == 0 &&
    +          hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
    +          throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
    +        }
     
    -      // Report error if any static partition appears after a dynamic partition
    -      val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
    -      if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
    -        throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
    +        // Report error if any static partition appears after a dynamic partition
    +        val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
    +        if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
    +          throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
    +        }
           }
    -    }
     
    -    table.bucketSpec match {
    -      case Some(bucketSpec) =>
    -        // Writes to bucketed hive tables are allowed only if user does not care about maintaining
    -        // table's bucketing ie. both "hive.enforce.bucketing" and "hive.enforce.sorting" are
    -        // set to false
    -        val enforceBucketingConfig = "hive.enforce.bucketing"
    -        val enforceSortingConfig = "hive.enforce.sorting"
    +      table.bucketSpec match {
    +        case Some(bucketSpec) =>
    +          // Writes to bucketed hive tables are allowed only if user does not care about maintaining
    +          // table's bucketing ie. both "hive.enforce.bucketing" and "hive.enforce.sorting" are
    +          // set to false
    +          val enforceBucketingConfig = "hive.enforce.bucketing"
    +          val enforceSortingConfig = "hive.enforce.sorting"
     
    -        val message = s"Output Hive table ${table.identifier} is bucketed but Spark" +
    -          "currently does NOT populate bucketed output which is compatible with Hive."
    +          val message = s"Output Hive table ${table.identifier} is bucketed but Spark" +
    +            "currently does NOT populate bucketed output which is compatible with Hive."
     
    -        if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean ||
    -          hadoopConf.get(enforceSortingConfig, "true").toBoolean) {
    -          throw new AnalysisException(message)
    -        } else {
    -          logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and " +
    -            s"$enforceSortingConfig are set to false.")
    -        }
    -      case _ => // do nothing since table has no bucketing
    -    }
    +          if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean ||
    +            hadoopConf.get(enforceSortingConfig, "true").toBoolean) {
    +            throw new AnalysisException(message)
    +          } else {
    +            logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and "
    +              + s"$enforceSortingConfig are set to false.")
    +          }
    +        case _ => // do nothing since table has no bucketing
    +      }
     
    -    val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
    -      query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
    -        throw new AnalysisException(
    -          s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
    -      }.asInstanceOf[Attribute]
    -    }
    +      val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
    +        query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
    +          throw new AnalysisException(
    +            s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
    +        }.asInstanceOf[Attribute]
    +      }
     
    -    saveAsHiveFile(
    -      sparkSession = sparkSession,
    -      queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
    -      hadoopConf = hadoopConf,
    -      fileSinkConf = fileSinkConf,
    -      outputLocation = tmpLocation.toString,
    -      partitionAttributes = partitionAttributes)
    +      saveAsHiveFile(
    +        sparkSession = sparkSession,
    +        queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
    +        hadoopConf = hadoopConf,
    +        fileSinkConf = fileSinkConf,
    +        outputLocation = tmpLocation.toString,
    +        partitionAttributes = partitionAttributes)
     
    -    if (partition.nonEmpty) {
    -      if (numDynamicPartitions > 0) {
    -        externalCatalog.loadDynamicPartitions(
    -          db = table.database,
    -          table = table.identifier.table,
    -          tmpLocation.toString,
    -          partitionSpec,
    -          overwrite,
    -          numDynamicPartitions)
    -      } else {
    -        // scalastyle:off
    -        // ifNotExists is only valid with static partition, refer to
    -        // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
    -        // scalastyle:on
    -        val oldPart =
    +      if (partition.nonEmpty) {
    +        if (numDynamicPartitions > 0) {
    +          externalCatalog.loadDynamicPartitions(
    +            db = table.database,
    +            table = table.identifier.table,
    +            tmpLocation.toString,
    +            partitionSpec,
    +            overwrite,
    +            numDynamicPartitions)
    +        } else {
    +          // scalastyle:off
    +          // ifNotExists is only valid with static partition, refer to
    +          // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
    +          // scalastyle:on
    +          val oldPart =
               externalCatalog.getPartitionOption(
                 table.database,
                 table.identifier.table,
                 partitionSpec)
     
    -        var doHiveOverwrite = overwrite
    -
    -        if (oldPart.isEmpty || !ifPartitionNotExists) {
    -          // SPARK-18107: Insert overwrite runs much slower than hive-client.
    -          // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
    -          // version and we may not want to catch up new Hive version every time. We delete the
    -          // Hive partition first and then load data file into the Hive partition.
    -          if (oldPart.nonEmpty && overwrite) {
    -            oldPart.get.storage.locationUri.foreach { uri =>
    -              val partitionPath = new Path(uri)
    -              val fs = partitionPath.getFileSystem(hadoopConf)
    -              if (fs.exists(partitionPath)) {
    -                if (!fs.delete(partitionPath, true)) {
    -                  throw new RuntimeException(
    -                    "Cannot remove partition directory '" + partitionPath.toString)
    +          var doHiveOverwrite = overwrite
    +
    +          if (oldPart.isEmpty || !ifPartitionNotExists) {
    +            // SPARK-18107: Insert overwrite runs much slower than hive-client.
    +            // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
    +            // version and we may not want to catch up new Hive version every time. We delete the
    +            // Hive partition first and then load data file into the Hive partition.
    +            if (oldPart.nonEmpty && overwrite) {
    +              oldPart.get.storage.locationUri.foreach { uri =>
    +                val partitionPath = new Path(uri)
    +                val fs = partitionPath.getFileSystem(hadoopConf)
    +                if (fs.exists(partitionPath)) {
    +                  if (!fs.delete(partitionPath, true)) {
    +                    throw new RuntimeException(
    +                      "Cannot remove partition directory '" + partitionPath.toString)
    +                  }
    +                  // Don't let Hive do overwrite operation since it is slower.
    +                  doHiveOverwrite = false
                     }
    -                // Don't let Hive do overwrite operation since it is slower.
    -                doHiveOverwrite = false
                   }
                 }
    -          }
     
    -          // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
    -          // which is currently considered as a Hive native command.
    -          val inheritTableSpecs = true
    -          externalCatalog.loadPartition(
    -            table.database,
    -            table.identifier.table,
    -            tmpLocation.toString,
    -            partitionSpec,
    -            isOverwrite = doHiveOverwrite,
    -            inheritTableSpecs = inheritTableSpecs,
    -            isSrcLocal = false)
    +            // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
    --- End diff --
    
    The number of spaces is different


---



[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19841
  
    **[Test build #84423 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84423/testReport)** for PR 19841 at commit [`515b00a`](https://github.com/apache/spark/commit/515b00abf2d214d2a253ef3ccb213df670d64fd4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19841
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/19841
  
    Merged to master


---



[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...

Posted by zuotingbing <gi...@git.apache.org>.
Github user zuotingbing commented on the issue:

    https://github.com/apache/spark/pull/19841
  
    Yes, I am also confused about why the diff is so big, but that is what git reported originally.


---



[GitHub] spark pull request #19841: [SPARK-22642][SQL] the createdTempDir will not be...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19841#discussion_r154490878
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -104,147 +105,153 @@ case class InsertIntoHiveTable(
         val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
         val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)
     
    -    // By this time, the partition map must match the table's partition columns
    -    if (partitionColumnNames.toSet != partition.keySet) {
    -      throw new SparkException(
    -        s"""Requested partitioning does not match the ${table.identifier.table} table:
    -           |Requested partitions: ${partition.keys.mkString(",")}
    -           |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
    -    }
    -
    -    // Validate partition spec if there exist any dynamic partitions
    -    if (numDynamicPartitions > 0) {
    -      // Report error if dynamic partitioning is not enabled
    -      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
    -        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
    +    def processInsert = {
    --- End diff --
    
    Generally, we do not like nested methods.
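
    For context, a tiny illustration of the style point (hypothetical code, not related to the Hive insert path): a nested `def` buried inside an already long method versus a private method on the class.

    ```Scala
    class NestedVsPrivate {
      // Discouraged here: a nested def inside a long method, capturing its locals.
      def runWithNestedDef(): Unit = {
        val staging = "tmp-dir"
        def process(): Unit = println(s"writing into $staging")
        try process() finally println(s"deleting $staging")
      }

      // Preferred: extract the work into a private method and pass in what it needs.
      private def process(staging: String): Unit = println(s"writing into $staging")

      def run(): Unit = {
        val staging = "tmp-dir"
        try process(staging) finally println(s"deleting $staging")
      }
    }
    ```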


---



[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19841
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84423/
    Test PASSed.


---



[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/19841
  
    ok to test


---



[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19841
  
    **[Test build #84423 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84423/testReport)** for PR 19841 at commit [`515b00a`](https://github.com/apache/spark/commit/515b00abf2d214d2a253ef3ccb213df670d64fd4).


---



[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19841
  
    Could we minimise the diff? It seems hard to follow to me, too.


---



[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...

Posted by guoxiaolongzte <gi...@git.apache.org>.
Github user guoxiaolongzte commented on the issue:

    https://github.com/apache/spark/pull/19841
  
    +1, this guards against unpredictable exceptions leaving the temporary directory or files undeleted.


---



[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/19841
  
    It's a very long method, and a try-finally around it is hard to follow. Can this wrapper live in the caller? Is there another place to ensure something's cleaned up?


---



[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...

Posted by zuotingbing <gi...@git.apache.org>.
Github user zuotingbing commented on the issue:

    https://github.com/apache/spark/pull/19841
  
    Thanks srowen.


---



[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19841
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...

Posted by zuotingbing <gi...@git.apache.org>.
Github user zuotingbing commented on the issue:

    https://github.com/apache/spark/pull/19841
  
    It would be great if someone could help merge this to master. Thanks! @gatorsmile @srowen @HyukjinKwon @wangyum


---



[GitHub] spark pull request #19841: [SPARK-22642][SQL] the createdTempDir will not be...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19841#discussion_r154575693
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -104,14 +105,61 @@ case class InsertIntoHiveTable(
         val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
         val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)
     
    -    // By this time, the partition map must match the table's partition columns
    -    if (partitionColumnNames.toSet != partition.keySet) {
    -      throw new SparkException(
    -        s"""Requested partitioning does not match the ${table.identifier.table} table:
    -           |Requested partitions: ${partition.keys.mkString(",")}
    -           |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
    +    try {
    +      // By this time, the partition map must match the table's partition columns
    +      if (partitionColumnNames.toSet != partition.keySet) {
    +        throw new SparkException(
    +          s"""Requested partitioning does not match the ${table.identifier.table} table:
    +             |Requested partitions: ${partition.keys.mkString(",")}
    +             |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
    +      }
    +
    +      validatePartitionSpec(hadoopConf, numDynamicPartitions, numStaticPartitions,
    +        partitionSpec, partitionColumnNames)
    +
    +      validateBucketSpec(hadoopConf)
    +
    +      val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
    +        query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
    +          throw new AnalysisException(
    +            s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
    +        }.asInstanceOf[Attribute]
    +      }
    +
    --- End diff --
    
    ```Scala
      try {
    ```
    
    should start from this line, right?
    
    No need to create `validatePartitionSpec` and `validateBucketSpec` in this PR. We want to minimize the code changes.


---



[GitHub] spark issue #19841: [SPARK-22642][SQL] the createdTempDir will not be delete...

Posted by zuotingbing <gi...@git.apache.org>.
Github user zuotingbing commented on the issue:

    https://github.com/apache/spark/pull/19841
  
    Please review it again. Thanks, all.


---



[GitHub] spark pull request #19841: [SPARK-22642][SQL] the createdTempDir will not be...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19841#discussion_r154490891
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
    @@ -104,147 +105,153 @@ case class InsertIntoHiveTable(
         val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
         val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)
     
    -    // By this time, the partition map must match the table's partition columns
    -    if (partitionColumnNames.toSet != partition.keySet) {
    -      throw new SparkException(
    -        s"""Requested partitioning does not match the ${table.identifier.table} table:
    -           |Requested partitions: ${partition.keys.mkString(",")}
    -           |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
    -    }
    -
    -    // Validate partition spec if there exist any dynamic partitions
    -    if (numDynamicPartitions > 0) {
    -      // Report error if dynamic partitioning is not enabled
    -      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
    -        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
    +    def processInsert = {
    +      // By this time, the partition map must match the table's partition columns
    +      if (partitionColumnNames.toSet != partition.keySet) {
    +        throw new SparkException(
    +          s"""Requested partitioning does not match the ${table.identifier.table} table:
    +             |Requested partitions: ${partition.keys.mkString(",")}
    +             |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
           }
     
    -      // Report error if dynamic partition strict mode is on but no static partition is found
    -      if (numStaticPartitions == 0 &&
    -        hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
    -        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
    -      }
    +      // Validate partition spec if there exist any dynamic partitions
    +      if (numDynamicPartitions > 0) {
    +        // Report error if dynamic partitioning is not enabled
    +        if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
    +          throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
    +        }
    +
    +        // Report error if dynamic partition strict mode is on but no static partition is found
    +        if (numStaticPartitions == 0 &&
    +          hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
    +          throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
    +        }
     
    -      // Report error if any static partition appears after a dynamic partition
    -      val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
    -      if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
    -        throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
    +        // Report error if any static partition appears after a dynamic partition
    +        val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
    +        if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
    +          throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
    +        }
           }
    -    }
     
    -    table.bucketSpec match {
    -      case Some(bucketSpec) =>
    -        // Writes to bucketed hive tables are allowed only if user does not care about maintaining
    -        // table's bucketing ie. both "hive.enforce.bucketing" and "hive.enforce.sorting" are
    -        // set to false
    -        val enforceBucketingConfig = "hive.enforce.bucketing"
    -        val enforceSortingConfig = "hive.enforce.sorting"
    +      table.bucketSpec match {
    +        case Some(bucketSpec) =>
    +          // Writes to bucketed hive tables are allowed only if user does not care about maintaining
    +          // table's bucketing ie. both "hive.enforce.bucketing" and "hive.enforce.sorting" are
    +          // set to false
    +          val enforceBucketingConfig = "hive.enforce.bucketing"
    +          val enforceSortingConfig = "hive.enforce.sorting"
     
    -        val message = s"Output Hive table ${table.identifier} is bucketed but Spark" +
    -          "currently does NOT populate bucketed output which is compatible with Hive."
    +          val message = s"Output Hive table ${table.identifier} is bucketed but Spark" +
    +            "currently does NOT populate bucketed output which is compatible with Hive."
     
    -        if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean ||
    -          hadoopConf.get(enforceSortingConfig, "true").toBoolean) {
    -          throw new AnalysisException(message)
    -        } else {
    -          logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and " +
    -            s"$enforceSortingConfig are set to false.")
    -        }
    -      case _ => // do nothing since table has no bucketing
    -    }
    +          if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean ||
    +            hadoopConf.get(enforceSortingConfig, "true").toBoolean) {
    +            throw new AnalysisException(message)
    +          } else {
    +            logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and "
    +              + s"$enforceSortingConfig are set to false.")
    +          }
    +        case _ => // do nothing since table has no bucketing
    +      }
     
    -    val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
    -      query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
    -        throw new AnalysisException(
    -          s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
    -      }.asInstanceOf[Attribute]
    -    }
    +      val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
    +        query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
    +          throw new AnalysisException(
    +            s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
    +        }.asInstanceOf[Attribute]
    +      }
     
    -    saveAsHiveFile(
    -      sparkSession = sparkSession,
    -      queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
    -      hadoopConf = hadoopConf,
    -      fileSinkConf = fileSinkConf,
    -      outputLocation = tmpLocation.toString,
    -      partitionAttributes = partitionAttributes)
    +      saveAsHiveFile(
    +        sparkSession = sparkSession,
    +        queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
    +        hadoopConf = hadoopConf,
    +        fileSinkConf = fileSinkConf,
    +        outputLocation = tmpLocation.toString,
    +        partitionAttributes = partitionAttributes)
     
    -    if (partition.nonEmpty) {
    -      if (numDynamicPartitions > 0) {
    -        externalCatalog.loadDynamicPartitions(
    -          db = table.database,
    -          table = table.identifier.table,
    -          tmpLocation.toString,
    -          partitionSpec,
    -          overwrite,
    -          numDynamicPartitions)
    -      } else {
    -        // scalastyle:off
    -        // ifNotExists is only valid with static partition, refer to
    -        // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
    -        // scalastyle:on
    -        val oldPart =
    +      if (partition.nonEmpty) {
    +        if (numDynamicPartitions > 0) {
    +          externalCatalog.loadDynamicPartitions(
    +            db = table.database,
    +            table = table.identifier.table,
    +            tmpLocation.toString,
    +            partitionSpec,
    +            overwrite,
    +            numDynamicPartitions)
    +        } else {
    +          // scalastyle:off
    +          // ifNotExists is only valid with static partition, refer to
    +          // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
    +          // scalastyle:on
    +          val oldPart =
               externalCatalog.getPartitionOption(
                 table.database,
                 table.identifier.table,
                 partitionSpec)
     
    -        var doHiveOverwrite = overwrite
    -
    -        if (oldPart.isEmpty || !ifPartitionNotExists) {
    -          // SPARK-18107: Insert overwrite runs much slower than hive-client.
    -          // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
    -          // version and we may not want to catch up new Hive version every time. We delete the
    -          // Hive partition first and then load data file into the Hive partition.
    -          if (oldPart.nonEmpty && overwrite) {
    -            oldPart.get.storage.locationUri.foreach { uri =>
    -              val partitionPath = new Path(uri)
    -              val fs = partitionPath.getFileSystem(hadoopConf)
    -              if (fs.exists(partitionPath)) {
    -                if (!fs.delete(partitionPath, true)) {
    -                  throw new RuntimeException(
    -                    "Cannot remove partition directory '" + partitionPath.toString)
    +          var doHiveOverwrite = overwrite
    +
    +          if (oldPart.isEmpty || !ifPartitionNotExists) {
    +            // SPARK-18107: Insert overwrite runs much slower than hive-client.
    +            // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
    +            // version and we may not want to catch up new Hive version every time. We delete the
    +            // Hive partition first and then load data file into the Hive partition.
    +            if (oldPart.nonEmpty && overwrite) {
    +              oldPart.get.storage.locationUri.foreach { uri =>
    +                val partitionPath = new Path(uri)
    +                val fs = partitionPath.getFileSystem(hadoopConf)
    +                if (fs.exists(partitionPath)) {
    +                  if (!fs.delete(partitionPath, true)) {
    +                    throw new RuntimeException(
    +                      "Cannot remove partition directory '" + partitionPath.toString)
    +                  }
    +                  // Don't let Hive do overwrite operation since it is slower.
    +                  doHiveOverwrite = false
                     }
    -                // Don't let Hive do overwrite operation since it is slower.
    -                doHiveOverwrite = false
                   }
                 }
    -          }
     
    -          // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
    -          // which is currently considered as a Hive native command.
    -          val inheritTableSpecs = true
    -          externalCatalog.loadPartition(
    -            table.database,
    -            table.identifier.table,
    -            tmpLocation.toString,
    -            partitionSpec,
    -            isOverwrite = doHiveOverwrite,
    -            inheritTableSpecs = inheritTableSpecs,
    -            isSrcLocal = false)
    +            // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
    +            // which is currently considered as a Hive native command.
    +            val inheritTableSpecs = true
    +            externalCatalog.loadPartition(
    +              table.database,
    +              table.identifier.table,
    +              tmpLocation.toString,
    +              partitionSpec,
    +              isOverwrite = doHiveOverwrite,
    +              inheritTableSpecs = inheritTableSpecs,
    +              isSrcLocal = false)
    +          }
             }
    +      } else {
    +        externalCatalog.loadTable(
    +          table.database,
    +          table.identifier.table,
    +          tmpLocation.toString, // TODO: URI
    +          overwrite,
    +          isSrcLocal = false)
           }
    -    } else {
    -      externalCatalog.loadTable(
    -        table.database,
    -        table.identifier.table,
    -        tmpLocation.toString, // TODO: URI
    -        overwrite,
    -        isSrcLocal = false)
         }
     
    -    // Attempt to delete the staging directory and the inclusive files. If failed, the files are
    -    // expected to be dropped at the normal termination of VM since deleteOnExit is used.
    -    deleteExternalTmpPath(hadoopConf)
    +    try {
    +      processInsert
    --- End diff --
    
    Please create a separate function from line 165 to line 235. 
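
    Roughly the shape being asked for, sketched with stand-in stubs (`processInsert` and `deleteExternalTmpPath` are the names used in this PR's diff, but the real versions take parameters such as hadoopConf, which are omitted here; everything else is hypothetical):

    ```Scala
    object ExtractedInsertSketch {
      // Stand-in stubs so the sketch is self-contained; in the PR, processInsert does
      // the validation, saveAsHiveFile and catalog load calls.
      private def processInsert(): Unit = println("validate, write files, load partitions/table")
      private def deleteExternalTmpPath(): Unit = println("delete the staging directory")

      def run(): Unit = {
        try {
          processInsert()         // the former inline body, now a separate method
        } finally {
          deleteExternalTmpPath() // cleanup now runs even when processInsert() throws
        }
      }
    }
    ```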


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org