Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/16 18:44:57 UTC

git commit: [SPARK-2119][SQL] Improved Parquet performance when reading off S3

Repository: spark
Updated Branches:
  refs/heads/master 632fb3d9a -> efc452a16


[SPARK-2119][SQL] Improved Parquet performance when reading off S3

JIRA issue: [SPARK-2119](https://issues.apache.org/jira/browse/SPARK-2119)

This PR fixes three issues to substantially improve performance when reading large Parquet files from S3.

1. When reading the schema, fetch Parquet metadata from a part-file rather than the `_metadata` file

   The `_metadata` file contains the metadata of all row groups and can be very large if there are many row groups. Since schema information and row group metadata are coupled within a single Thrift object, we have to read the whole `_metadata` file to fetch the schema. On the other hand, the schema is replicated in the footers of all part-files, which are fairly small.

1. Add only the root directory of the Parquet file, rather than all the part-files, to the input paths

   The HDFS API automatically filters out hidden files and underscore-prefixed files (`_SUCCESS` and `_metadata`), so there is no need to pick out the part-files and add them individually to the input paths. Worse, `FileInputFormat.listStatus()` calls `FileSystem.globStatus()` on each individual input path sequentially, and each call results in a blocking remote S3 HTTP request.

1. Worked around [PARQUET-16](https://issues.apache.org/jira/browse/PARQUET-16)

   PARQUET-16 is similar to the issue above: it results in many sequential `FileSystem.getFileStatus()` calls, which are in turn translated into remote S3 HTTP requests.

   `FilteringParquetRowInputFormat` should be cleaned up once PARQUET-16 is fixed.
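
The first two items can be illustrated with a minimal, Hadoop-free sketch. Plain file names stand in for `FileStatus` entries, and the object and method names (`ParquetListingSketch`, `schemaSource`, `inputPathsBefore`, `inputPathsAfter`) are hypothetical, chosen only for illustration; the actual changes are in the diff below.

```scala
// Hypothetical sketch: file names stand in for Hadoop FileStatus entries.
object ParquetListingSketch {
  val SuccessFile = "_SUCCESS"    // marker file left by the output committer
  val MetadataFile = "_metadata"  // Parquet summary file covering all row groups

  // Item 1: prefer the footer of any part-file and fall back to "_metadata"
  // only when the directory contains no part-file at all.
  def schemaSource(children: Seq[String]): Option[String] = {
    val candidates = children.filterNot(_ == SuccessFile)
    candidates
      .find(_ != MetadataFile)
      .orElse(candidates.find(_ == MetadataFile))
  }

  // Item 2, before: one input path per part-file, so the number of paths
  // (and of sequential remote lookups) grows with the number of part-files.
  def inputPathsBefore(root: String, children: Seq[String]): Seq[String] =
    children.filterNot(_.startsWith("_")).map(name => s"$root/$name")

  // Item 2, after: a single input path, the root directory itself.
  def inputPathsAfter(root: String): Seq[String] = Seq(root)
}
```

With a directory listing of `_SUCCESS`, `_metadata`, and two part-files, `schemaSource` picks the first part-file, `inputPathsBefore` yields two paths, and `inputPathsAfter` always yields exactly one.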

Below are the micro-benchmark results. The dataset is an S3 Parquet file consisting of 3,793 partitions, about 110MB per partition on average. The benchmark was run on a 9-node AWS cluster.

- Creating a Parquet `SchemaRDD` (Parquet schema is fetched)

  ```scala
  val tweets = parquetFile(uri)
  ```

  - Before: 17.80s
  - After: 8.61s

- Fetching partition information

  ```scala
  tweets.getPartitions
  ```

  - Before: 700.87s
  - After: 21.47s

- Counting the whole file (both steps above are executed together)

  ```scala
  parquetFile(uri).count()
  ```

  - Before: ??? (not tested yet)
  - After: 53.26s
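
As a quick reading aid, the speedups implied by the two measured before/after pairs above can be computed as follows. This is only arithmetic on the reported timings; the object and method names are hypothetical.

```scala
// Hypothetical helper: speedup factors derived from the timings reported above.
object BenchmarkSpeedup {
  def speedup(beforeSec: Double, afterSec: Double): Double = beforeSec / afterSec

  // Creating the SchemaRDD: 17.80s before, 8.61s after (roughly 2.1x faster).
  val schemaFetch: Double = speedup(17.80, 8.61)

  // Fetching partition information: 700.87s before, 21.47s after (roughly 32.6x faster).
  val partitionInfo: Double = speedup(700.87, 21.47)
}
```

No factor is given for the `count()` case because its "before" timing was not measured.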

Author: Cheng Lian <li...@gmail.com>

Closes #1370 from liancheng/faster-parquet and squashes the following commits:

94a2821 [Cheng Lian] Added comments about schema consistency
d2c4417 [Cheng Lian] Worked around PARQUET-16 to improve Parquet performance
1c0d1b9 [Cheng Lian] Accelerated Parquet schema retrieving
5bd3d29 [Cheng Lian] Fixed Parquet log level


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/efc452a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/efc452a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/efc452a1

Branch: refs/heads/master
Commit: efc452a16322e8b20b3c4fe1d6847315f928cd2d
Parents: 632fb3d
Author: Cheng Lian <li...@gmail.com>
Authored: Wed Jul 16 12:44:51 2014 -0400
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Jul 16 12:44:51 2014 -0400

----------------------------------------------------------------------
 .../sql/parquet/ParquetTableOperations.scala    | 115 +++++++++++++++----
 .../spark/sql/parquet/ParquetTableSupport.scala |  27 +++--
 .../apache/spark/sql/parquet/ParquetTypes.scala |  33 +++---
 3 files changed, 125 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/efc452a1/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index ade823b..ea74320 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -17,27 +17,34 @@
 
 package org.apache.spark.sql.parquet
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.util.Try
+
 import java.io.IOException
+import java.lang.{Long => JLong}
 import java.text.SimpleDateFormat
-import java.util.Date
+import java.util.{Date, List => JList}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat, FileOutputCommitter}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 
-import parquet.hadoop.{ParquetRecordReader, ParquetInputFormat, ParquetOutputFormat}
-import parquet.hadoop.api.ReadSupport
+import parquet.hadoop._
+import parquet.hadoop.api.{InitContext, ReadSupport}
+import parquet.hadoop.metadata.GlobalMetaData
 import parquet.hadoop.util.ContextUtil
-import parquet.io.InvalidRecordException
+import parquet.io.ParquetDecodingException
 import parquet.schema.MessageType
 
-import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
+import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 
 /**
  * Parquet table scan operator. Imports the file that backs the given
@@ -55,16 +62,14 @@ case class ParquetTableScan(
   override def execute(): RDD[Row] = {
     val sc = sqlContext.sparkContext
     val job = new Job(sc.hadoopConfiguration)
-    ParquetInputFormat.setReadSupportClass(
-      job,
-      classOf[org.apache.spark.sql.parquet.RowReadSupport])
+    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
+
     val conf: Configuration = ContextUtil.getConfiguration(job)
-    val fileList = FileSystemHelper.listFiles(relation.path, conf)
-    // add all paths in the directory but skip "hidden" ones such
-    // as "_SUCCESS" and "_metadata"
-    for (path <- fileList if !path.getName.startsWith("_")) {
-      NewFileInputFormat.addInputPath(job, path)
+    val qualifiedPath = {
+      val path = new Path(relation.path)
+      path.getFileSystem(conf).makeQualified(path)
     }
+    NewFileInputFormat.addInputPath(job, qualifiedPath)
 
     // Store both requested and original schema in `Configuration`
     conf.set(
@@ -87,7 +92,7 @@ case class ParquetTableScan(
 
     sc.newAPIHadoopRDD(
       conf,
-      classOf[org.apache.spark.sql.parquet.FilteringParquetRowInputFormat],
+      classOf[FilteringParquetRowInputFormat],
       classOf[Void],
       classOf[Row])
       .map(_._2)
@@ -122,14 +127,7 @@ case class ParquetTableScan(
   private def validateProjection(projection: Seq[Attribute]): Boolean = {
     val original: MessageType = relation.parquetSchema
     val candidate: MessageType = ParquetTypesConverter.convertFromAttributes(projection)
-    try {
-      original.checkContains(candidate)
-      true
-    } catch {
-      case e: InvalidRecordException => {
-        false
-      }
-    }
+    Try(original.checkContains(candidate)).isSuccess
   }
 }
 
@@ -302,6 +300,11 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
  */
 private[parquet] class FilteringParquetRowInputFormat
   extends parquet.hadoop.ParquetInputFormat[Row] with Logging {
+
+  private var footers: JList[Footer] = _
+
+  private var fileStatuses= Map.empty[Path, FileStatus]
+
   override def createRecordReader(
       inputSplit: InputSplit,
       taskAttemptContext: TaskAttemptContext): RecordReader[Void, Row] = {
@@ -318,6 +321,70 @@ private[parquet] class FilteringParquetRowInputFormat
       new ParquetRecordReader[Row](readSupport)
     }
   }
+
+  override def getFooters(jobContext: JobContext): JList[Footer] = {
+    if (footers eq null) {
+      val statuses = listStatus(jobContext)
+      fileStatuses = statuses.map(file => file.getPath -> file).toMap
+      footers = getFooters(ContextUtil.getConfiguration(jobContext), statuses)
+    }
+
+    footers
+  }
+
+  // TODO Remove this method and related code once PARQUET-16 is fixed
+  // This method together with the `getFooters` method and the `fileStatuses` field are just used
+  // to mimic this PR: https://github.com/apache/incubator-parquet-mr/pull/17
+  override def getSplits(
+      configuration: Configuration,
+      footers: JList[Footer]): JList[ParquetInputSplit] = {
+
+    val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue)
+    val minSplitSize: JLong =
+      Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L))
+    if (maxSplitSize < 0 || minSplitSize < 0) {
+      throw new ParquetDecodingException(
+        s"maxSplitSize or minSplitSie should not be negative: maxSplitSize = $maxSplitSize;" +
+          s" minSplitSize = $minSplitSize")
+    }
+
+    val getGlobalMetaData =
+      classOf[ParquetFileWriter].getDeclaredMethod("getGlobalMetaData", classOf[JList[Footer]])
+    getGlobalMetaData.setAccessible(true)
+    val globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData]
+
+    val readContext = getReadSupport(configuration).init(
+      new InitContext(configuration,
+        globalMetaData.getKeyValueMetaData(),
+        globalMetaData.getSchema()))
+
+    val generateSplits =
+      classOf[ParquetInputFormat[_]].getDeclaredMethods.find(_.getName == "generateSplits").get
+    generateSplits.setAccessible(true)
+
+    val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
+    for (footer <- footers) {
+      val fs = footer.getFile.getFileSystem(configuration)
+      val file = footer.getFile
+      val fileStatus = fileStatuses.getOrElse(file, fs.getFileStatus(file))
+      val parquetMetaData = footer.getParquetMetadata
+      val blocks = parquetMetaData.getBlocks
+      val fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)
+      splits.addAll(
+        generateSplits.invoke(
+          null,
+          blocks,
+          fileBlockLocations,
+          fileStatus,
+          parquetMetaData.getFileMetaData,
+          readContext.getRequestedSchema.toString,
+          readContext.getReadSupportMetadata,
+          minSplitSize,
+          maxSplitSize).asInstanceOf[JList[ParquetInputSplit]])
+    }
+
+    splits
+  }
 }
 
 private[parquet] object FileSystemHelper {

http://git-wip-us.apache.org/repos/asf/spark/blob/efc452a1/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index f1953a0..39294a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -17,20 +17,19 @@
 
 package org.apache.spark.sql.parquet
 
-import org.apache.hadoop.conf.Configuration
+import java.util.{HashMap => JHashMap}
 
+import org.apache.hadoop.conf.Configuration
 import parquet.column.ParquetProperties
 import parquet.hadoop.ParquetOutputFormat
 import parquet.hadoop.api.ReadSupport.ReadContext
 import parquet.hadoop.api.{ReadSupport, WriteSupport}
 import parquet.io.api._
-import parquet.schema.{MessageType, MessageTypeParser}
+import parquet.schema.MessageType
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
 import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.execution.SparkSqlSerializer
-import com.google.common.io.BaseEncoding
 
 /**
  * A `parquet.io.api.RecordMaterializer` for Rows.
@@ -93,8 +92,8 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
       configuration: Configuration,
       keyValueMetaData: java.util.Map[String, String],
       fileSchema: MessageType): ReadContext = {
-    var parquetSchema: MessageType = fileSchema
-    var metadata: java.util.Map[String, String] = new java.util.HashMap[String, String]()
+    var parquetSchema = fileSchema
+    val metadata = new JHashMap[String, String]()
     val requestedAttributes = RowReadSupport.getRequestedSchema(configuration)
 
     if (requestedAttributes != null) {
@@ -109,7 +108,7 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
       metadata.put(RowReadSupport.SPARK_METADATA_KEY, origAttributesStr)
     }
 
-    return new ReadSupport.ReadContext(parquetSchema, metadata)
+    new ReadSupport.ReadContext(parquetSchema, metadata)
   }
 }
 
@@ -132,13 +131,17 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   private[parquet] var attributes: Seq[Attribute] = null
 
   override def init(configuration: Configuration): WriteSupport.WriteContext = {
-    attributes = if (attributes == null) RowWriteSupport.getSchema(configuration) else attributes
-    
+    val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
+    val metadata = new JHashMap[String, String]()
+    metadata.put(RowReadSupport.SPARK_METADATA_KEY, origAttributesStr)
+
+    if (attributes == null) {
+      attributes = ParquetTypesConverter.convertFromString(origAttributesStr)
+    }
+
     log.debug(s"write support initialized for requested schema $attributes")
     ParquetRelation.enableLogForwarding()
-    new WriteSupport.WriteContext(
-      ParquetTypesConverter.convertFromAttributes(attributes),
-      new java.util.HashMap[java.lang.String, java.lang.String]())
+    new WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes), metadata)
   }
 
   override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/efc452a1/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 7f6ad90..58370b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -22,6 +22,7 @@ import java.io.IOException
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 
 import parquet.hadoop.{ParquetFileReader, Footer, ParquetFileWriter}
 import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData}
@@ -367,20 +368,24 @@ private[parquet] object ParquetTypesConverter extends Logging {
         s"Expected $path for be a directory with Parquet files/metadata")
     }
     ParquetRelation.enableLogForwarding()
-    val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
-    // if this is a new table that was just created we will find only the metadata file
-    if (fs.exists(metadataPath) && fs.isFile(metadataPath)) {
-      ParquetFileReader.readFooter(conf, metadataPath)
-    } else {
-      // there may be one or more Parquet files in the given directory
-      val footers = ParquetFileReader.readFooters(conf, fs.getFileStatus(path))
-      // TODO: for now we assume that all footers (if there is more than one) have identical
-      // metadata; we may want to add a check here at some point
-      if (footers.size() == 0) {
-        throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path")
-      }
-      footers(0).getParquetMetadata
+
+    val children = fs.listStatus(path).filterNot {
+      _.getPath.getName == FileOutputCommitter.SUCCEEDED_FILE_NAME
     }
+
+    // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row
+    // groups. Since Parquet schema is replicated among all row groups, we only need to touch a
+    // single row group to read schema related metadata. Notice that we are making assumptions that
+    // all data in a single Parquet file have the same schema, which is normally true.
+    children
+      // Try any non-"_metadata" file first...
+      .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE)
+      // ... and fallback to "_metadata" if no such file exists (which implies the Parquet file is
+      // empty, thus normally the "_metadata" file is expected to be fairly small).
+      .orElse(children.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE))
+      .map(ParquetFileReader.readFooter(conf, _))
+      .getOrElse(
+        throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path"))
   }
 
   /**
@@ -403,7 +408,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
     } else {
       val attributes = convertToAttributes(
         readMetaData(origPath, conf).getFileMetaData.getSchema)
-      log.warn(s"Falling back to schema conversion from Parquet types; result: $attributes")
+      log.info(s"Falling back to schema conversion from Parquet types; result: $attributes")
       attributes
     }
   }