Posted to issues@spark.apache.org by "senyoung (JIRA)" <ji...@apache.org> on 2019/04/20 10:43:00 UTC

[jira] [Created] (SPARK-27526) Driver OOM error occurs while writing parquet file with Append mode

senyoung created SPARK-27526:
--------------------------------

             Summary: Driver OOM error occurs while writing parquet file with Append mode
                 Key: SPARK-27526
                 URL: https://issues.apache.org/jira/browse/SPARK-27526
             Project: Spark
          Issue Type: Bug
          Components: Input/Output, SQL
    Affects Versions: 2.1.1
         Environment: CentOS 6.7
            Reporter: senyoung


Given user code like the following:
{code:java}
someDataFrame.write
  .mode(SaveMode.Append)
  .partitionBy(somePartitionKeySeqs: _*)
  .parquet(targetPath)
{code}
When Spark writes parquet files to HDFS with SaveMode.Append, it must check that the requested partition columns match those of the existing files. However, this check caches the info of every leaf file under the targetPath, which can easily trigger a driver OOM when there are too many files in the targetPath.
 This check is useful when exact correctness is needed, but I think it should be optional so that the OOM can be avoided.
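
In the meantime, one workaround is to append directly into a single partition directory, which avoids listing everything under the targetPath. This is only a sketch: it deliberately skips the partition-column check, so the caller must guarantee the layout matches the existing data, and the partition column "dt" and its value are hypothetical placeholders.
{code:java}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// Workaround sketch: append into one partition directory so Spark never has
// to list every file under targetPath to verify the partition columns.
// NOTE: the partition column "dt" and its value are made-up placeholders.
someDataFrame
  .where(col("dt") === "2019-04-20") // keep only the rows of this partition
  .drop("dt")                        // partition values live in the path, not the files
  .write
  .mode(SaveMode.Append)
  .parquet(s"$targetPath/dt=2019-04-20")
{code}
Readers must then load the parent targetPath, so that partition discovery restores the dt column.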

The relevant code is here:
{code:java}
//package org.apache.spark.sql.execution.datasources
//case class DataSource

private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
...
if (mode == SaveMode.Append) { // can we make this check optional?
  val existingPartitionColumns = Try {
    /**
     * getOrInferFileFormatSchema(format, justPartitioning = true) may cause
     * an OOM when there are too many files. Could we sample a limited number
     * of files rather than listing all existing files?
     */
    getOrInferFileFormatSchema(format, justPartitioning = true)
      ._2.fieldNames.toList
  }.getOrElse(Seq.empty[String])
  // TODO: Case sensitivity.
  val sameColumns =
    existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
  if (existingPartitionColumns.nonEmpty && !sameColumns) {
    throw new AnalysisException(
      s"""Requested partitioning does not match existing partitioning.
         |Existing partitioning columns:
         |  ${existingPartitionColumns.mkString(", ")}
         |Requested partitioning columns:
         |  ${partitionColumns.mkString(", ")}
         |""".stripMargin)
  }
}
...
}


private def getOrInferFileFormatSchema(
    format: FileFormat,
    justPartitioning: Boolean = false): (StructType, StructType) = {
  // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
  // in streaming mode, we have already inferred and registered partition columns, we will
  // never have to materialize the lazy val below
  lazy val tempFileIndex = {
    val allPaths = caseInsensitiveOptions.get("path") ++ paths
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    val globbedPaths = allPaths.toSeq.flatMap { path =>
      val hdfsPath = new Path(path)
      val fs = hdfsPath.getFileSystem(hadoopConf)
      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
      SparkHadoopUtil.get.globPathIfNecessary(qualified)
    }.toArray
    // InMemoryFileIndex.refresh0() caches the info of every leaf file: OOM risk
    new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
  }
  val partitionSchema = if (partitionColumns.isEmpty) {
    // Try to infer partitioning, because no DataSource in the read path provides the partitioning
    // columns properly unless it is a Hive DataSource
    val resolved = tempFileIndex.partitionSchema.map { partitionField =>
      val equality = sparkSession.sessionState.conf.resolver
      // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
      userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
        partitionField)
    }
    StructType(resolved)
  } else {
    // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
    // partitioning
    if (userSpecifiedSchema.isEmpty) {
      val inferredPartitions = tempFileIndex.partitionSchema
      inferredPartitions
    } else {
      val partitionFields = partitionColumns.map { partitionColumn =>
        val equality = sparkSession.sessionState.conf.resolver
        userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
          val inferredPartitions = tempFileIndex.partitionSchema
          val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
          if (inferredOpt.isDefined) {
            logDebug(
              s"""Type of partition column: $partitionColumn not found in specified schema
                 |for $format.
                 |User Specified Schema
                 |=====================
                 |${userSpecifiedSchema.orNull}
                 |
                 |Falling back to inferred dataType if it exists.
               """.stripMargin)
          }
          inferredOpt
        }.getOrElse {
          throw new AnalysisException(s"Failed to resolve the schema for $format for " +
            s"the partition column: $partitionColumn. It must be specified manually.")
        }
      }
      StructType(partitionFields)
    }
  }
  if (justPartitioning) {
    return (null, partitionSchema)
  }
  val dataSchema = userSpecifiedSchema.map { schema =>
    val equality = sparkSession.sessionState.conf.resolver
    StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
  }.orElse {
    format.inferSchema(
      sparkSession,
      caseInsensitiveOptions,
      tempFileIndex.allFiles())
  }.getOrElse {
    throw new AnalysisException(
      s"Unable to infer schema for $format. It must be specified manually.")
  }
  (dataSchema, partitionSchema)
}
{code}
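
To make the sampling suggestion concrete, here is a rough sketch. It is entirely hypothetical, not a patch against the real DataSource code, and samplePartitionColumnNames is a made-up helper; it recovers only the existing partition column names from a bounded directory walk instead of a full InMemoryFileIndex listing.
{code:java}
import org.apache.hadoop.fs.{FileSystem, Path}

/**
 * Hypothetical helper: recover partition column names by walking a single
 * "key=value" directory chain under the root, instead of caching every leaf
 * file the way InMemoryFileIndex does. One chain is enough to learn the
 * column names; it does not validate values or types across all files.
 */
def samplePartitionColumnNames(
    fs: FileSystem,
    root: Path,
    maxDepth: Int = 10): Seq[String] = {
  val names = scala.collection.mutable.ArrayBuffer.empty[String]
  var current = root
  var depth = 0
  var done = false
  while (!done && depth < maxDepth) {
    // Only directory names matter here; data files are never cached.
    val dirs = fs.listStatus(current).filter(_.isDirectory)
    dirs.find(_.getPath.getName.contains("=")) match {
      case Some(status) =>
        names += status.getPath.getName.split("=", 2)(0)
        current = status.getPath
        depth += 1
      case None =>
        done = true // reached the data-file level, stop descending
    }
  }
  names.toSeq
}
{code}
Such sampling trades exact verification for bounded driver memory, which is why I think the full check should be optional rather than removed.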