Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/04/25 06:12:00 UTC

[jira] [Commented] (SPARK-27526) Driver OOM error occurs while writing parquet file with Append mode

    [ https://issues.apache.org/jira/browse/SPARK-27526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16825764#comment-16825764 ] 

Hyukjin Kwon commented on SPARK-27526:
--------------------------------------

Please avoid setting the target version, which is usually reserved for committers.

> Driver OOM error occurs while writing parquet file with Append mode
> -------------------------------------------------------------------
>
>                 Key: SPARK-27526
>                 URL: https://issues.apache.org/jira/browse/SPARK-27526
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, SQL
>    Affects Versions: 2.1.1
>         Environment: centos6.7
>            Reporter: senyoung
>            Priority: Major
>              Labels: oom
>
> Consider the user code below:
> {code:java}
> someDataFrame.write
> .mode(SaveMode.Append)
> .partitionBy(somePartitionKeySeqs)
> .parquet(targetPath);
> {code}
> When Spark writes Parquet files to HDFS in SaveMode.Append mode, it must check that the requested partition columns
>  match those of the existing files. However, this check caches the file info of every leaf file under "targetPath",
>  which can easily trigger a driver OOM when targetPath contains too many files.
>  This behavior is useful when exact correctness is required, but I think it should be optional so that the OOM can be avoided.
> The relevant code is here:
> {code:java}
> //package org.apache.spark.sql.execution.datasources
> //case class DataSource
> private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
> ...
> /**
>  * Can we make it optional?
>  */
> if (mode == SaveMode.Append) {
>   val existingPartitionColumns = Try {
>     /**
>      * getOrInferFileFormatSchema(format, justPartitioning = true):
>      * this method may cause an OOM when there are too many files. Could we sample a limited number of files rather than all existing files?
>      */
>     getOrInferFileFormatSchema(format, justPartitioning = true)
>       ._2.fieldNames.toList
>   }.getOrElse(Seq.empty[String])
>   val sameColumns =
>     existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
>   if (existingPartitionColumns.nonEmpty && !sameColumns) {
>     throw new AnalysisException(
>       s"""Requested partitioning does not match existing partitioning.
>          |Existing partitioning columns:
>          |  ${existingPartitionColumns.mkString(", ")}
>          |Requested partitioning columns:
>          |  ${partitionColumns.mkString(", ")}
>          |""".stripMargin)
>   }
> }
> ...
> }
> private def getOrInferFileFormatSchema(
>     format: FileFormat,
>     justPartitioning: Boolean = false): (StructType, StructType) = {
>   lazy val tempFileIndex = {
>     val allPaths = caseInsensitiveOptions.get("path") ++ paths
>     val hadoopConf = sparkSession.sessionState.newHadoopConf()
>     val globbedPaths = allPaths.toSeq.flatMap { path =>
>       val hdfsPath = new Path(path)
>       val fs = hdfsPath.getFileSystem(hadoopConf)
>       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
>       SparkHadoopUtil.get.globPathIfNecessary(qualified)
>     }.toArray
>     /**
>      * InMemoryFileIndex.refresh0() caches the info of all files, which risks an OOM.
>      */
>     new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
>   }
> ...
> }
> {code}
>  
>  
>  
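
As a rough illustration of the sampling idea raised in the description quoted above, the following is a minimal sketch in plain Scala, not Spark's implementation. It assumes Hive-style "column=value" directory names and takes leaf file paths as plain strings. The object PartitionSampleSketch and all of its methods are hypothetical names introduced only for this example. It derives the partition column names from at most `limit` paths instead of indexing every file, then applies the same case-insensitive comparison as the quoted check.

```scala
// Hypothetical sketch only: none of these names exist in Spark.
object PartitionSampleSketch {

  /**
   * Extracts partition column names from one leaf file path.
   * Assumes Hive-style "column=value" directory segments, e.g.
   * "/t/a=1/b=2/part-0.parquet" yields List("a", "b").
   * The last segment is treated as the file name and ignored.
   */
  def partitionColumns(path: String): List[String] =
    path.split('/').toList
      .dropRight(1)
      .filter(_.contains("="))
      .map(segment => segment.takeWhile(_ != '='))

  /**
   * Inspects at most `limit` paths from the iterator and returns the
   * partition columns of the first path that has any.
   * Returns Nil if none of the sampled paths is partitioned.
   */
  def sampledPartitionColumns(paths: Iterator[String], limit: Int): List[String] =
    paths.take(limit).map(partitionColumns).find(_.nonEmpty).getOrElse(Nil)

  /**
   * Case-insensitive comparison in the spirit of the quoted check:
   * an empty existing list is treated as "nothing to compare against".
   */
  def matches(existing: Seq[String], requested: Seq[String]): Boolean =
    existing.isEmpty || existing.map(_.toLowerCase) == requested.map(_.toLowerCase)
}
```

The trade-off is the one the reporter mentions: a bounded sample keeps driver memory flat, but it cannot detect a mismatch that only appears in files outside the sample, so such a check would have to be opt-in.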


