Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/29 03:46:41 UTC

[GitHub] [spark] ulysses-you opened a new pull request, #39277: [SPARK-41708][SQL] Pull v1write information to write file node

ulysses-you opened a new pull request, #39277:
URL: https://github.com/apache/spark/pull/39277

   
   ### What changes were proposed in this pull request?
   This PR pulls the v1 write information out of `V1WriteCommand` into `WriteFiles`:
   ```scala
   case class WriteFiles(child: LogicalPlan)
   
   =>
   
   case class WriteFiles(
       child: LogicalPlan,
       fileFormat: FileFormat,
       partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],
       options: Map[String, String],
       requiredOrdering: Seq[SortOrder])
   ```
   
   Also, this PR cleans up `WriteSpec`, which is unnecessary.
   
   ### Why are the changes needed?
   After this PR, `WriteFiles` will hold the write information, which can help developers work with v1 writes.
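   For example (a minimal sketch, not code from this PR; the helper name `describeWrites` is hypothetical), the new fields can be read directly off the node:
   ```scala
   import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
   import org.apache.spark.sql.execution.datasources.WriteFiles

   // Collect a readable summary of every WriteFiles node in a plan,
   // using the write information added by this change.
   def describeWrites(plan: LogicalPlan): Seq[String] = plan.collect {
     case w: WriteFiles =>
       s"fileFormat=${w.fileFormat}, partitionColumns=${w.partitionColumns.map(_.name)}, " +
         s"bucketSpec=${w.bucketSpec}, options=${w.options}, requiredOrdering=${w.requiredOrdering}"
   }
   ```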
   
   ### Does this PR introduce _any_ user-facing change?
   no
   
   ### How was this patch tested?
   Pass CI


[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059221668


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala:
##########
@@ -157,117 +84,18 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveU
     }
   }
 
-  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
-  private def oldVersionExternalTempPath(
-      path: Path,
-      hadoopConf: Configuration,
-      scratchDir: String): Path = {
-    val extURI: URI = path.toUri
-    val scratchPath = new Path(scratchDir, executionId)
-    var dirPath = new Path(
-      extURI.getScheme,
-      extURI.getAuthority,
-      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
-
-    try {
-      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
-      dirPath = new Path(fs.makeQualified(dirPath).toString())
-
-      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
-        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
-      }
-      createdTempDir = Some(dirPath)
-      fs.deleteOnExit(dirPath)
-    } catch {
-      case e: IOException =>
-        throw QueryExecutionErrors.cannotCreateStagingDirError(dirPath.toString, e)
-    }
-    dirPath
-  }
-
-  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
-  private def newVersionExternalTempPath(
-      path: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    val extURI: URI = path.toUri
-    if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path, hadoopConf, stagingDir)
-    } else {
-      new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
-    }
-  }
-
-  private def getExtTmpPathRelTo(
-      path: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
-  }
-
-  private def getExternalScratchDir(
-      extURI: URI,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    getStagingDir(
-      new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
-      hadoopConf,
-      stagingDir)
-  }
-
-  private[hive] def getStagingDir(
-      inputPath: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    val inputPathName: String = inputPath.toString
-    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
-    var stagingPathName: String =
-      if (inputPathName.indexOf(stagingDir) == -1) {
-        new Path(inputPathName, stagingDir).toString
-      } else {
-        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
-      }
-
-    // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the
-    // staging directory needs to avoid being deleted when users set hive.exec.stagingdir
-    // under the table directory.
-    if (isSubDir(new Path(stagingPathName), inputPath, fs) &&
-      !stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) {
-      logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " +
-        "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
-        "directory.")
-      stagingPathName = new Path(inputPathName, ".hive-staging").toString
-    }
-
-    val dir: Path =
-      fs.makeQualified(
-        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
-    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+  protected def createExternalTmpPath(dir: Path, hadoopConf: Configuration): Unit = {
+    val fs: FileSystem = dir.getFileSystem(hadoopConf)
     try {
       if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
         throw new IllegalStateException("Cannot create staging directory  '" + dir.toString + "'")
       }
-      createdTempDir = Some(dir)

Review Comment:
   The global variable `createdTempDir` is really hacky. Since we have a specified staging dir, we can pass it to the method `deleteExternalTmpPath`, and then we do not need it anymore.
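   A minimal sketch of that idea (illustrative only; the PR's final signature may differ):
   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.Path

   // Pass the staging dir explicitly instead of stashing it in a mutable field.
   def deleteExternalTmpPath(stagingDir: Path, hadoopConf: Configuration): Unit = {
     val fs = stagingDir.getFileSystem(hadoopConf)
     if (fs.exists(stagingDir)) {
       fs.delete(stagingDir, true) // recursive delete of the staging directory
     }
   }
   ```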



[GitHub] [spark] ulysses-you commented on pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on PR #39277:
URL: https://github.com/apache/spark/pull/39277#issuecomment-1367695695

   cc @cloud-fan 


[GitHub] [spark] cloud-fan commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1062424040


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala:
##########
@@ -105,4 +227,33 @@ trait V1WritesHiveUtils {
       .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true"))
       .getOrElse(Map.empty)
   }
+
+  def setupCompression(

Review Comment:
   I think `setupHadoopConfForCompression` is more accurate.



[GitHub] [spark] cloud-fan commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1061624700


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala:
##########
@@ -105,4 +112,164 @@ trait V1WritesHiveUtils {
       .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true"))
       .getOrElse(Map.empty)
   }
+
+  def setupCompression(
+      fileSinkConf: FileSinkDesc,
+      hadoopConf: Configuration,
+      sparkSession: SparkSession): Unit = {
+    val isCompressed =
+      fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match {
+        case formatName if formatName.endsWith("orcoutputformat") =>
+          // For ORC,"mapreduce.output.fileoutputformat.compress",
+          // "mapreduce.output.fileoutputformat.compress.codec", and
+          // "mapreduce.output.fileoutputformat.compress.type"
+          // have no impact because it uses table properties to store compression information.
+          false
+        case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean
+      }
+
+    if (isCompressed) {
+      hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
+      fileSinkConf.setCompressed(true)
+      fileSinkConf.setCompressCodec(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.codec"))
+      fileSinkConf.setCompressType(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.type"))
+    } else {
+      // Set compression by priority
+      HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
+        .foreach { case (compression, codec) => hadoopConf.set(compression, codec) }
+    }
+  }
+
+  /**
+   * Return two paths:
+   * 1. The first path is `stagingDir` which can be the parent path of `externalTmpPath`
+   * 2. The second path is `externalTmpPath`, e.g. `$stagingDir/-ext-10000`
+   * The call side should create `stagingDir` before using `externalTmpPath` and
+   * delete `stagingDir` at the end.

Review Comment:
   Instead of adding a lot of comments to explain it, let's create a wrapper class
   
   ```
   class HiveTableTempPath(session: SparkSession, conf: HadoopConf, path: Path) {
     ...
     def stagingDir: Path = ...
     def externalTempPath: Path = ...
   }
   ```



[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1062052748


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala:
##########
@@ -105,4 +112,164 @@ trait V1WritesHiveUtils {
       .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true"))
       .getOrElse(Map.empty)
   }
+
+  def setupCompression(
+      fileSinkConf: FileSinkDesc,
+      hadoopConf: Configuration,
+      sparkSession: SparkSession): Unit = {
+    val isCompressed =
+      fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match {
+        case formatName if formatName.endsWith("orcoutputformat") =>
+          // For ORC,"mapreduce.output.fileoutputformat.compress",
+          // "mapreduce.output.fileoutputformat.compress.codec", and
+          // "mapreduce.output.fileoutputformat.compress.type"
+          // have no impact because it uses table properties to store compression information.
+          false
+        case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean
+      }
+
+    if (isCompressed) {
+      hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
+      fileSinkConf.setCompressed(true)
+      fileSinkConf.setCompressCodec(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.codec"))
+      fileSinkConf.setCompressType(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.type"))
+    } else {
+      // Set compression by priority
+      HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
+        .foreach { case (compression, codec) => hadoopConf.set(compression, codec) }
+    }
+  }
+
+  /**
+   * Return two paths:
+   * 1. The first path is `stagingDir` which can be the parent path of `externalTmpPath`
+   * 2. The second path is `externalTmpPath`, e.g. `$stagingDir/-ext-10000`
+   * The call side should create `stagingDir` before using `externalTmpPath` and
+   * delete `stagingDir` at the end.

Review Comment:
   Wrapped it in `HiveTempPath`, since it will also be used by `InsertIntoHiveDirCommand`.



[GitHub] [spark] cloud-fan commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1061619079


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala:
##########
@@ -53,13 +59,17 @@ case class WriteFiles(child: LogicalPlan) extends UnaryNode {
 /**
  * Responsible for writing files.
  */
-case class WriteFilesExec(child: SparkPlan) extends UnaryExecNode {
+case class WriteFilesExec(
+    child: SparkPlan,
+    fileFormat: FileFormat,
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    staticPartitions: TablePartitionSpec) extends UnaryExecNode {
   override def output: Seq[Attribute] = Seq.empty
 
-  override protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = {
-    assert(writeSpec.isInstanceOf[WriteFilesSpec])
-    val writeFilesSpec: WriteFilesSpec = writeSpec.asInstanceOf[WriteFilesSpec]
-
+  override protected def doExecuteWrite(
+      writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {

Review Comment:
   Should `WriteFilesSpec` include less information, since some of it is already available in `WriteFilesExec`?



[GitHub] [spark] cloud-fan commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1062422156


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala:
##########
@@ -17,21 +17,143 @@
 
 package org.apache.spark.sql.hive.execution
 
-import java.util.Locale
+import java.net.URI
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale, Random}
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.ql.ErrorMsg
-import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.ql.exec.TaskRunner
+import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 
 import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.BucketingUtils
-import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion}
+
+class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: Path)

Review Comment:
   can we move it to a new file?



[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1064238626


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##########
@@ -294,3 +285,40 @@ case class InsertIntoHiveTable(
   override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoHiveTable =
     copy(query = newChild)
 }
+
+object InsertIntoHiveTable extends V1WritesHiveUtils with Logging {

Review Comment:
   Oh, I missed cleaning it up. I will remove it when I touch the related code.



[GitHub] [spark] cloud-fan commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059239578


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala:
##########
@@ -38,13 +38,18 @@ case class WriteFilesSpec(
     description: WriteJobDescription,
     committer: FileCommitProtocol,
     concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec])
-  extends WriteSpec
 
 /**
  * During Optimizer, [[V1Writes]] injects the [[WriteFiles]] between [[V1WriteCommand]] and query.
  * [[WriteFiles]] must be the root plan as the child of [[V1WriteCommand]].
  */
-case class WriteFiles(child: LogicalPlan) extends UnaryNode {
+case class WriteFiles(
+    child: LogicalPlan,
+    fileFormat: FileFormat,
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    requiredOrdering: Seq[SortOrder]) extends UnaryNode {

Review Comment:
   This doesn't seem like logical write information, but more like internal information. Do we really need it here?



[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059221992


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##########
@@ -92,29 +95,13 @@ case class InsertIntoHiveTable(
    */
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     val externalCatalog = sparkSession.sharedState.externalCatalog
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
-
-    val hiveQlTable = HiveClientImpl.toHiveTable(table)
-    // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
-    // instances within the closure, since Serializer is not serializable while TableDesc is.
-    val tableDesc = new TableDesc(
-      hiveQlTable.getInputFormatClass,
-      // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
-      // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
-      // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
-      // HiveSequenceFileOutputFormat.
-      hiveQlTable.getOutputFormatClass,
-      hiveQlTable.getMetadata
-    )
-    val tableLocation = hiveQlTable.getDataLocation
-    val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
-
+    createExternalTmpPath(stagingDir, hadoopConf)
     try {
-      processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child)
+      processInsert(sparkSession, externalCatalog, child)

Review Comment:
   now the code looks like:
   
   ```scala
   create stagingDir
   try {
     processInsert
   } finally {
     delete stagingDir
   }
   ```
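   A self-contained sketch of that lifecycle using Hadoop's FileSystem API (the helper name `withStagingDir` is illustrative, not code from the PR):
   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.{FileSystem, Path}

   def withStagingDir[T](stagingDir: Path, hadoopConf: Configuration)(body: => T): T = {
     val fs: FileSystem = stagingDir.getFileSystem(hadoopConf)
     if (!fs.mkdirs(stagingDir)) {
       throw new IllegalStateException(s"Cannot create staging directory '$stagingDir'")
     }
     try {
       body // e.g. processInsert(...)
     } finally {
       // Best-effort cleanup; the real code also relies on deleteOnExit as a fallback.
       fs.delete(stagingDir, true)
     }
   }
   ```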



[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059246404


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##########
@@ -73,16 +74,18 @@ case class InsertIntoHiveTable(
     query: LogicalPlan,
     overwrite: Boolean,
     ifPartitionNotExists: Boolean,
-    outputColumnNames: Seq[String]
+    outputColumnNames: Seq[String],
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    fileFormat: FileFormat,
+    externalTmpPath: String,
+    @transient stagingDir: Path,

Review Comment:
   For old Hive versions, externalTmpPath and stagingDir are the same.
   https://github.com/apache/spark/blob/a3c837ae2eaf2c7ba08563b7afa0f96df8a4e80b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala#L129-L136
   
   
   For new Hive versions:
   https://github.com/apache/spark/blob/a3c837ae2eaf2c7ba08563b7afa0f96df8a4e80b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala#L189-L197
   
   - externalTmpPath: `new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")`
   - stagingDir: `getExternalScratchDir(extURI, hadoopConf, stagingDir)`
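
   In other words (illustrative sketch only, not code from the PR):
   ```scala
   import org.apache.hadoop.fs.Path

   // How the two paths relate in the two cases described above.
   def externalPaths(stagingDir: Path, oldHiveLayout: Boolean): (Path, Path) =
     if (oldHiveLayout) {
       (stagingDir, stagingDir)                         // old Hive: same path
     } else {
       (stagingDir, new Path(stagingDir, "-ext-10000")) // new Hive: "-ext-10000" child
     }
   ```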
   



[GitHub] [spark] cloud-fan commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059240434


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala:
##########
@@ -105,4 +112,164 @@ trait V1WritesHiveUtils {
       .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true"))
       .getOrElse(Map.empty)
   }
+
+  def setupCompression(
+      fileSinkConf: FileSinkDesc,
+      hadoopConf: Configuration,
+      sparkSession: SparkSession): Unit = {
+    val isCompressed =
+      fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match {
+        case formatName if formatName.endsWith("orcoutputformat") =>
+          // For ORC,"mapreduce.output.fileoutputformat.compress",
+          // "mapreduce.output.fileoutputformat.compress.codec", and
+          // "mapreduce.output.fileoutputformat.compress.type"
+          // have no impact because it uses table properties to store compression information.
+          false
+        case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean
+      }
+
+    if (isCompressed) {
+      hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
+      fileSinkConf.setCompressed(true)
+      fileSinkConf.setCompressCodec(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.codec"))
+      fileSinkConf.setCompressType(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.type"))
+    } else {
+      // Set compression by priority
+      HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
+        .foreach { case (compression, codec) => hadoopConf.set(compression, codec) }
+    }
+  }
+
+  /**
+   * Return two paths:
+   * 1. The first path is `stagingDir` which can be the parent path of `externalTmpPath`
+   * 2. The second path is `externalTmpPath`, e.g. `$stagingDir/-ext-10000`

Review Comment:
   Does Hadoop `Path` provide an API to get the parent? If it does, then we don't need to return 2 paths.
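   (For reference, Hadoop's `Path` does expose `getParent`; the example path below is made up for illustration. Whether a single path is enough is addressed in the reply elsewhere in this thread.)
   ```scala
   import org.apache.hadoop.fs.Path

   val tmp = new Path("/warehouse/tbl/.hive-staging_hive_xxx/-ext-10000")
   val staging: Path = tmp.getParent // Path("/warehouse/tbl/.hive-staging_hive_xxx")
   ```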



[GitHub] [spark] cloud-fan closed pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
URL: https://github.com/apache/spark/pull/39277


[GitHub] [spark] cloud-fan commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059273228


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala:
##########
@@ -38,13 +38,18 @@ case class WriteFilesSpec(
     description: WriteJobDescription,
     committer: FileCommitProtocol,
     concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec])
-  extends WriteSpec
 
 /**
  * During Optimizer, [[V1Writes]] injects the [[WriteFiles]] between [[V1WriteCommand]] and query.
  * [[WriteFiles]] must be the root plan as the child of [[V1WriteCommand]].
  */
-case class WriteFiles(child: LogicalPlan) extends UnaryNode {
+case class WriteFiles(
+    child: LogicalPlan,
+    fileFormat: FileFormat,
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    requiredOrdering: Seq[SortOrder]) extends UnaryNode {

Review Comment:
   SGTM



[GitHub] [spark] cloud-fan commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1062421275


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##########
@@ -73,16 +75,20 @@ case class InsertIntoHiveTable(
     query: LogicalPlan,
     overwrite: Boolean,
     ifPartitionNotExists: Boolean,
-    outputColumnNames: Seq[String]
+    outputColumnNames: Seq[String],
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    fileFormat: FileFormat,
+    @transient externalTmpPath: HiveTempPath

Review Comment:
   ```suggestion
       @transient hiveTmpPath: HiveTempPath
   ```



[GitHub] [spark] cloud-fan commented on pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #39277:
URL: https://github.com/apache/spark/pull/39277#issuecomment-1373121273

   thanks, merging to master!


[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1062552806


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##########
@@ -92,29 +98,17 @@ case class InsertIntoHiveTable(
    */
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     val externalCatalog = sparkSession.sharedState.externalCatalog
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
-
-    val hiveQlTable = HiveClientImpl.toHiveTable(table)
-    // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
-    // instances within the closure, since Serializer is not serializable while TableDesc is.
-    val tableDesc = new TableDesc(
-      hiveQlTable.getInputFormatClass,
-      // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
-      // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
-      // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
-      // HiveSequenceFileOutputFormat.
-      hiveQlTable.getOutputFormatClass,
-      hiveQlTable.getMetadata
-    )
-    val tableLocation = hiveQlTable.getDataLocation
-    val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
+    val hadoopConf = externalTmpPath.hadoopConf
+    val stagingDir = externalTmpPath.stagingDir
+    val tmpLocation = externalTmpPath.externalTempPath
 
+    createExternalTmpPath(stagingDir, hadoopConf)
     try {
-      processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child)
+      processInsert(sparkSession, externalCatalog, hadoopConf, tmpLocation, child)
     } finally {
       // Attempt to delete the staging directory and the inclusive files. If failed, the files are
       // expected to be dropped at the normal termination of VM since deleteOnExit is used.
-      deleteExternalTmpPath(hadoopConf)
+      deleteExternalTmpPath(stagingDir, hadoopConf)

Review Comment:
   addressed



[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059221271


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala:
##########
@@ -105,4 +112,164 @@ trait V1WritesHiveUtils {
       .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true"))
       .getOrElse(Map.empty)
   }
+
+  def setupCompression(
+      fileSinkConf: FileSinkDesc,
+      hadoopConf: Configuration,
+      sparkSession: SparkSession): Unit = {
+    val isCompressed =
+      fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match {
+        case formatName if formatName.endsWith("orcoutputformat") =>
+          // For ORC,"mapreduce.output.fileoutputformat.compress",
+          // "mapreduce.output.fileoutputformat.compress.codec", and
+          // "mapreduce.output.fileoutputformat.compress.type"
+          // have no impact because it uses table properties to store compression information.
+          false
+        case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean
+      }
+
+    if (isCompressed) {
+      hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
+      fileSinkConf.setCompressed(true)
+      fileSinkConf.setCompressCodec(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.codec"))
+      fileSinkConf.setCompressType(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.type"))
+    } else {
+      // Set compression by priority
+      HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
+        .foreach { case (compression, codec) => hadoopConf.set(compression, codec) }
+    }
+  }
+
+  /**
+   * Return two paths:
+   * 1. The first path is `stagingDir` which can be the parent path of `externalTmpPath`
+   * 2. The second path is `externalTmpPath`, e.g. `$stagingDir/-ext-10000`
+   * The call side should create `stagingDir` before using `externalTmpPath` and
+   * delete `stagingDir` at the end.
+   */
+  protected def getExternalTmpPath(

Review Comment:
   This is the key change for Hive insertion. Before, this method had a side effect of creating the stagingDir. Now, this method returns two paths: one is the staging dir to be created, and the other is the original externalTmpPath.
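   A hedged before/after sketch of the signature change (parameter lists follow the surrounding code; the exact shapes in the PR may differ):
   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.Path
   import org.apache.spark.sql.SparkSession

   object Before {
     // Old behaviour: creates the staging dir as a side effect, returns only the temp path.
     def getExternalTmpPath(session: SparkSession, hadoopConf: Configuration, path: Path): Path = ???
   }

   object After {
     // New behaviour: no side effect; returns (stagingDir, externalTmpPath). The caller creates
     // the staging dir before the write and deletes it in a finally block afterwards.
     def getExternalTmpPath(session: SparkSession, hadoopConf: Configuration, path: Path): (Path, Path) = ???
   }
   ```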



[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059245083


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala:
##########
@@ -105,4 +112,164 @@ trait V1WritesHiveUtils {
       .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true"))
       .getOrElse(Map.empty)
   }
+
+  def setupCompression(
+      fileSinkConf: FileSinkDesc,
+      hadoopConf: Configuration,
+      sparkSession: SparkSession): Unit = {
+    val isCompressed =
+      fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match {
+        case formatName if formatName.endsWith("orcoutputformat") =>
+          // For ORC,"mapreduce.output.fileoutputformat.compress",
+          // "mapreduce.output.fileoutputformat.compress.codec", and
+          // "mapreduce.output.fileoutputformat.compress.type"
+          // have no impact because it uses table properties to store compression information.
+          false
+        case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean
+      }
+
+    if (isCompressed) {
+      hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
+      fileSinkConf.setCompressed(true)
+      fileSinkConf.setCompressCodec(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.codec"))
+      fileSinkConf.setCompressType(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.type"))
+    } else {
+      // Set compression by priority
+      HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
+        .foreach { case (compression, codec) => hadoopConf.set(compression, codec) }
+    }
+  }
+
+  /**
+   * Return two paths:
+   * 1. The first path is `stagingDir` which can be the parent path of `externalTmpPath`
+   * 2. The second path is `externalTmpPath`, e.g. `$stagingDir/-ext-10000`

Review Comment:
   It is not always just the parent: for old Hive versions, the two paths are the same. So if we wanted to return only one path, we would have to check the Hive version again before using it.



[GitHub] [spark] cloud-fan commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059240180


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala:
##########
@@ -157,117 +84,18 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveU
     }
   }
 
-  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
-  private def oldVersionExternalTempPath(

Review Comment:
   Is this code gone?



[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059246929


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala:
##########
@@ -38,13 +38,18 @@ case class WriteFilesSpec(
     description: WriteJobDescription,
     committer: FileCommitProtocol,
     concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec])
-  extends WriteSpec
 
 /**
  * During Optimizer, [[V1Writes]] injects the [[WriteFiles]] between [[V1WriteCommand]] and query.
  * [[WriteFiles]] must be the root plan as the child of [[V1WriteCommand]].
  */
-case class WriteFiles(child: LogicalPlan) extends UnaryNode {
+case class WriteFiles(
+    child: LogicalPlan,
+    fileFormat: FileFormat,
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    requiredOrdering: Seq[SortOrder]) extends UnaryNode {

Review Comment:
   How about pulling out `partitionSpec` instead? `partitionColumns` does not contain the information of the insertion's partition spec.
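
   To illustrate the distinction (hypothetical example, not code from the PR): for an insert like `INSERT INTO t PARTITION (p1 = '2022-12-29', p2) SELECT ...`, the partition columns are just the attributes `p1` and `p2`, while the partition spec also records the static value bound to `p1`:
   ```scala
   val partitionColumnNames: Seq[String] = Seq("p1", "p2")
   val staticPartitionSpec: Map[String, String] = Map("p1" -> "2022-12-29")
   ```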



[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059244731


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala:
##########
@@ -157,117 +84,18 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveU
     }
   }
 
-  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
-  private def oldVersionExternalTempPath(

Review Comment:
   This code was moved to `V1WritesHiveUtils` so that the `InsertIntoHiveTable` object can use it.



[GitHub] [spark] cloud-fan commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059239858


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##########
@@ -73,16 +74,18 @@ case class InsertIntoHiveTable(
     query: LogicalPlan,
     overwrite: Boolean,
     ifPartitionNotExists: Boolean,
-    outputColumnNames: Seq[String]
+    outputColumnNames: Seq[String],
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    fileFormat: FileFormat,
+    externalTmpPath: String,
+    @transient stagingDir: Path,

Review Comment:
   what's the difference between `externalTmpPath` and `stagingDir`?



[GitHub] [spark] cloud-fan commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1061620498


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##########
@@ -294,3 +282,40 @@ case class InsertIntoHiveTable(
   override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoHiveTable =
     copy(query = newChild)
 }
+object InsertIntoHiveTable extends V1WritesHiveUtils with Logging {

Review Comment:
   ```suggestion
   
   object InsertIntoHiveTable extends V1WritesHiveUtils with Logging {
   ```



[GitHub] [spark] cloud-fan commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1063415311


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##########
@@ -294,3 +285,40 @@ case class InsertIntoHiveTable(
   override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoHiveTable =
     copy(query = newChild)
 }
+
+object InsertIntoHiveTable extends V1WritesHiveUtils with Logging {

Review Comment:
   What do we log inside this object?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1062425644


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##########
@@ -92,29 +98,17 @@ case class InsertIntoHiveTable(
    */
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     val externalCatalog = sparkSession.sharedState.externalCatalog
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
-
-    val hiveQlTable = HiveClientImpl.toHiveTable(table)
-    // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
-    // instances within the closure, since Serializer is not serializable while TableDesc is.
-    val tableDesc = new TableDesc(
-      hiveQlTable.getInputFormatClass,
-      // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
-      // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
-      // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
-      // HiveSequenceFileOutputFormat.
-      hiveQlTable.getOutputFormatClass,
-      hiveQlTable.getMetadata
-    )
-    val tableLocation = hiveQlTable.getDataLocation
-    val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
+    val hadoopConf = externalTmpPath.hadoopConf
+    val stagingDir = externalTmpPath.stagingDir
+    val tmpLocation = externalTmpPath.externalTempPath
 
+    createExternalTmpPath(stagingDir, hadoopConf)
     try {
-      processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child)
+      processInsert(sparkSession, externalCatalog, hadoopConf, tmpLocation, child)
     } finally {
       // Attempt to delete the staging directory and the inclusive files. If failed, the files are
       // expected to be dropped at the normal termination of VM since deleteOnExit is used.
-      deleteExternalTmpPath(hadoopConf)
+      deleteExternalTmpPath(stagingDir, hadoopConf)

Review Comment:
   Can we add `def createTempPath()` and `def deleteTempPath()` in `HiveTempPath`? Then we don't even need to expose the `stagingDir`, which makes the interface cleaner.
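   
   A rough sketch of what the `run()` call site could look like with that change (`hiveTempPath` and the exact method names are illustrative, not the actual PR code):
   ```scala
   // Sketch only: assumes the command holds a `hiveTempPath: HiveTempPath` and that
   // HiveTempPath owns the staging-dir lifecycle via createTempPath()/deleteTempPath().
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     val externalCatalog = sparkSession.sharedState.externalCatalog
     val hadoopConf = hiveTempPath.hadoopConf
     val tmpLocation = hiveTempPath.externalTempPath
   
     hiveTempPath.createTempPath()
     try {
       processInsert(sparkSession, externalCatalog, hadoopConf, tmpLocation, child)
     } finally {
       // The staging dir never leaves HiveTempPath.
       hiveTempPath.deleteTempPath()
     }
     // remaining catalog refresh / cache invalidation steps unchanged
     Seq.empty[Row]
   }
   ```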



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1062020277


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala:
##########
@@ -53,13 +59,17 @@ case class WriteFiles(child: LogicalPlan) extends UnaryNode {
 /**
  * Responsible for writing files.
  */
-case class WriteFilesExec(child: SparkPlan) extends UnaryExecNode {
+case class WriteFilesExec(
+    child: SparkPlan,
+    fileFormat: FileFormat,
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    staticPartitions: TablePartitionSpec) extends UnaryExecNode {
   override def output: Seq[Attribute] = Seq.empty
 
-  override protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = {
-    assert(writeSpec.isInstanceOf[WriteFilesSpec])
-    val writeFilesSpec: WriteFilesSpec = writeSpec.asInstanceOf[WriteFilesSpec]
-
+  override protected def doExecuteWrite(
+      writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {

Review Comment:
   It seems a bit hard. Look at what the spec currently carries:
   ```scala
   case class WriteFilesSpec(
       description: WriteJobDescription,
       committer: FileCommitProtocol,
       concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec])
   ```
   - `ConcurrentOutputWriterSpec` and `FileCommitProtocol` contain the output spec, so we cannot replace them.
   - `WriteJobDescription` contains a lot of information, including what we pulled out, but if we wanted to trim anything from `WriteJobDescription`, we would need a new class to hold the rest. I'm not sure it's worth doing that (a rough illustration follows the snippet below).
   
   ```scala
   class WriteJobDescription(
       val uuid: String,
       val serializableHadoopConf: SerializableConfiguration,
       val outputWriterFactory: OutputWriterFactory,
       val allColumns: Seq[Attribute],
       val dataColumns: Seq[Attribute],
       val partitionColumns: Seq[Attribute],
       val bucketSpec: Option[WriterBucketSpec],
       val path: String,
       val customPartitionLocations: Map[TablePartitionSpec, String],
       val maxRecordsPerFile: Long,
       val timeZoneId: String,
       val statsTrackers: Seq[WriteJobStatsTracker])
   ```
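   
   For illustration only (every name below is hypothetical), trimming the pulled-out fields would roughly look like this, and everything left behind still needs a holder class:
   ```scala
   // Hypothetical shape: partitionColumns/bucketSpec travel with WriteFilesExec,
   // but the remaining fields still have to live somewhere.
   class RemainingWriteJobDescription(
       val uuid: String,
       val serializableHadoopConf: SerializableConfiguration,
       val outputWriterFactory: OutputWriterFactory,
       val allColumns: Seq[Attribute],
       val dataColumns: Seq[Attribute],
       val path: String,
       val customPartitionLocations: Map[TablePartitionSpec, String],
       val maxRecordsPerFile: Long,
       val timeZoneId: String,
       val statsTrackers: Seq[WriteJobStatsTracker])
   ```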



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1062551972


##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+import java.net.URI
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale, Random}
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.ql.exec.TaskRunner
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.client.HiveVersion
+
+class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: Path)
+  extends Logging {
+  private var stagingDirForCreating: Option[Path] = None
+
+  lazy val externalTempPath: Path = getExternalTmpPath(path)
+
+  private def getExternalTmpPath(path: Path): Path = {
+    import org.apache.spark.sql.hive.client.hive._
+
+    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
+    // a common scratch directory. After the writing is finished, Hive will simply empty the table
+    // directory and move the staging directory to it.
+    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
+    // moving staging directory to table directory, Hive will still empty the table directory, but
+    // will exclude the staging directory there.
+    // We have to follow the Hive behavior here to avoid trouble. For example, if we create the
+    // staging directory under the table directory for Hive prior to 1.1, the staging directory will
+    // be removed by Hive when it tries to empty the table directory.
+    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
+    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
+      Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
+
+    // Ensure all the supported versions are considered here.
+    assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
+      allSupportedHiveVersions)
+
+    val externalCatalog = session.sharedState.externalCatalog
+    val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
+    val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+    val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
+
+    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
+      oldVersionExternalTempPath(path, scratchDir)
+    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
+      newVersionExternalTempPath(path, stagingDir)
+    } else {
+      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
+    }
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
+  private def oldVersionExternalTempPath(path: Path, scratchDir: String): Path = {
+    val extURI: URI = path.toUri
+    val scratchPath = new Path(scratchDir, executionId)
+    var dirPath = new Path(
+      extURI.getScheme,
+      extURI.getAuthority,
+      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+
+    val fs = dirPath.getFileSystem(hadoopConf)
+    dirPath = new Path(fs.makeQualified(dirPath).toString())
+    stagingDirForCreating = Some(dirPath)
+    dirPath
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
+  private def newVersionExternalTempPath(path: Path, stagingDir: String): Path = {
+    val extURI: URI = path.toUri
+    if (extURI.getScheme == "viewfs") {
+      val qualifiedStagingDir = getStagingDir(path, stagingDir)
+      stagingDirForCreating = Some(qualifiedStagingDir)
+      // Hive uses 10000
+      new Path(qualifiedStagingDir, "-ext-10000")
+    } else {
+      val qualifiedStagingDir = getExternalScratchDir(extURI, stagingDir)
+      stagingDirForCreating = Some(qualifiedStagingDir)
+      new Path(qualifiedStagingDir, "-ext-10000")
+    }
+  }
+
+  private def getExternalScratchDir(extURI: URI, stagingDir: String): Path = {
+    getStagingDir(
+      new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
+      stagingDir)
+  }
+
+  private[hive] def getStagingDir(inputPath: Path, stagingDir: String): Path = {
+    val inputPathName: String = inputPath.toString
+    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+    var stagingPathName: String =
+      if (inputPathName.indexOf(stagingDir) == -1) {
+        new Path(inputPathName, stagingDir).toString
+      } else {
+        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+      }
+
+    // SPARK-20594: This is a workaround for a Hive bug. The staging directory must avoid
+    // being deleted when users set hive.exec.stagingdir under the table directory.
+    if (isSubDir(new Path(stagingPathName), inputPath, fs) &&
+      !stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) {
+      logDebug(s"The staging dir '$stagingPathName' should be a child directory starting " +
+        "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
+        "directory.")
+      stagingPathName = new Path(inputPathName, ".hive-staging").toString
+    }
+
+    val dir: Path =
+      fs.makeQualified(
+        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+    dir
+  }
+
+  // HIVE-14259 removed FileUtils.isSubDir(). Adapted it from Hive 1.2's FileUtils.isSubDir().
+  private def isSubDir(p1: Path, p2: Path, fs: FileSystem): Boolean = {
+    val path1 = fs.makeQualified(p1).toString + Path.SEPARATOR
+    val path2 = fs.makeQualified(p2).toString + Path.SEPARATOR
+    path1.startsWith(path2)
+  }
+
+  private def executionId: String = {
+    val rand: Random = new Random
+    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
+    "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+  }
+
+  def deleteTmpPath() : Unit = {
+    // Attempt to delete the staging directory and the inclusive files. If failed, the files are
+    // expected to be dropped at the normal termination of VM since deleteOnExit is used.
+    try {
+      stagingDirForCreating.foreach { stagingDir =>
+        val fs = stagingDir.getFileSystem(hadoopConf)
+        if (fs.delete(stagingDir, true)) {
+          // If we successfully delete the staging directory, remove it from FileSystem's cache.
+          fs.cancelDeleteOnExit(stagingDir)
+        }
+      }
+    } catch {
+      case NonFatal(e) =>
+        val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+    }
+  }
+
+  def createTmpPath(): Unit = {
+    try {
+      stagingDirForCreating.foreach { stagingDir =>
+        val fs: FileSystem = stagingDir.getFileSystem(hadoopConf)
+        if (!FileUtils.mkdir(fs, stagingDir, true, hadoopConf)) {
+          throw new IllegalStateException(
+            "Cannot create staging directory  '" + stagingDir.toString + "'")
+        }
+        fs.deleteOnExit(stagingDir)
+      }
+    } catch {
+      case e: IOException =>
+        throw QueryExecutionErrors.cannotCreateStagingDirError(
+          s"'${stagingDirForCreating.toString}': ${e.getMessage}", e)
+    }
+  }
+
+  def deleteIfNotStagingDir(path: Path, fs: FileSystem): Unit = {
+    if (Option(path) != stagingDirForCreating) fs.delete(path, true)

Review Comment:
   This is one more method, for `InsertIntoHiveDirCommand`, so that we can hide the staging dir there as well.
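   
   A rough sketch of how the `InsertIntoHiveDirCommand` cleanup could then call it (variable names here are illustrative):
   ```scala
   // Hypothetical call site: HiveTempPath decides whether a path is its own staging
   // dir, so the command never touches the staging dir directly.
   val fs = writeToPath.getFileSystem(hadoopConf)
   if (overwrite && fs.exists(writeToPath)) {
     fs.listStatus(writeToPath).foreach { existFile =>
       hiveTempPath.deleteIfNotStagingDir(existFile.getPath, fs)
     }
   }
   ```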



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org