You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by Jason Altekruse <al...@gmail.com> on 2020/10/01 00:29:55 UTC

Re: Metadata summary file deprecation

My current prototype does require a small spark changeset, here is the
diff. As I mentioned above I probably want to modify this not to read all
footers from the driver if a summary file is totally absent, the remaining
files could fall back to use the executor side pruning and uniform split
planning.

---
 .../sql/execution/DataSourceScanExec.scala    | 56 ++++++++++++++++---
 .../parquet/ParquetFileFormat.scala           | 13 +++++
 .../datasources/parquet/ParquetFilters.scala  |  2 +-
 3 files changed, 62 insertions(+), 9 deletions(-)

diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 7dc9faeac7..9400dc976c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -17,10 +17,16 @@

 package org.apache.spark.sql.execution

+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer

 import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus,
Path}
+import org.apache.parquet.filter2.compat.{FilterCompat, RowGroupFilter}
+import org.apache.parquet.hadoop.{Footer, ParquetFileReader}
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
+import org.apache.parquet.schema.MessageType

 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
@@ -321,6 +327,7 @@ case class FileSourceScanExec(
   private lazy val inputRDD: RDD[InternalRow] = {
     // Update metrics for taking effect in both code generation node and
normal node.
     updateDriverMetrics()
+    val hadoopConf =
relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -329,13 +336,13 @@ case class FileSourceScanExec(
         requiredSchema = requiredSchema,
         filters = pushedDownFilters,
         options = relation.options,
-        hadoopConf =
relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
+        hadoopConf = hadoopConf)

     relation.bucketSpec match {
       case Some(bucketing) if
relation.sparkSession.sessionState.conf.bucketingEnabled =>
         createBucketedReadRDD(bucketing, readFile, selectedPartitions,
relation)
       case _ =>
-        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
+        createNonBucketedReadRDD(readFile, hadoopConf, selectedPartitions,
relation)
     }
   }

@@ -453,6 +460,7 @@ case class FileSourceScanExec(
    */
   private def createNonBucketedReadRDD(
       readFile: (PartitionedFile) => Iterator[InternalRow],
+      hadoopConf: Configuration,
       selectedPartitions: Array[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes =
@@ -466,17 +474,49 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes
bytes, " +
       s"open cost is considered as scanning $openCostInBytes bytes.")

+    val footersMap: Map[Path, ParquetMetadata] =
+      if (fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
pushedDownFilters.nonEmpty) {
+        // pushedDownFilters
+        val splitFiles: java.util.Collection[FileStatus] =
selectedPartitions.flatMap { partition =>
+          partition.files }.toSeq.asJavaCollection
+        val footers: java.util.List[Footer] =
+          ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
+            hadoopConf, splitFiles, false)
+        footers.asScala.map(footer => footer.getFile ->
footer.getParquetMetadata).toMap
+      } else {
+        Map()
+      }
     val splitFiles = selectedPartitions.flatMap { partition =>
       partition.files.flatMap { file =>
         val blockLocations = getBlockLocations(file)
         if (fsRelation.fileFormat.isSplitable(
             fsRelation.sparkSession, fsRelation.options, file.getPath)) {
-          (0L until file.getLen by maxSplitBytes).map { offset =>
-            val remaining = file.getLen - offset
-            val size = if (remaining > maxSplitBytes) maxSplitBytes else
remaining
-            val hosts = getBlockHosts(blockLocations, offset, size)
-            PartitionedFile(
-              partition.values, file.getPath.toUri.toString, offset, size,
hosts)
+          if (fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
+              pushedDownFilters.nonEmpty) {
+            val footer = footersMap(file.getPath)
+            val fileSchema: MessageType =
footer.getFileMetaData().getSchema();
+            val filter =
+
 ParquetSource.makeParquetFilters(fsRelation.sparkSession.sessionState.conf)
+                .createFilter(fileSchema, pushedDownFilters(1) /* TODO and
these together */).get
+
+            val blocks = RowGroupFilter.filterRowGroups(
+              FilterCompat.get(filter), footer.getBlocks(), fileSchema)
+            (blocks.asScala
+                .map(block =>
block.getColumns.asScala.head.getStartingPos)).map { offset =>
+              val remaining = file.getLen - offset
+              val size = if (remaining > maxSplitBytes) maxSplitBytes else
remaining
+              val hosts = getBlockHosts(blockLocations, offset, size)
+              PartitionedFile(
+                partition.values, file.getPath.toUri.toString, offset,
size, hosts)
+            }
+          } else {
+            (0L until file.getLen by maxSplitBytes).map { offset =>
+              val remaining = file.getLen - offset
+              val size = if (remaining > maxSplitBytes) maxSplitBytes else
remaining
+              val hosts = getBlockHosts(blockLocations, offset, size)
+              PartitionedFile(
+                partition.values, file.getPath.toUri.toString, offset,
size, hosts)
+            }
           }
         } else {
           val hosts = getBlockHosts(blockLocations, 0, file.getLen)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 16cd570901..247a4b05f4 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -473,6 +473,19 @@ class ParquetFileFormat
 }

 object ParquetFileFormat extends Logging {
+
+  def makeParquetFilters(sqlConf: SQLConf): ParquetFilters = {
+    val pushDownDate = sqlConf.parquetFilterPushDownDate
+    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+    val pushDownStringStartWith =
sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownInFilterThreshold =
sqlConf.parquetFilterPushDownInFilterThreshold
+    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+
+    new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
+      pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+  }
+
   private[parquet] def readSchema(
       footers: Seq[Footer], sparkSession: SparkSession):
Option[StructType] = {

diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 7e420d36f4..8a44331629 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -40,7 +40,7 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * Some utility function to convert Spark data source filters to Parquet
filters.
  */
-private[parquet] class ParquetFilters(
+class ParquetFilters(
     pushDownDate: Boolean,
     pushDownTimestamp: Boolean,
     pushDownDecimal: Boolean,
-- 
2.23.0

On Wed, Sep 30, 2020, 5:29 PM Ryan Blue <rb...@netflix.com> wrote:

> I agree that using column-level metrics is a great way to avoid doing
> extra work. I didn't think that Parquet used column metrics from the
> metadata file or directly from footers for job planning though?
>
> That's another thing that we've built into the newer formats. Iceberg will
> prune unnecessary data files using lower/upper bounds, null counts, etc.
>
> On Wed, Sep 30, 2020 at 3:19 PM Jason Altekruse <al...@gmail.com>
> wrote:
>
>> I'm not skipping row group metadata, I am trying to accomplish driver
>> side pruning, I need to read row group information to get the column stats.
>> We are managing the schema elsewhere.
>>
>> On engines replacing files, this is why I had proposed possibly adding
>> length and/or last modified time to the summary files.
>>
>> I do understand that there are useful benefits to pushing as much as
>> possible to the executors, and I agree with this change, but it has
>> side-effects. Now for a table with thousands of files spark spins up
>> thousands of tasks, if most of these can just be pruned out based on
>> metadata, this can happen much faster with a consolidated list of all of
>> the metadata rather than having all of the coordination overhead of the
>> small tasks.
>>
>> I actually would like to have a design that would do the "fall-back"
>> using the driver side pruning and uniform split planning for any footers
>> missing from the summary file, but I thought that might add extra
>> complexity to the discussion.
>>
>> Jason Altekruse
>>
>>
>> On Wed, Sep 30, 2020 at 2:53 PM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> I went back and looked at the code a bit. Looks like the deprecation
>>> also had to do with MR job split planning.
>>>
>>> The main reason summary files were useful (I think) was that split
>>> planning was done using the footers. This was extremely slow when reading
>>> individual files, even when parallelizing on the driver. But, there were
>>> correctness issues introduced by the metadata files because engines could
>>> replace the data files easily because they used the same names, like
>>> `part-r-00000`. To avoid the planning time, we switched to using FileSplit
>>> based on uniform split sizes, just like other formats. That removed the
>>> need for metadata files for split planning, and I think that's when we
>>> deprecated them.
>>>
>>> When not reading the row group information from the metadata file, it is
>>> assumed that all of the other metadata is the same
>>> <https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L180-L184>
>>> because the metadata is used for all of the requested files. This means
>>> that for the use case of tracking file schemas, Parquet would do the wrong
>>> thing.
>>>
>>> So the metadata files don't work quite right for table metadata and are
>>> no longer used or recommended for planning splits. Seems reasonable to
>>> deprecate them to me.
>>>
>>> For the table-level metadata, like schema, I would highly recommend
>>> tracking that centrally in a table abstraction. If you're using standard
>>> Parquet tables in Spark, there are severe limitations to what you can do
>>> safely. What's worse, there is no validation for a path-based table that
>>> you're not producing data with an incompatible schema or making an
>>> incompatible change. So you can write two different types to the same
>>> column name or break your table by renaming a column. This is one of the
>>> reasons why we wanted to introduce better table metadata, so tables can
>>> behave like real SQL tables.
>>>
>>> rb
>>>
>>> On Wed, Sep 30, 2020 at 9:38 AM Jason Altekruse <
>>> altekrusejason@gmail.com> wrote:
>>>
>>>> Hey Ryan,
>>>>
>>>> Thanks for the response, I do not want to push the parquet community to
>>>> keep around features that will cause users headaches, but I am still
>>>> looking for the best solution to the problem I am facing.
>>>>
>>>> One thing I could use some clarity on, given what I have seen in
>>>> various tools, I am actually not sure that there is a significant risk of
>>>> wrong results with one possible small recommendation in relation to these
>>>> files.
>>>>
>>>> I was not assuming that directories were immutable, and the method
>>>> included in parquet-mr that I referenced,
>>>> readAllFootersInParallelUsingSummaryFiles, also goes against the notion
>>>> that this is a hard requirement. It specifically takes in a file listing,
>>>> which needs to be provided by some external source, either the FS directly,
>>>> or some transactional system like delta in my case and actually uses the
>>>> metadata summary file just as a cache for footers, with explicit code to
>>>> fall back to red any footers missing from the summary directly from the FS
>>>> itself.
>>>>
>>>> It sounds like some projects in the past have used this file to avoid
>>>> doing an FS listing, which I absolutely agree isn't safe to do and will
>>>> cause problems when people copy in new files to a directory. Can we just
>>>> document that this practice is bad? And possibly just deprecate any code
>>>> that reads the summary file without this kind of fallback and an
>>>> expectation that callers pass in a list of files they expect to get footers
>>>> for?
>>>>
>>>> I also don't know if I have ever seen a project take advantage of the
>>>> fact that you can technically directly append to a parquet file by reading
>>>> in the previous footer, appending new row groups and writing out a whole
>>>> new footer with the new metadata combined with the old, leaving dead bytes
>>>> in the file where the old footer sat. I do remember discussing this
>>>> possibility with Julien at some point, but I don't know if parquet-mr or
>>>> any other projects actually have written code to do this. If this is done,
>>>> it would provide another way for the summary file to become stale, and this
>>>> would not be detectable with just knowing the filename, the summary would
>>>> need to contain file length info.
>>>>
>>>> There is also the possibility that parquet files could be deleted and
>>>> rewritten in the same filenames, but this isn't common in any hadoop/spark
>>>> ecosystem projects I know of, they all generate unique filenames using
>>>> application IDs or GUIDs.
>>>>
>>>> Jason Altekruse
>>>>
>>>> On Tue, Sep 29, 2020 at 8:26 PM Ryan Blue <rb...@netflix.com.invalid>
>>>> wrote:
>>>>
>>>>> I don't remember deprecating it, but I've always recommended against
>>>>> using
>>>>> it because of the assumptions it requires.
>>>>>
>>>>> Those assumptions are routinely violated by processing engines and
>>>>> users
>>>>> that expect to be able to drop files into directories and see the
>>>>> results
>>>>> in their table. Since this was a feature without guard rails or
>>>>> documentation to explain how to safely use it, I think it is a good
>>>>> idea to
>>>>> steer people away from it and deprecate it unless someone wants to
>>>>> address
>>>>> those concerns. Now, I think there are much better alternatives (thanks
>>>>> Jacques!) so I probably wouldn't recommend spending time on bringing
>>>>> this
>>>>> up to date and making it marginally safer.
>>>>>
>>>>> On Tue, Sep 29, 2020 at 11:38 AM Julien Le Dem <julien.ledem@gmail.com
>>>>> >
>>>>> wrote:
>>>>>
>>>>> > Hi Jason,
>>>>> > Thank you for bringing this up.
>>>>> > A correctness issue would only come up when more parquet files are
>>>>> > added to the same folder or files are modified. Historically folders
>>>>> have
>>>>> > been considered immutables and the summary file reflects the
>>>>> metadata for
>>>>> > all the files in the folder. The summary file contains the names of
>>>>> the
>>>>> > files it is for, so extra files in the folder can also be detected
>>>>> and
>>>>> > dealt with at read time without correctness issues.
>>>>> > Like you mentioned the read path allows for those files to not be
>>>>> present.
>>>>> > I think a better solution than deprecating would be to have a switch
>>>>> > allowing turning off those summary files when one wants to be able
>>>>> to not
>>>>> > respect the immutable folder contact.
>>>>> > Projects like Iceberg can elect to not produce them and allow
>>>>> modifying
>>>>> > and adding more files to the same folder without creating correctness
>>>>> > problems.
>>>>> > I would be in favor of removing those Deprecated annotations and
>>>>> document
>>>>> > the use of a switch to optionally not produce the summary files when
>>>>> > electing to modify folders.
>>>>> > I'm curious to hear from Ryan about this who did the change in the
>>>>> first
>>>>> > place.
>>>>> > Best,
>>>>> > Julien
>>>>> >
>>>>> > On Fri, Sep 25, 2020 at 3:06 PM Jason Altekruse <
>>>>> altekrusejason@gmail.com>
>>>>> > wrote:
>>>>> >
>>>>> >> Hy Jacques,
>>>>> >>
>>>>> >> It's good to hear from you, thanks for the pointer to Iceberg. I am
>>>>> aware
>>>>> >> of it as well as other similar projects, including Delta Lake,
>>>>> which my
>>>>> >> team is already using. Unfortunately even with Delta, there is only
>>>>> a
>>>>> >> placeholder in the project currently where they will be tracking
>>>>> file
>>>>> >> level
>>>>> >> statistics at some point in the future, we are also evaluating the
>>>>> >> possibility of implementing this in delta itself. While it and
>>>>> Iceberg
>>>>> >> aren't quite the same architecturally, I think there is enough
>>>>> overlap
>>>>> >> that
>>>>> >> it might be a bit awkward to use the two in conjunction with one
>>>>> another.
>>>>> >>
>>>>> >> From my testing so far, it appears that delta pretty easily can
>>>>> operate
>>>>> >> alongside these older metadata summary files without the two
>>>>> fighting with
>>>>> >> each other. Delta is responsible for maintaining a transactionally
>>>>> >> consistent list of files, and this file can coexist in the
>>>>> directory just
>>>>> >> to allow efficient pruning on the driver side on a best effort
>>>>> basis, as
>>>>> >> it
>>>>> >> can gracefully fall back to the FS if it is missing a newer file.
>>>>> >>
>>>>> >> We are somewhat nervous about depending on something that is marked
>>>>> >> deprecated, but as it is so close to a "just works" state for our
>>>>> needs, I
>>>>> >> was hoping to confirm with the community if there were other risks
>>>>> I was
>>>>> >> missing.
>>>>> >>
>>>>> >> Jason Altekruse
>>>>> >>
>>>>> >> On Wed, Sep 23, 2020 at 6:29 PM Jacques Nadeau <ja...@apache.org>
>>>>> >> wrote:
>>>>> >>
>>>>> >> > Hey Jason,
>>>>> >> >
>>>>> >> > I'd suggest you look at Apache Iceberg. It is a much more mature
>>>>> way of
>>>>> >> > handling metadata efficiency issues and provides a substantial
>>>>> superset
>>>>> >> of
>>>>> >> > functionality over the old metadata cache files.
>>>>> >> >
>>>>> >> > On Wed, Sep 23, 2020 at 4:16 PM Jason Altekruse <
>>>>> >> altekrusejason@gmail.com>
>>>>> >> > wrote:
>>>>> >> >
>>>>> >> > > Hello again,
>>>>> >> > >
>>>>> >> > > I took a look through the mail archives and found a little more
>>>>> >> > information
>>>>> >> > > in this and a few other threads.
>>>>> >> > >
>>>>> >> > >
>>>>> >> > >
>>>>> >> >
>>>>> >>
>>>>> http://mail-archives.apache.org/mod_mbox//parquet-dev/201707.mbox/%3CCAO4re1k8-bZZZWBRuLCnm1V7AoJE1fdunSuBn%2BecRuFGPgcXnA%40mail.gmail.com%3E
>>>>> >> > >
>>>>> >> > > While I do understand the benefits for federating out the
>>>>> reading of
>>>>> >> > > footers for the sake of not worrying about synchronization
>>>>> between the
>>>>> >> > > cached metadata and any changes to the files on disk, it does
>>>>> appear
>>>>> >> > there
>>>>> >> > > is still a use case that isn't solved well with this design,
>>>>> needle
>>>>> >> in a
>>>>> >> > > haystack selective filter queries, where the data is sorted by
>>>>> the
>>>>> >> filter
>>>>> >> > > column. For example in the tests I ran with queries against
>>>>> lots of
>>>>> >> > parquet
>>>>> >> > > files where the vast majority are pruned by a bunch of small
>>>>> tasks, it
>>>>> >> > > takes 33 seconds vs just 1-2 seconds with driver side pruning
>>>>> using
>>>>> >> the
>>>>> >> > > summary file (requires a small spark changet).
>>>>> >> > >
>>>>> >> > > In our use case we are never going to be replacing contents of
>>>>> >> existing
>>>>> >> > > parquet files (with a delete and rewrite on HDFS) or appending
>>>>> new row
>>>>> >> > > groups onto existing files. In that case I don't believe we
>>>>> should
>>>>> >> > > experience any correctness problems, but I wanted to confirm if
>>>>> there
>>>>> >> is
>>>>> >> > > something I am missing. I am
>>>>> >> > > using readAllFootersInParallelUsingSummaryFiles which does fall
>>>>> back
>>>>> >> to
>>>>> >> > > read individual footers if they are missing from the summary
>>>>> file.
>>>>> >> > >
>>>>> >> > > I am also curious if a solution to the correctness problems
>>>>> could be
>>>>> >> to
>>>>> >> > > include a file length and/or last modified time into the
>>>>> summary file,
>>>>> >> > > which could confirm against FS metadata that the files on disk
>>>>> are
>>>>> >> still
>>>>> >> > in
>>>>> >> > > sync with the metadata summary relatively quickly. Would it be
>>>>> >> possible
>>>>> >> > to
>>>>> >> > > consider avoiding this deprecation if I was to work on an
>>>>> update to
>>>>> >> > > implement this?
>>>>> >> > >
>>>>> >> > > - Jason Altekruse
>>>>> >> > >
>>>>> >> > >
>>>>> >> > > On Tue, Sep 15, 2020 at 8:52 PM Jason Altekruse <
>>>>> >> > altekrusejason@gmail.com>
>>>>> >> > > wrote:
>>>>> >> > >
>>>>> >> > > > Hello all,
>>>>> >> > > >
>>>>> >> > > > I have been working on optimizing reads in spark to avoid
>>>>> spinning
>>>>> >> up
>>>>> >> > > lots
>>>>> >> > > > of short lived tasks that just perform row group pruning in
>>>>> >> selective
>>>>> >> > > > filter queries.
>>>>> >> > > >
>>>>> >> > > > My high level question is why metadata summary files were
>>>>> marked
>>>>> >> > > > deprecated in this Parquet changeset? There isn't much
>>>>> explanation
>>>>> >> > given
>>>>> >> > > > or a description of what should be used instead.
>>>>> >> > > > https://github.com/apache/parquet-mr/pull/429
>>>>> >> > > >
>>>>> >> > > > There are other members of the broader parquet community that
>>>>> are
>>>>> >> also
>>>>> >> > > > confused by this deprecation, see this discussion in an arrow
>>>>> PR.
>>>>> >> > > > https://github.com/apache/arrow/pull/4166
>>>>> >> > > >
>>>>> >> > > > In the course of making my small prototype I got an extra
>>>>> >> performance
>>>>> >> > > > boost by making spark write out metadata summary files,
>>>>> rather than
>>>>> >> > > having
>>>>> >> > > > to read all footers on the driver. This effect would be even
>>>>> more
>>>>> >> > > > pronounced on a completely remote storage system like S3.
>>>>> Writing
>>>>> >> these
>>>>> >> > > > summary files was disabled by default in SPARK-15719, because
>>>>> of the
>>>>> >> > > > performance impact of appending a small number of new files
>>>>> to an
>>>>> >> > > existing
>>>>> >> > > > dataset with many files.
>>>>> >> > > >
>>>>> >> > > > https://issues.apache.org/jira/browse/SPARK-15719
>>>>> >> > > >
>>>>> >> > > > This spark JIRA does make decent points considering how spark
>>>>> >> operates
>>>>> >> > > > today, but I think that there is a performance optimization
>>>>> >> opportunity
>>>>> >> > > > that is missed because the row group pruning is deferred to a
>>>>> bunch
>>>>> >> of
>>>>> >> > > > separate short lived tasks rather than done upfront,
>>>>> currently spark
>>>>> >> > only
>>>>> >> > > > uses footers on the driver for schema merging.
>>>>> >> > > >
>>>>> >> > > > Thanks for the help!
>>>>> >> > > > Jason Altekruse
>>>>> >> > > >
>>>>> >> > >
>>>>> >> >
>>>>> >>
>>>>> >
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Re: Metadata summary file deprecation

Posted by Tim Armstrong <ta...@cloudera.com.INVALID>.
I'm chiming in a bit late here, but I wanted to mention that Hive has a
tendency to reuse file names when you do an INSERT OVERWRITE of a
partition. We've had to deal with a number of problems related to this when
caching data from parquet files - it's necessary to be scrupulous about
comparing mtime of the file as well as the path.

Newer table formats (Hive ACID, Iceberg, etc) are better about not
overwriting files in place.

On Thu, Oct 1, 2020 at 5:08 AM Patrick Woody <pa...@gmail.com>
wrote:

> Hey Jason,
>
> Somewhat tangential advice here, but I did basically this exact thing a few
> years back in Spark w/ Parquet metadata and had a couple of notes from it
> running in prod for a while:
> - The majority of individual cases where the assumptions hold (no mismatch
> of summary metadata and actual files), this works perfectly fine. Very good
> sweet spot on medium-sized data. The latency gain isn't worth it for
> smaller tables. Large tables can be a toss up - outstandingly useful when
> you have well laid out data with a sort-order, less useful/worse
> performance if it is randomly distributed even if the query is highly
> selective.
> - The memory pressure across queries can get a bit crazy here if you are
> using it in a Spark driver with many queries going off concurrently.
> - The degenerate cases for high file-count tables and tables with large
> statistics (many columns or actual values) can be tedious to handle and
> will make your driver OOM.
>
> All these things are roughly solvable, but figured I'd chime in if you end
> up going down this route.
>
> One last aside - these are exactly the types of workflows that the
> formalization of the table format of Iceberg can enable in a safe and
> scalable way - excited to see where that goes!
>
> Best,
> Pat
>
>
> On Wed, Sep 30, 2020 at 8:30 PM Jason Altekruse <al...@gmail.com>
> wrote:
>
> > My current prototype does require a small spark changeset, here is the
> > diff. As I mentioned above I probably want to modify this not to read all
> > footers from the driver if a summary file is totally absent, the
> remaining
> > files could fall back to use the executor side pruning and uniform split
> > planning.
> >
> > ---
> >  .../sql/execution/DataSourceScanExec.scala    | 56 ++++++++++++++++---
> >  .../parquet/ParquetFileFormat.scala           | 13 +++++
> >  .../datasources/parquet/ParquetFilters.scala  |  2 +-
> >  3 files changed, 62 insertions(+), 9 deletions(-)
> >
> > diff --git
> >
> >
> a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
> >
> >
> b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
> > index 7dc9faeac7..9400dc976c 100644
> > ---
> >
> >
> a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
> > +++
> >
> >
> b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
> > @@ -17,10 +17,16 @@
> >
> >  package org.apache.spark.sql.execution
> >
> > +import scala.collection.JavaConverters._
> >  import scala.collection.mutable.ArrayBuffer
> >
> >  import org.apache.commons.lang3.StringUtils
> > +import org.apache.hadoop.conf.Configuration
> >  import org.apache.hadoop.fs.{BlockLocation, FileStatus,
> LocatedFileStatus,
> > Path}
> > +import org.apache.parquet.filter2.compat.{FilterCompat, RowGroupFilter}
> > +import org.apache.parquet.hadoop.{Footer, ParquetFileReader}
> > +import org.apache.parquet.hadoop.metadata.ParquetMetadata
> > +import org.apache.parquet.schema.MessageType
> >
> >  import org.apache.spark.rdd.RDD
> >  import org.apache.spark.sql.SparkSession
> > @@ -321,6 +327,7 @@ case class FileSourceScanExec(
> >    private lazy val inputRDD: RDD[InternalRow] = {
> >      // Update metrics for taking effect in both code generation node and
> > normal node.
> >      updateDriverMetrics()
> > +    val hadoopConf =
> >
> >
> relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
> >      val readFile: (PartitionedFile) => Iterator[InternalRow] =
> >        relation.fileFormat.buildReaderWithPartitionValues(
> >          sparkSession = relation.sparkSession,
> > @@ -329,13 +336,13 @@ case class FileSourceScanExec(
> >          requiredSchema = requiredSchema,
> >          filters = pushedDownFilters,
> >          options = relation.options,
> > -        hadoopConf =
> >
> >
> relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
> > +        hadoopConf = hadoopConf)
> >
> >      relation.bucketSpec match {
> >        case Some(bucketing) if
> > relation.sparkSession.sessionState.conf.bucketingEnabled =>
> >          createBucketedReadRDD(bucketing, readFile, selectedPartitions,
> > relation)
> >        case _ =>
> > -        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
> > +        createNonBucketedReadRDD(readFile, hadoopConf,
> selectedPartitions,
> > relation)
> >      }
> >    }
> >
> > @@ -453,6 +460,7 @@ case class FileSourceScanExec(
> >     */
> >    private def createNonBucketedReadRDD(
> >        readFile: (PartitionedFile) => Iterator[InternalRow],
> > +      hadoopConf: Configuration,
> >        selectedPartitions: Array[PartitionDirectory],
> >        fsRelation: HadoopFsRelation): RDD[InternalRow] = {
> >      val defaultMaxSplitBytes =
> > @@ -466,17 +474,49 @@ case class FileSourceScanExec(
> >      logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes
> > bytes, " +
> >        s"open cost is considered as scanning $openCostInBytes bytes.")
> >
> > +    val footersMap: Map[Path, ParquetMetadata] =
> > +      if (fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
> > pushedDownFilters.nonEmpty) {
> > +        // pushedDownFilters
> > +        val splitFiles: java.util.Collection[FileStatus] =
> > selectedPartitions.flatMap { partition =>
> > +          partition.files }.toSeq.asJavaCollection
> > +        val footers: java.util.List[Footer] =
> > +          ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
> > +            hadoopConf, splitFiles, false)
> > +        footers.asScala.map(footer => footer.getFile ->
> > footer.getParquetMetadata).toMap
> > +      } else {
> > +        Map()
> > +      }
> >      val splitFiles = selectedPartitions.flatMap { partition =>
> >        partition.files.flatMap { file =>
> >          val blockLocations = getBlockLocations(file)
> >          if (fsRelation.fileFormat.isSplitable(
> >              fsRelation.sparkSession, fsRelation.options, file.getPath))
> {
> > -          (0L until file.getLen by maxSplitBytes).map { offset =>
> > -            val remaining = file.getLen - offset
> > -            val size = if (remaining > maxSplitBytes) maxSplitBytes else
> > remaining
> > -            val hosts = getBlockHosts(blockLocations, offset, size)
> > -            PartitionedFile(
> > -              partition.values, file.getPath.toUri.toString, offset,
> size,
> > hosts)
> > +          if (fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
> > +              pushedDownFilters.nonEmpty) {
> > +            val footer = footersMap(file.getPath)
> > +            val fileSchema: MessageType =
> > footer.getFileMetaData().getSchema();
> > +            val filter =
> > +
> >
> >
> ParquetSource.makeParquetFilters(fsRelation.sparkSession.sessionState.conf)
> > +                .createFilter(fileSchema, pushedDownFilters(1) /* TODO
> and
> > these together */).get
> > +
> > +            val blocks = RowGroupFilter.filterRowGroups(
> > +              FilterCompat.get(filter), footer.getBlocks(), fileSchema)
> > +            (blocks.asScala
> > +                .map(block =>
> > block.getColumns.asScala.head.getStartingPos)).map { offset =>
> > +              val remaining = file.getLen - offset
> > +              val size = if (remaining > maxSplitBytes) maxSplitBytes
> else
> > remaining
> > +              val hosts = getBlockHosts(blockLocations, offset, size)
> > +              PartitionedFile(
> > +                partition.values, file.getPath.toUri.toString, offset,
> > size, hosts)
> > +            }
> > +          } else {
> > +            (0L until file.getLen by maxSplitBytes).map { offset =>
> > +              val remaining = file.getLen - offset
> > +              val size = if (remaining > maxSplitBytes) maxSplitBytes
> else
> > remaining
> > +              val hosts = getBlockHosts(blockLocations, offset, size)
> > +              PartitionedFile(
> > +                partition.values, file.getPath.toUri.toString, offset,
> > size, hosts)
> > +            }
> >            }
> >          } else {
> >            val hosts = getBlockHosts(blockLocations, 0, file.getLen)
> > diff --git
> >
> >
> a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
> >
> >
> b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
> > index 16cd570901..247a4b05f4 100644
> > ---
> >
> >
> a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
> > +++
> >
> >
> b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
> > @@ -473,6 +473,19 @@ class ParquetFileFormat
> >  }
> >
> >  object ParquetFileFormat extends Logging {
> > +
> > +  def makeParquetFilters(sqlConf: SQLConf): ParquetFilters = {
> > +    val pushDownDate = sqlConf.parquetFilterPushDownDate
> > +    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
> > +    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
> > +    val pushDownStringStartWith =
> > sqlConf.parquetFilterPushDownStringStartWith
> > +    val pushDownInFilterThreshold =
> > sqlConf.parquetFilterPushDownInFilterThreshold
> > +    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
> > +
> > +    new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
> > +      pushDownStringStartWith, pushDownInFilterThreshold,
> isCaseSensitive)
> > +  }
> > +
> >    private[parquet] def readSchema(
> >        footers: Seq[Footer], sparkSession: SparkSession):
> > Option[StructType] = {
> >
> > diff --git
> >
> >
> a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
> >
> >
> b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
> > index 7e420d36f4..8a44331629 100644
> > ---
> >
> >
> a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
> > +++
> >
> >
> b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
> > @@ -40,7 +40,7 @@ import org.apache.spark.unsafe.types.UTF8String
> >  /**
> >   * Some utility function to convert Spark data source filters to Parquet
> > filters.
> >   */
> > -private[parquet] class ParquetFilters(
> > +class ParquetFilters(
> >      pushDownDate: Boolean,
> >      pushDownTimestamp: Boolean,
> >      pushDownDecimal: Boolean,
> > --
> > 2.23.0
> >
> > On Wed, Sep 30, 2020, 5:29 PM Ryan Blue <rb...@netflix.com> wrote:
> >
> > > I agree that using column-level metrics is a great way to avoid doing
> > > extra work. I didn't think that Parquet used column metrics from the
> > > metadata file or directly from footers for job planning though?
> > >
> > > That's another thing that we've built into the newer formats. Iceberg
> > will
> > > prune unnecessary data files using lower/upper bounds, null counts,
> etc.
> > >
> > > On Wed, Sep 30, 2020 at 3:19 PM Jason Altekruse <
> > altekrusejason@gmail.com>
> > > wrote:
> > >
> > >> I'm not skipping row group metadata, I am trying to accomplish driver
> > >> side pruning, I need to read row group information to get the column
> > stats.
> > >> We are managing the schema elsewhere.
> > >>
> > >> On engines replacing files, this is why I had proposed possibly adding
> > >> length and/or last modified time to the summary files.
> > >>
> > >> I do understand that there are useful benefits to pushing as much as
> > >> possible to the executors, and I agree with this change, but it has
> > >> side-effects. Now for a table with thousands of files spark spins up
> > >> thousands of tasks, if most of these can just be pruned out based on
> > >> metadata, this can happen much faster with a consolidated list of all
> of
> > >> the metadata rather than having all of the coordination overhead of
> the
> > >> small tasks.
> > >>
> > >> I actually would like to have a design that would do the "fall-back"
> > >> using the driver side pruning and uniform split planning for any
> footers
> > >> missing from the summary file, but I thought that might add extra
> > >> complexity to the discussion.
> > >>
> > >> Jason Altekruse
> > >>
> > >>
> > >> On Wed, Sep 30, 2020 at 2:53 PM Ryan Blue <rb...@netflix.com> wrote:
> > >>
> > >>> I went back and looked at the code a bit. Looks like the deprecation
> > >>> also had to do with MR job split planning.
> > >>>
> > >>> The main reason summary files were useful (I think) was that split
> > >>> planning was done using the footers. This was extremely slow when
> > reading
> > >>> individual files, even when parallelizing on the driver. But, there
> > were
> > >>> correctness issues introduced by the metadata files because engines
> > could
> > >>> replace the data files easily because they used the same names, like
> > >>> `part-r-00000`. To avoid the planning time, we switched to using
> > FileSplit
> > >>> based on uniform split sizes, just like other formats. That removed
> the
> > >>> need for metadata files for split planning, and I think that's when
> we
> > >>> deprecated them.
> > >>>
> > >>> When not reading the row group information from the metadata file, it
> > is
> > >>> assumed that all of the other metadata is the same
> > >>> <
> >
> https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L180-L184
> > >
> > >>> because the metadata is used for all of the requested files. This
> means
> > >>> that for the use case of tracking file schemas, Parquet would do the
> > wrong
> > >>> thing.
> > >>>
> > >>> So the metadata files don't work quite right for table metadata and
> are
> > >>> no longer used or recommended for planning splits. Seems reasonable
> to
> > >>> deprecate them to me.
> > >>>
> > >>> For the table-level metadata, like schema, I would highly recommend
> > >>> tracking that centrally in a table abstraction. If you're using
> > standard
> > >>> Parquet tables in Spark, there are severe limitations to what you can
> > do
> > >>> safely. What's worse, there is no validation for a path-based table
> > that
> > >>> you're not producing data with an incompatible schema or making an
> > >>> incompatible change. So you can write two different types to the same
> > >>> column name or break your table by renaming a column. This is one of
> > the
> > >>> reasons why we wanted to introduce better table metadata, so tables
> can
> > >>> behave like real SQL tables.
> > >>>
> > >>> rb
> > >>>
> > >>> On Wed, Sep 30, 2020 at 9:38 AM Jason Altekruse <
> > >>> altekrusejason@gmail.com> wrote:
> > >>>
> > >>>> Hey Ryan,
> > >>>>
> > >>>> Thanks for the response, I do not want to push the parquet community
> > to
> > >>>> keep around features that will cause users headaches, but I am still
> > >>>> looking for the best solution to the problem I am facing.
> > >>>>
> > >>>> One thing I could use some clarity on, given what I have seen in
> > >>>> various tools, I am actually not sure that there is a significant
> > risk of
> > >>>> wrong results with one possible small recommendation in relation to
> > these
> > >>>> files.
> > >>>>
> > >>>> I was not assuming that directories were immutable, and the method
> > >>>> included in parquet-mr that I referenced,
> > >>>> readAllFootersInParallelUsingSummaryFiles, also goes against the
> > notion
> > >>>> that this is a hard requirement. It specifically takes in a file
> > listing,
> > >>>> which needs to be provided by some external source, either the FS
> > directly,
> > >>>> or some transactional system like delta in my case and actually uses
> > the
> > >>>> metadata summary file just as a cache for footers, with explicit
> code
> > to
> > >>>> fall back to red any footers missing from the summary directly from
> > the FS
> > >>>> itself.
> > >>>>
> > >>>> It sounds like some projects in the past have used this file to
> avoid
> > >>>> doing an FS listing, which I absolutely agree isn't safe to do and
> > will
> > >>>> cause problems when people copy in new files to a directory. Can we
> > just
> > >>>> document that this practice is bad? And possibly just deprecate any
> > code
> > >>>> that reads the summary file without this kind of fallback and an
> > >>>> expectation that callers pass in a list of files they expect to get
> > footers
> > >>>> for?
> > >>>>
> > >>>> I also don't know if I have ever seen a project take advantage of
> the
> > >>>> fact that you can technically directly append to a parquet file by
> > reading
> > >>>> in the previous footer, appending new row groups and writing out a
> > whole
> > >>>> new footer with the new metadata combined with the old, leaving dead
> > bytes
> > >>>> in the file where the old footer sat. I do remember discussing this
> > >>>> possibility with Julien at some point, but I don't know if
> parquet-mr
> > or
> > >>>> any other projects actually have written code to do this. If this is
> > done,
> > >>>> it would provide another way for the summary file to become stale,
> > and this
> > >>>> would not be detectable with just knowing the filename, the summary
> > would
> > >>>> need to contain file length info.
> > >>>>
> > >>>> There is also the possibility that parquet files could be deleted
> and
> > >>>> rewritten in the same filenames, but this isn't common in any
> > hadoop/spark
> > >>>> ecosystem projects I know of, they all generate unique filenames
> using
> > >>>> application IDs or GUIDs.
> > >>>>
> > >>>> Jason Altekruse
> > >>>>
> > >>>> On Tue, Sep 29, 2020 at 8:26 PM Ryan Blue <rblue@netflix.com.invalid
> >
> > >>>> wrote:
> > >>>>
> > >>>>> I don't remember deprecating it, but I've always recommended
> against
> > >>>>> using
> > >>>>> it because of the assumptions it requires.
> > >>>>>
> > >>>>> Those assumptions are routinely violated by processing engines and
> > >>>>> users
> > >>>>> that expect to be able to drop files into directories and see the
> > >>>>> results
> > >>>>> in their table. Since this was a feature without guard rails or
> > >>>>> documentation to explain how to safely use it, I think it is a good
> > >>>>> idea to
> > >>>>> steer people away from it and deprecate it unless someone wants to
> > >>>>> address
> > >>>>> those concerns. Now, I think there are much better alternatives
> > (thanks
> > >>>>> Jacques!) so I probably wouldn't recommend spending time on
> bringing
> > >>>>> this
> > >>>>> up to date and making it marginally safer.
> > >>>>>
> > >>>>> On Tue, Sep 29, 2020 at 11:38 AM Julien Le Dem <
> > julien.ledem@gmail.com
> > >>>>> >
> > >>>>> wrote:
> > >>>>>
> > >>>>> > Hi Jason,
> > >>>>> > Thank you for bringing this up.
> > >>>>> > A correctness issue would only come up when more parquet files
> are
> > >>>>> > added to the same folder or files are modified. Historically
> > folders
> > >>>>> have
> > >>>>> > been considered immutables and the summary file reflects the
> > >>>>> metadata for
> > >>>>> > all the files in the folder. The summary file contains the names
> of
> > >>>>> the
> > >>>>> > files it is for, so extra files in the folder can also be
> detected
> > >>>>> and
> > >>>>> > dealt with at read time without correctness issues.
> > >>>>> > Like you mentioned the read path allows for those files to not be
> > >>>>> present.
> > >>>>> > I think a better solution than deprecating would be to have a
> > switch
> > >>>>> > allowing turning off those summary files when one wants to be
> able
> > >>>>> to not
> > >>>>> > respect the immutable folder contact.
> > >>>>> > Projects like Iceberg can elect to not produce them and allow
> > >>>>> modifying
> > >>>>> > and adding more files to the same folder without creating
> > correctness
> > >>>>> > problems.
> > >>>>> > I would be in favor of removing those Deprecated annotations and
> > >>>>> document
> > >>>>> > the use of a switch to optionally not produce the summary files
> > when
> > >>>>> > electing to modify folders.
> > >>>>> > I'm curious to hear from Ryan about this who did the change in
> the
> > >>>>> first
> > >>>>> > place.
> > >>>>> > Best,
> > >>>>> > Julien
> > >>>>> >
> > >>>>> > On Fri, Sep 25, 2020 at 3:06 PM Jason Altekruse <
> > >>>>> altekrusejason@gmail.com>
> > >>>>> > wrote:
> > >>>>> >
> > >>>>> >> Hy Jacques,
> > >>>>> >>
> > >>>>> >> It's good to hear from you, thanks for the pointer to Iceberg. I
> > am
> > >>>>> aware
> > >>>>> >> of it as well as other similar projects, including Delta Lake,
> > >>>>> which my
> > >>>>> >> team is already using. Unfortunately even with Delta, there is
> > only
> > >>>>> a
> > >>>>> >> placeholder in the project currently where they will be tracking
> > >>>>> file
> > >>>>> >> level
> > >>>>> >> statistics at some point in the future, we are also evaluating
> the
> > >>>>> >> possibility of implementing this in delta itself. While it and
> > >>>>> Iceberg
> > >>>>> >> aren't quite the same architecturally, I think there is enough
> > >>>>> overlap
> > >>>>> >> that
> > >>>>> >> it might be a bit awkward to use the two in conjunction with one
> > >>>>> another.
> > >>>>> >>
> > >>>>> >> From my testing so far, it appears that delta pretty easily can
> > >>>>> operate
> > >>>>> >> alongside these older metadata summary files without the two
> > >>>>> fighting with
> > >>>>> >> each other. Delta is responsible for maintaining a
> transactionally
> > >>>>> >> consistent list of files, and this file can coexist in the
> > >>>>> directory just
> > >>>>> >> to allow efficient pruning on the driver side on a best effort
> > >>>>> basis, as
> > >>>>> >> it
> > >>>>> >> can gracefully fall back to the FS if it is missing a newer
> file.
> > >>>>> >>
> > >>>>> >> We are somewhat nervous about depending on something that is
> > marked
> > >>>>> >> deprecated, but as it is so close to a "just works" state for
> our
> > >>>>> needs, I
> > >>>>> >> was hoping to confirm with the community if there were other
> risks
> > >>>>> I was
> > >>>>> >> missing.
> > >>>>> >>
> > >>>>> >> Jason Altekruse
> > >>>>> >>
> > >>>>> >> On Wed, Sep 23, 2020 at 6:29 PM Jacques Nadeau <
> > jacques@apache.org>
> > >>>>> >> wrote:
> > >>>>> >>
> > >>>>> >> > Hey Jason,
> > >>>>> >> >
> > >>>>> >> > I'd suggest you look at Apache Iceberg. It is a much more
> mature
> > >>>>> way of
> > >>>>> >> > handling metadata efficiency issues and provides a substantial
> > >>>>> superset
> > >>>>> >> of
> > >>>>> >> > functionality over the old metadata cache files.
> > >>>>> >> >
> > >>>>> >> > On Wed, Sep 23, 2020 at 4:16 PM Jason Altekruse <
> > >>>>> >> altekrusejason@gmail.com>
> > >>>>> >> > wrote:
> > >>>>> >> >
> > >>>>> >> > > Hello again,
> > >>>>> >> > >
> > >>>>> >> > > I took a look through the mail archives and found a little
> > more
> > >>>>> >> > information
> > >>>>> >> > > in this and a few other threads.
> > >>>>> >> > >
> > >>>>> >> > >
> > >>>>> >> > >
> > >>>>> >> >
> > >>>>> >>
> > >>>>>
> >
> http://mail-archives.apache.org/mod_mbox//parquet-dev/201707.mbox/%3CCAO4re1k8-bZZZWBRuLCnm1V7AoJE1fdunSuBn%2BecRuFGPgcXnA%40mail.gmail.com%3E
> > >>>>> >> > >
> > >>>>> >> > > While I do understand the benefits for federating out the
> > >>>>> reading of
> > >>>>> >> > > footers for the sake of not worrying about synchronization
> > >>>>> between the
> > >>>>> >> > > cached metadata and any changes to the files on disk, it
> does
> > >>>>> appear
> > >>>>> >> > there
> > >>>>> >> > > is still a use case that isn't solved well with this design,
> > >>>>> needle
> > >>>>> >> in a
> > >>>>> >> > > haystack selective filter queries, where the data is sorted
> by
> > >>>>> the
> > >>>>> >> filter
> > >>>>> >> > > column. For example in the tests I ran with queries against
> > >>>>> lots of
> > >>>>> >> > parquet
> > >>>>> >> > > files where the vast majority are pruned by a bunch of small
> > >>>>> tasks, it
> > >>>>> >> > > takes 33 seconds vs just 1-2 seconds with driver side
> pruning
> > >>>>> using
> > >>>>> >> the
> > >>>>> >> > > summary file (requires a small spark changet).
> > >>>>> >> > >
> > >>>>> >> > > In our use case we are never going to be replacing contents
> of
> > >>>>> >> existing
> > >>>>> >> > > parquet files (with a delete and rewrite on HDFS) or
> appending
> > >>>>> new row
> > >>>>> >> > > groups onto existing files. In that case I don't believe we
> > >>>>> should
> > >>>>> >> > > experience any correctness problems, but I wanted to confirm
> > if
> > >>>>> there
> > >>>>> >> is
> > >>>>> >> > > something I am missing. I am
> > >>>>> >> > > using readAllFootersInParallelUsingSummaryFiles which does
> > fall
> > >>>>> back
> > >>>>> >> to
> > >>>>> >> > > read individual footers if they are missing from the summary
> > >>>>> file.
> > >>>>> >> > >
> > >>>>> >> > > I am also curious if a solution to the correctness problems
> > >>>>> could be
> > >>>>> >> to
> > >>>>> >> > > include a file length and/or last modified time into the
> > >>>>> summary file,
> > >>>>> >> > > which could confirm against FS metadata that the files on
> disk
> > >>>>> are
> > >>>>> >> still
> > >>>>> >> > in
> > >>>>> >> > > sync with the metadata summary relatively quickly. Would it
> be
> > >>>>> >> possible
> > >>>>> >> > to
> > >>>>> >> > > consider avoiding this deprecation if I was to work on an
> > >>>>> update to
> > >>>>> >> > > implement this?
> > >>>>> >> > >
> > >>>>> >> > > - Jason Altekruse
> > >>>>> >> > >
> > >>>>> >> > >
> > >>>>> >> > > On Tue, Sep 15, 2020 at 8:52 PM Jason Altekruse <
> > >>>>> >> > altekrusejason@gmail.com>
> > >>>>> >> > > wrote:
> > >>>>> >> > >
> > >>>>> >> > > > Hello all,
> > >>>>> >> > > >
> > >>>>> >> > > > I have been working on optimizing reads in spark to avoid
> > >>>>> spinning
> > >>>>> >> up
> > >>>>> >> > > lots
> > >>>>> >> > > > of short lived tasks that just perform row group pruning
> in
> > >>>>> >> selective
> > >>>>> >> > > > filter queries.
> > >>>>> >> > > >
> > >>>>> >> > > > My high level question is why metadata summary files were
> > >>>>> marked
> > >>>>> >> > > > deprecated in this Parquet changeset? There isn't much
> > >>>>> explanation
> > >>>>> >> > given
> > >>>>> >> > > > or a description of what should be used instead.
> > >>>>> >> > > > https://github.com/apache/parquet-mr/pull/429
> > >>>>> >> > > >
> > >>>>> >> > > > There are other members of the broader parquet community
> > that
> > >>>>> are
> > >>>>> >> also
> > >>>>> >> > > > confused by this deprecation, see this discussion in an
> > arrow
> > >>>>> PR.
> > >>>>> >> > > > https://github.com/apache/arrow/pull/4166
> > >>>>> >> > > >
> > >>>>> >> > > > In the course of making my small prototype I got an extra
> > >>>>> >> performance
> > >>>>> >> > > > boost by making spark write out metadata summary files,
> > >>>>> rather than
> > >>>>> >> > > having
> > >>>>> >> > > > to read all footers on the driver. This effect would be
> even
> > >>>>> more
> > >>>>> >> > > > pronounced on a completely remote storage system like S3.
> > >>>>> Writing
> > >>>>> >> these
> > >>>>> >> > > > summary files was disabled by default in SPARK-15719,
> > because
> > >>>>> of the
> > >>>>> >> > > > performance impact of appending a small number of new
> files
> > >>>>> to an
> > >>>>> >> > > existing
> > >>>>> >> > > > dataset with many files.
> > >>>>> >> > > >
> > >>>>> >> > > > https://issues.apache.org/jira/browse/SPARK-15719
> > >>>>> >> > > >
> > >>>>> >> > > > This spark JIRA does make decent points considering how
> > spark
> > >>>>> >> operates
> > >>>>> >> > > > today, but I think that there is a performance
> optimization
> > >>>>> >> opportunity
> > >>>>> >> > > > that is missed because the row group pruning is deferred
> to
> > a
> > >>>>> bunch
> > >>>>> >> of
> > >>>>> >> > > > separate short lived tasks rather than done upfront,
> > >>>>> currently spark
> > >>>>> >> > only
> > >>>>> >> > > > uses footers on the driver for schema merging.
> > >>>>> >> > > >
> > >>>>> >> > > > Thanks for the help!
> > >>>>> >> > > > Jason Altekruse
> > >>>>> >> > > >
> > >>>>> >> > >
> > >>>>> >> >
> > >>>>> >>
> > >>>>> >
> > >>>>>
> > >>>>> --
> > >>>>> Ryan Blue
> > >>>>> Software Engineer
> > >>>>> Netflix
> > >>>>>
> > >>>>
> > >>>
> > >>> --
> > >>> Ryan Blue
> > >>> Software Engineer
> > >>> Netflix
> > >>>
> > >>
> > >
> > > --
> > > Ryan Blue
> > > Software Engineer
> > > Netflix
> > >
> >
>

Re: Metadata summary file deprecation

Posted by Patrick Woody <pa...@gmail.com>.
Hey Jason,

Somewhat tangential advice here, but I did basically this exact thing a few
years back in Spark w/ Parquet metadata and had a couple of notes from it
running in prod for a while:
- The majority of individual cases where the assumptions hold (no mismatch
of summary metadata and actual files), this works perfectly fine. Very good
sweet spot on medium-sized data. The latency gain isn't worth it for
smaller tables. Large tables can be a toss up - outstandingly useful when
you have well laid out data with a sort-order, less useful/worse
performance if it is randomly distributed even if the query is highly
selective.
- The memory pressure across queries can get a bit crazy here if you are
using it in a Spark driver with many queries going off concurrently.
- The degenerate cases for high file-count tables and tables with large
statistics (many columns or actual values) can be tedious to handle and
will make your driver OOM.

All these things are roughly solvable, but figured I'd chime in if you end
up going down this route.

One last aside - these are exactly the types of workflows that the
formalization of the table format of Iceberg can enable in a safe and
scalable way - excited to see where that goes!

Best,
Pat


On Wed, Sep 30, 2020 at 8:30 PM Jason Altekruse <al...@gmail.com>
wrote:

> My current prototype does require a small spark changeset, here is the
> diff. As I mentioned above I probably want to modify this not to read all
> footers from the driver if a summary file is totally absent, the remaining
> files could fall back to use the executor side pruning and uniform split
> planning.
>
> ---
>  .../sql/execution/DataSourceScanExec.scala    | 56 ++++++++++++++++---
>  .../parquet/ParquetFileFormat.scala           | 13 +++++
>  .../datasources/parquet/ParquetFilters.scala  |  2 +-
>  3 files changed, 62 insertions(+), 9 deletions(-)
>
> diff --git
>
> a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
>
> b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
> index 7dc9faeac7..9400dc976c 100644
> ---
>
> a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
> +++
>
> b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
> @@ -17,10 +17,16 @@
>
>  package org.apache.spark.sql.execution
>
> +import scala.collection.JavaConverters._
>  import scala.collection.mutable.ArrayBuffer
>
>  import org.apache.commons.lang3.StringUtils
> +import org.apache.hadoop.conf.Configuration
>  import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus,
> Path}
> +import org.apache.parquet.filter2.compat.{FilterCompat, RowGroupFilter}
> +import org.apache.parquet.hadoop.{Footer, ParquetFileReader}
> +import org.apache.parquet.hadoop.metadata.ParquetMetadata
> +import org.apache.parquet.schema.MessageType
>
>  import org.apache.spark.rdd.RDD
>  import org.apache.spark.sql.SparkSession
> @@ -321,6 +327,7 @@ case class FileSourceScanExec(
>    private lazy val inputRDD: RDD[InternalRow] = {
>      // Update metrics for taking effect in both code generation node and
> normal node.
>      updateDriverMetrics()
> +    val hadoopConf =
>
> relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
>      val readFile: (PartitionedFile) => Iterator[InternalRow] =
>        relation.fileFormat.buildReaderWithPartitionValues(
>          sparkSession = relation.sparkSession,
> @@ -329,13 +336,13 @@ case class FileSourceScanExec(
>          requiredSchema = requiredSchema,
>          filters = pushedDownFilters,
>          options = relation.options,
> -        hadoopConf =
>
> relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
> +        hadoopConf = hadoopConf)
>
>      relation.bucketSpec match {
>        case Some(bucketing) if
> relation.sparkSession.sessionState.conf.bucketingEnabled =>
>          createBucketedReadRDD(bucketing, readFile, selectedPartitions,
> relation)
>        case _ =>
> -        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
> +        createNonBucketedReadRDD(readFile, hadoopConf, selectedPartitions,
> relation)
>      }
>    }
>
> @@ -453,6 +460,7 @@ case class FileSourceScanExec(
>     */
>    private def createNonBucketedReadRDD(
>        readFile: (PartitionedFile) => Iterator[InternalRow],
> +      hadoopConf: Configuration,
>        selectedPartitions: Array[PartitionDirectory],
>        fsRelation: HadoopFsRelation): RDD[InternalRow] = {
>      val defaultMaxSplitBytes =
> @@ -466,17 +474,49 @@ case class FileSourceScanExec(
>      logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes
> bytes, " +
>        s"open cost is considered as scanning $openCostInBytes bytes.")
>
> +    val footersMap: Map[Path, ParquetMetadata] =
> +      if (fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
> pushedDownFilters.nonEmpty) {
> +        // pushedDownFilters
> +        val splitFiles: java.util.Collection[FileStatus] =
> selectedPartitions.flatMap { partition =>
> +          partition.files }.toSeq.asJavaCollection
> +        val footers: java.util.List[Footer] =
> +          ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
> +            hadoopConf, splitFiles, false)
> +        footers.asScala.map(footer => footer.getFile ->
> footer.getParquetMetadata).toMap
> +      } else {
> +        Map()
> +      }
>      val splitFiles = selectedPartitions.flatMap { partition =>
>        partition.files.flatMap { file =>
>          val blockLocations = getBlockLocations(file)
>          if (fsRelation.fileFormat.isSplitable(
>              fsRelation.sparkSession, fsRelation.options, file.getPath)) {
> -          (0L until file.getLen by maxSplitBytes).map { offset =>
> -            val remaining = file.getLen - offset
> -            val size = if (remaining > maxSplitBytes) maxSplitBytes else
> remaining
> -            val hosts = getBlockHosts(blockLocations, offset, size)
> -            PartitionedFile(
> -              partition.values, file.getPath.toUri.toString, offset, size,
> hosts)
> +          if (fsRelation.fileFormat.isInstanceOf[ParquetSource] &&
> +              pushedDownFilters.nonEmpty) {
> +            val footer = footersMap(file.getPath)
> +            val fileSchema: MessageType =
> footer.getFileMetaData().getSchema();
> +            val filter =
> +
>
>  ParquetSource.makeParquetFilters(fsRelation.sparkSession.sessionState.conf)
> +                .createFilter(fileSchema, pushedDownFilters(1) /* TODO and
> these together */).get
> +
> +            val blocks = RowGroupFilter.filterRowGroups(
> +              FilterCompat.get(filter), footer.getBlocks(), fileSchema)
> +            (blocks.asScala
> +                .map(block =>
> block.getColumns.asScala.head.getStartingPos)).map { offset =>
> +              val remaining = file.getLen - offset
> +              val size = if (remaining > maxSplitBytes) maxSplitBytes else
> remaining
> +              val hosts = getBlockHosts(blockLocations, offset, size)
> +              PartitionedFile(
> +                partition.values, file.getPath.toUri.toString, offset,
> size, hosts)
> +            }
> +          } else {
> +            (0L until file.getLen by maxSplitBytes).map { offset =>
> +              val remaining = file.getLen - offset
> +              val size = if (remaining > maxSplitBytes) maxSplitBytes else
> remaining
> +              val hosts = getBlockHosts(blockLocations, offset, size)
> +              PartitionedFile(
> +                partition.values, file.getPath.toUri.toString, offset,
> size, hosts)
> +            }
>            }
>          } else {
>            val hosts = getBlockHosts(blockLocations, 0, file.getLen)
> diff --git
>
> a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
>
> b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
> index 16cd570901..247a4b05f4 100644
> ---
>
> a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
> +++
>
> b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
> @@ -473,6 +473,19 @@ class ParquetFileFormat
>  }
>
>  object ParquetFileFormat extends Logging {
> +
> +  def makeParquetFilters(sqlConf: SQLConf): ParquetFilters = {
> +    val pushDownDate = sqlConf.parquetFilterPushDownDate
> +    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
> +    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
> +    val pushDownStringStartWith =
> sqlConf.parquetFilterPushDownStringStartWith
> +    val pushDownInFilterThreshold =
> sqlConf.parquetFilterPushDownInFilterThreshold
> +    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
> +
> +    new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
> +      pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
> +  }
> +
>    private[parquet] def readSchema(
>        footers: Seq[Footer], sparkSession: SparkSession):
> Option[StructType] = {
>
> diff --git
>
> a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
>
> b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
> index 7e420d36f4..8a44331629 100644
> ---
>
> a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
> +++
>
> b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
> @@ -40,7 +40,7 @@ import org.apache.spark.unsafe.types.UTF8String
>  /**
>   * Some utility function to convert Spark data source filters to Parquet
> filters.
>   */
> -private[parquet] class ParquetFilters(
> +class ParquetFilters(
>      pushDownDate: Boolean,
>      pushDownTimestamp: Boolean,
>      pushDownDecimal: Boolean,
> --
> 2.23.0
>
> On Wed, Sep 30, 2020, 5:29 PM Ryan Blue <rb...@netflix.com> wrote:
>
> > I agree that using column-level metrics is a great way to avoid doing
> > extra work. I didn't think that Parquet used column metrics from the
> > metadata file or directly from footers for job planning though?
> >
> > That's another thing that we've built into the newer formats. Iceberg
> will
> > prune unnecessary data files using lower/upper bounds, null counts, etc.
> >
> > On Wed, Sep 30, 2020 at 3:19 PM Jason Altekruse <
> altekrusejason@gmail.com>
> > wrote:
> >
> >> I'm not skipping row group metadata, I am trying to accomplish driver
> >> side pruning, I need to read row group information to get the column
> stats.
> >> We are managing the schema elsewhere.
> >>
> >> On engines replacing files, this is why I had proposed possibly adding
> >> length and/or last modified time to the summary files.
> >>
> >> I do understand that there are useful benefits to pushing as much as
> >> possible to the executors, and I agree with this change, but it has
> >> side-effects. Now for a table with thousands of files spark spins up
> >> thousands of tasks, if most of these can just be pruned out based on
> >> metadata, this can happen much faster with a consolidated list of all of
> >> the metadata rather than having all of the coordination overhead of the
> >> small tasks.
> >>
> >> I actually would like to have a design that would do the "fall-back"
> >> using the driver side pruning and uniform split planning for any footers
> >> missing from the summary file, but I thought that might add extra
> >> complexity to the discussion.
> >>
> >> Jason Altekruse
> >>
> >>
> >> On Wed, Sep 30, 2020 at 2:53 PM Ryan Blue <rb...@netflix.com> wrote:
> >>
> >>> I went back and looked at the code a bit. Looks like the deprecation
> >>> also had to do with MR job split planning.
> >>>
> >>> The main reason summary files were useful (I think) was that split
> >>> planning was done using the footers. This was extremely slow when
> reading
> >>> individual files, even when parallelizing on the driver. But, there
> were
> >>> correctness issues introduced by the metadata files because engines
> could
> >>> replace the data files easily because they used the same names, like
> >>> `part-r-00000`. To avoid the planning time, we switched to using
> FileSplit
> >>> based on uniform split sizes, just like other formats. That removed the
> >>> need for metadata files for split planning, and I think that's when we
> >>> deprecated them.
> >>>
> >>> When not reading the row group information from the metadata file, it
> is
> >>> assumed that all of the other metadata is the same
> >>> <
> https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L180-L184
> >
> >>> because the metadata is used for all of the requested files. This means
> >>> that for the use case of tracking file schemas, Parquet would do the
> wrong
> >>> thing.
> >>>
> >>> So the metadata files don't work quite right for table metadata and are
> >>> no longer used or recommended for planning splits. Seems reasonable to
> >>> deprecate them to me.
> >>>
> >>> For the table-level metadata, like schema, I would highly recommend
> >>> tracking that centrally in a table abstraction. If you're using
> standard
> >>> Parquet tables in Spark, there are severe limitations to what you can
> do
> >>> safely. What's worse, there is no validation for a path-based table
> that
> >>> you're not producing data with an incompatible schema or making an
> >>> incompatible change. So you can write two different types to the same
> >>> column name or break your table by renaming a column. This is one of
> the
> >>> reasons why we wanted to introduce better table metadata, so tables can
> >>> behave like real SQL tables.
> >>>
> >>> rb
> >>>
> >>> On Wed, Sep 30, 2020 at 9:38 AM Jason Altekruse <
> >>> altekrusejason@gmail.com> wrote:
> >>>
> >>>> Hey Ryan,
> >>>>
> >>>> Thanks for the response, I do not want to push the parquet community
> to
> >>>> keep around features that will cause users headaches, but I am still
> >>>> looking for the best solution to the problem I am facing.
> >>>>
> >>>> One thing I could use some clarity on, given what I have seen in
> >>>> various tools, I am actually not sure that there is a significant
> risk of
> >>>> wrong results with one possible small recommendation in relation to
> these
> >>>> files.
> >>>>
> >>>> I was not assuming that directories were immutable, and the method
> >>>> included in parquet-mr that I referenced,
> >>>> readAllFootersInParallelUsingSummaryFiles, also goes against the
> notion
> >>>> that this is a hard requirement. It specifically takes in a file
> listing,
> >>>> which needs to be provided by some external source, either the FS
> directly,
> >>>> or some transactional system like delta in my case and actually uses
> the
> >>>> metadata summary file just as a cache for footers, with explicit code
> to
> >>>> fall back to red any footers missing from the summary directly from
> the FS
> >>>> itself.
> >>>>
> >>>> It sounds like some projects in the past have used this file to avoid
> >>>> doing an FS listing, which I absolutely agree isn't safe to do and
> will
> >>>> cause problems when people copy in new files to a directory. Can we
> just
> >>>> document that this practice is bad? And possibly just deprecate any
> code
> >>>> that reads the summary file without this kind of fallback and an
> >>>> expectation that callers pass in a list of files they expect to get
> footers
> >>>> for?
> >>>>
> >>>> I also don't know if I have ever seen a project take advantage of the
> >>>> fact that you can technically directly append to a parquet file by
> reading
> >>>> in the previous footer, appending new row groups and writing out a
> whole
> >>>> new footer with the new metadata combined with the old, leaving dead
> bytes
> >>>> in the file where the old footer sat. I do remember discussing this
> >>>> possibility with Julien at some point, but I don't know if parquet-mr
> or
> >>>> any other projects actually have written code to do this. If this is
> done,
> >>>> it would provide another way for the summary file to become stale,
> and this
> >>>> would not be detectable with just knowing the filename, the summary
> would
> >>>> need to contain file length info.
> >>>>
> >>>> There is also the possibility that parquet files could be deleted and
> >>>> rewritten in the same filenames, but this isn't common in any
> hadoop/spark
> >>>> ecosystem projects I know of, they all generate unique filenames using
> >>>> application IDs or GUIDs.
> >>>>
> >>>> Jason Altekruse
> >>>>
> >>>> On Tue, Sep 29, 2020 at 8:26 PM Ryan Blue <rb...@netflix.com.invalid>
> >>>> wrote:
> >>>>
> >>>>> I don't remember deprecating it, but I've always recommended against
> >>>>> using
> >>>>> it because of the assumptions it requires.
> >>>>>
> >>>>> Those assumptions are routinely violated by processing engines and
> >>>>> users
> >>>>> that expect to be able to drop files into directories and see the
> >>>>> results
> >>>>> in their table. Since this was a feature without guard rails or
> >>>>> documentation to explain how to safely use it, I think it is a good
> >>>>> idea to
> >>>>> steer people away from it and deprecate it unless someone wants to
> >>>>> address
> >>>>> those concerns. Now, I think there are much better alternatives
> (thanks
> >>>>> Jacques!) so I probably wouldn't recommend spending time on bringing
> >>>>> this
> >>>>> up to date and making it marginally safer.
> >>>>>
> >>>>> On Tue, Sep 29, 2020 at 11:38 AM Julien Le Dem <
> julien.ledem@gmail.com
> >>>>> >
> >>>>> wrote:
> >>>>>
> >>>>> > Hi Jason,
> >>>>> > Thank you for bringing this up.
> >>>>> > A correctness issue would only come up when more parquet files are
> >>>>> > added to the same folder or files are modified. Historically
> folders
> >>>>> have
> >>>>> > been considered immutables and the summary file reflects the
> >>>>> metadata for
> >>>>> > all the files in the folder. The summary file contains the names of
> >>>>> the
> >>>>> > files it is for, so extra files in the folder can also be detected
> >>>>> and
> >>>>> > dealt with at read time without correctness issues.
> >>>>> > Like you mentioned the read path allows for those files to not be
> >>>>> present.
> >>>>> > I think a better solution than deprecating would be to have a
> switch
> >>>>> > allowing turning off those summary files when one wants to be able
> >>>>> to not
> >>>>> > respect the immutable folder contact.
> >>>>> > Projects like Iceberg can elect to not produce them and allow
> >>>>> modifying
> >>>>> > and adding more files to the same folder without creating
> correctness
> >>>>> > problems.
> >>>>> > I would be in favor of removing those Deprecated annotations and
> >>>>> document
> >>>>> > the use of a switch to optionally not produce the summary files
> when
> >>>>> > electing to modify folders.
> >>>>> > I'm curious to hear from Ryan about this who did the change in the
> >>>>> first
> >>>>> > place.
> >>>>> > Best,
> >>>>> > Julien
> >>>>> >
> >>>>> > On Fri, Sep 25, 2020 at 3:06 PM Jason Altekruse <
> >>>>> altekrusejason@gmail.com>
> >>>>> > wrote:
> >>>>> >
> >>>>> >> Hy Jacques,
> >>>>> >>
> >>>>> >> It's good to hear from you, thanks for the pointer to Iceberg. I
> am
> >>>>> aware
> >>>>> >> of it as well as other similar projects, including Delta Lake,
> >>>>> which my
> >>>>> >> team is already using. Unfortunately even with Delta, there is
> only
> >>>>> a
> >>>>> >> placeholder in the project currently where they will be tracking
> >>>>> file
> >>>>> >> level
> >>>>> >> statistics at some point in the future, we are also evaluating the
> >>>>> >> possibility of implementing this in delta itself. While it and
> >>>>> Iceberg
> >>>>> >> aren't quite the same architecturally, I think there is enough
> >>>>> overlap
> >>>>> >> that
> >>>>> >> it might be a bit awkward to use the two in conjunction with one
> >>>>> another.
> >>>>> >>
> >>>>> >> From my testing so far, it appears that delta pretty easily can
> >>>>> operate
> >>>>> >> alongside these older metadata summary files without the two
> >>>>> fighting with
> >>>>> >> each other. Delta is responsible for maintaining a transactionally
> >>>>> >> consistent list of files, and this file can coexist in the
> >>>>> directory just
> >>>>> >> to allow efficient pruning on the driver side on a best effort
> >>>>> basis, as
> >>>>> >> it
> >>>>> >> can gracefully fall back to the FS if it is missing a newer file.
> >>>>> >>
> >>>>> >> We are somewhat nervous about depending on something that is
> marked
> >>>>> >> deprecated, but as it is so close to a "just works" state for our
> >>>>> needs, I
> >>>>> >> was hoping to confirm with the community if there were other risks
> >>>>> I was
> >>>>> >> missing.
> >>>>> >>
> >>>>> >> Jason Altekruse
> >>>>> >>
> >>>>> >> On Wed, Sep 23, 2020 at 6:29 PM Jacques Nadeau <
> jacques@apache.org>
> >>>>> >> wrote:
> >>>>> >>
> >>>>> >> > Hey Jason,
> >>>>> >> >
> >>>>> >> > I'd suggest you look at Apache Iceberg. It is a much more mature
> >>>>> way of
> >>>>> >> > handling metadata efficiency issues and provides a substantial
> >>>>> superset
> >>>>> >> of
> >>>>> >> > functionality over the old metadata cache files.
> >>>>> >> >
> >>>>> >> > On Wed, Sep 23, 2020 at 4:16 PM Jason Altekruse <
> >>>>> >> altekrusejason@gmail.com>
> >>>>> >> > wrote:
> >>>>> >> >
> >>>>> >> > > Hello again,
> >>>>> >> > >
> >>>>> >> > > I took a look through the mail archives and found a little
> more
> >>>>> >> > information
> >>>>> >> > > in this and a few other threads.
> >>>>> >> > >
> >>>>> >> > >
> >>>>> >> > >
> >>>>> >> >
> >>>>> >>
> >>>>>
> http://mail-archives.apache.org/mod_mbox//parquet-dev/201707.mbox/%3CCAO4re1k8-bZZZWBRuLCnm1V7AoJE1fdunSuBn%2BecRuFGPgcXnA%40mail.gmail.com%3E
> >>>>> >> > >
> >>>>> >> > > While I do understand the benefits for federating out the
> >>>>> reading of
> >>>>> >> > > footers for the sake of not worrying about synchronization
> >>>>> between the
> >>>>> >> > > cached metadata and any changes to the files on disk, it does
> >>>>> appear
> >>>>> >> > there
> >>>>> >> > > is still a use case that isn't solved well with this design,
> >>>>> needle
> >>>>> >> in a
> >>>>> >> > > haystack selective filter queries, where the data is sorted by
> >>>>> the
> >>>>> >> filter
> >>>>> >> > > column. For example in the tests I ran with queries against
> >>>>> lots of
> >>>>> >> > parquet
> >>>>> >> > > files where the vast majority are pruned by a bunch of small
> >>>>> tasks, it
> >>>>> >> > > takes 33 seconds vs just 1-2 seconds with driver side pruning
> >>>>> using
> >>>>> >> the
> >>>>> >> > > summary file (requires a small spark changet).
> >>>>> >> > >
> >>>>> >> > > In our use case we are never going to be replacing contents of
> >>>>> >> existing
> >>>>> >> > > parquet files (with a delete and rewrite on HDFS) or appending
> >>>>> new row
> >>>>> >> > > groups onto existing files. In that case I don't believe we
> >>>>> should
> >>>>> >> > > experience any correctness problems, but I wanted to confirm
> if
> >>>>> there
> >>>>> >> is
> >>>>> >> > > something I am missing. I am
> >>>>> >> > > using readAllFootersInParallelUsingSummaryFiles which does
> fall
> >>>>> back
> >>>>> >> to
> >>>>> >> > > read individual footers if they are missing from the summary
> >>>>> file.
> >>>>> >> > >
> >>>>> >> > > I am also curious if a solution to the correctness problems
> >>>>> could be
> >>>>> >> to
> >>>>> >> > > include a file length and/or last modified time into the
> >>>>> summary file,
> >>>>> >> > > which could confirm against FS metadata that the files on disk
> >>>>> are
> >>>>> >> still
> >>>>> >> > in
> >>>>> >> > > sync with the metadata summary relatively quickly. Would it be
> >>>>> >> possible
> >>>>> >> > to
> >>>>> >> > > consider avoiding this deprecation if I was to work on an
> >>>>> update to
> >>>>> >> > > implement this?
> >>>>> >> > >
> >>>>> >> > > - Jason Altekruse
> >>>>> >> > >
> >>>>> >> > >
> >>>>> >> > > On Tue, Sep 15, 2020 at 8:52 PM Jason Altekruse <
> >>>>> >> > altekrusejason@gmail.com>
> >>>>> >> > > wrote:
> >>>>> >> > >
> >>>>> >> > > > Hello all,
> >>>>> >> > > >
> >>>>> >> > > > I have been working on optimizing reads in spark to avoid
> >>>>> spinning
> >>>>> >> up
> >>>>> >> > > lots
> >>>>> >> > > > of short lived tasks that just perform row group pruning in
> >>>>> >> selective
> >>>>> >> > > > filter queries.
> >>>>> >> > > >
> >>>>> >> > > > My high level question is why metadata summary files were
> >>>>> marked
> >>>>> >> > > > deprecated in this Parquet changeset? There isn't much
> >>>>> explanation
> >>>>> >> > given
> >>>>> >> > > > or a description of what should be used instead.
> >>>>> >> > > > https://github.com/apache/parquet-mr/pull/429
> >>>>> >> > > >
> >>>>> >> > > > There are other members of the broader parquet community
> that
> >>>>> are
> >>>>> >> also
> >>>>> >> > > > confused by this deprecation, see this discussion in an
> arrow
> >>>>> PR.
> >>>>> >> > > > https://github.com/apache/arrow/pull/4166
> >>>>> >> > > >
> >>>>> >> > > > In the course of making my small prototype I got an extra
> >>>>> >> performance
> >>>>> >> > > > boost by making spark write out metadata summary files,
> >>>>> rather than
> >>>>> >> > > having
> >>>>> >> > > > to read all footers on the driver. This effect would be even
> >>>>> more
> >>>>> >> > > > pronounced on a completely remote storage system like S3.
> >>>>> Writing
> >>>>> >> these
> >>>>> >> > > > summary files was disabled by default in SPARK-15719,
> because
> >>>>> of the
> >>>>> >> > > > performance impact of appending a small number of new files
> >>>>> to an
> >>>>> >> > > existing
> >>>>> >> > > > dataset with many files.
> >>>>> >> > > >
> >>>>> >> > > > https://issues.apache.org/jira/browse/SPARK-15719
> >>>>> >> > > >
> >>>>> >> > > > This spark JIRA does make decent points considering how
> spark
> >>>>> >> operates
> >>>>> >> > > > today, but I think that there is a performance optimization
> >>>>> >> opportunity
> >>>>> >> > > > that is missed because the row group pruning is deferred to
> a
> >>>>> bunch
> >>>>> >> of
> >>>>> >> > > > separate short lived tasks rather than done upfront,
> >>>>> currently spark
> >>>>> >> > only
> >>>>> >> > > > uses footers on the driver for schema merging.
> >>>>> >> > > >
> >>>>> >> > > > Thanks for the help!
> >>>>> >> > > > Jason Altekruse
> >>>>> >> > > >
> >>>>> >> > >
> >>>>> >> >
> >>>>> >>
> >>>>> >
> >>>>>
> >>>>> --
> >>>>> Ryan Blue
> >>>>> Software Engineer
> >>>>> Netflix
> >>>>>
> >>>>
> >>>
> >>> --
> >>> Ryan Blue
> >>> Software Engineer
> >>> Netflix
> >>>
> >>
> >
> > --
> > Ryan Blue
> > Software Engineer
> > Netflix
> >
>