Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/04/25 06:12:00 UTC
[jira] [Commented] (SPARK-27526) Driver OOM error occurs while writing parquet file with Append mode
[ https://issues.apache.org/jira/browse/SPARK-27526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16825764#comment-16825764 ]
Hyukjin Kwon commented on SPARK-27526:
--------------------------------------
Please avoid setting the Target Version field, which is usually reserved for committers.
> Driver OOM error occurs while writing parquet file with Append mode
> -------------------------------------------------------------------
>
> Key: SPARK-27526
> URL: https://issues.apache.org/jira/browse/SPARK-27526
> Project: Spark
> Issue Type: Bug
> Components: Input/Output, SQL
> Affects Versions: 2.1.1
> Environment: centos6.7
> Reporter: senyoung
> Priority: Major
> Labels: oom
>
> Consider the user code below:
> {code:scala}
> someDataFrame.write
>   .mode(SaveMode.Append)
>   .partitionBy(somePartitionKeySeqs: _*)
>   .parquet(targetPath)
> {code}
> When Spark writes Parquet files to HDFS with SaveMode.Append, it must check that the requested partition columns match those of the existing files. However, this check caches the file info of every leaf file under "targetPath" on the driver.
> This can easily trigger an OOM when targetPath contains too many files.
> The check is useful when someone needs exact correctness, but I think it should be optional to avoid the OOM.
> The relevant code is here:
> {code:scala}
> //package org.apache.spark.sql.execution.datasources
> //case class DataSource
> private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
>   ...
>   /**
>    * Can we make this check optional?
>    */
>   if (mode == SaveMode.Append) {
>     val existingPartitionColumns = Try {
>       /**
>        * getOrInferFileFormatSchema(format, justPartitioning = true)
>        * may cause an OOM when there are too many files. Could we sample a
>        * limited number of files rather than all existing files?
>        */
>       getOrInferFileFormatSchema(format, justPartitioning = true)
>         ._2.fieldNames.toList
>     }.getOrElse(Seq.empty[String])
>     val sameColumns =
>       existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
>     if (existingPartitionColumns.nonEmpty && !sameColumns) {
>       throw new AnalysisException(
>         s"""Requested partitioning does not match existing partitioning.
>            |Existing partitioning columns:
>            | ${existingPartitionColumns.mkString(", ")}
>            |Requested partitioning columns:
>            | ${partitionColumns.mkString(", ")}
>            |""".stripMargin)
>     }
>   }
>   ...
> }
> private def getOrInferFileFormatSchema(
>     format: FileFormat,
>     justPartitioning: Boolean = false): (StructType, StructType) = {
>   lazy val tempFileIndex = {
>     val allPaths = caseInsensitiveOptions.get("path") ++ paths
>     val hadoopConf = sparkSession.sessionState.newHadoopConf()
>     val globbedPaths = allPaths.toSeq.flatMap { path =>
>       val hdfsPath = new Path(path)
>       val fs = hdfsPath.getFileSystem(hadoopConf)
>       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
>       SparkHadoopUtil.get.globPathIfNecessary(qualified)
>     }.toArray
>     /**
>      * InMemoryFileIndex.refresh0() caches the info of every file: OOM risk.
>      */
>     new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
>   }
>   ...
> }
> {code}
>
>
>
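To illustrate the sampling idea raised in the code comments above, here is a minimal Scala sketch. It is not Spark's implementation or API. The object PartitionColumnCheck and its methods are hypothetical, and the sketch assumes Hive-style "key=value" directory names in the leaf-file paths. It reads at most maxFiles paths, infers the partition column names from each one, and compares them case-insensitively, like the sameColumns check in DataSource.

```scala
// Hypothetical sketch (not Spark's API): infer partition column names from a
// bounded sample of leaf-file paths instead of listing every file.
// Assumes Hive-style "key=value" directory names, e.g. ".../year=2019/month=04/part-0.parquet".
object PartitionColumnCheck {

  /** Partition column names encoded in one leaf-file path, in directory order. */
  def partitionColumnsOf(path: String): Seq[String] =
    path.split("/").toSeq
      .dropRight(1)                        // the last segment is the file name itself
      .filter(seg => seg.indexOf('=') > 0) // keep only "key=value" directories
      .map(seg => seg.takeWhile(_ != '=')) // keep the key, drop the value

  /**
   * Looks at no more than `maxFiles` paths from a (possibly lazy) iterator.
   * Returns Some(columns) if all sampled files agree (empty if no files were
   * found), and None if the sampled files disagree with each other.
   */
  def inferFromSample(leafPaths: Iterator[String], maxFiles: Int): Option[Seq[String]] =
    leafPaths.take(maxFiles).map(partitionColumnsOf).toList.distinct match {
      case Nil           => Some(Seq.empty)
      case single :: Nil => Some(single)
      case _             => None
    }

  /** Case-insensitive comparison, mirroring the sameColumns check in DataSource. */
  def sameColumns(existing: Seq[String], requested: Seq[String]): Boolean =
    existing.map(_.toLowerCase) == requested.map(_.toLowerCase)
}
```

Sampling trades exactness for bounded memory. A file outside the sample with a different layout goes undetected, which is why the check would be an opt-in rather than the default. The leaf paths could come from a lazy listing, for example Hadoop's FileSystem.listFiles(path, true) adapted into a Scala Iterator, so the full listing is not materialized on the driver.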
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)