You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Rabin Banerjee <de...@gmail.com> on 2017/11/21 16:08:37 UTC

Re: Spark/Parquet/Statistics question

Spark is not adding any STAT meta in parquet files in Version 1.6.x.
Scanning all files for filter.

(1 to 300000).map(i => (i, i.toString)).toDF("a",
"b").sort("a").coalesce(1).write.format("parquet").saveAsTable("metrics")

./parquet-meta /user/hive/warehouse/metrics/*.parquet

file:
file:/user/hive/warehouse/metrics/part-r-00000-6552bc8f-ec05-4ce8-ad8d-dc22bdd2e502.gz.parquet


creator:     parquet-mr version 1.6.0

extra:       org.apache.spark.sql.parquet.row.metadata =
{"type":"struct","fields":[{"name":"a","type":"integer","nullable":true,"metadata":{}},{"name":"b","type":"string","nullable":true,"metadata":{}}]}



file schema: spark_schema

--------------------------------------------------------------------------------------------------------------------------------------------

a:           OPTIONAL INT32 R:0 D:1

b:           OPTIONAL BINARY O:UTF8 R:0 D:1


row group 1: RC:300000 TS:4089139 OFFSET:4

--------------------------------------------------------------------------------------------------------------------------------------------

a:            INT32 GZIP DO:0 FPO:4 SZ:415087/1200095/2.89 VC:300000
ENC:BIT_PACKED,RLE,PLAIN

b:            BINARY GZIP DO:0 FPO:415091 SZ:667334/2889044/4.33 VC:300000
ENC:BIT_PACKED,RLE,PLAIN

On Tue, Jan 17, 2017 at 9:41 PM, Michael Segel <ms...@hotmail.com>
wrote:

> Hi,
> Lexicographically speaking, Min/Max should work because String(s)  support
> a comparator operator.  So anything which supports an equality test (<,>,
> <= , >= , == …) can also support min and max functions as well.
>
> I guess the question is if Spark does support this, and if not, why?
> Yes, it makes sense.
>
>
>
> > On Jan 17, 2017, at 9:17 AM, Jörn Franke <jo...@gmail.com> wrote:
> >
> > Hallo,
> >
> > I am not sure what you mean by min/max for strings. I do not know if
> this makes sense. What the ORC format has is bloom filters for strings etc.
> - are you referring to this?
> >
> > In order to apply min/max filters Spark needs to read the meta data of
> the file. If the filter is applied or not - this you can see from the
> number of bytes read.
> >
> >
> > Best regards
> >
> >> On 17 Jan 2017, at 15:28, djiang <dj...@dataxu.com> wrote:
> >>
> >> Hi,
> >>
> >> I have been looking into how Spark stores statistics (min/max) in
> Parquet as
> >> well as how it uses the info for query optimization.
> >> I have got a few questions.
> >> First setup: Spark 2.1.0, the following sets up a Dataframe of 1000
> rows,
> >> with a long type and a string type column.
> >> They are sorted by different columns, though.
> >>
> >> scala> spark.sql("select id, cast(id as string) text from
> >> range(1000)").sort("id").write.parquet("/secret/spark21-sortById")
> >> scala> spark.sql("select id, cast(id as string) text from
> >> range(1000)").sort("Text").write.parquet("/secret/spark21-sortByText")
> >>
> >> I added some code to parquet-tools to print out stats and examine the
> >> generated parquet files:
> >>
> >> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
> >> /secret/spark21-sortById/part-00000-39f7ac12-6038-46ee-b5c3-
> d7a5a06e4425.snappy.parquet
> >> file:
> >> file:/secret/spark21-sortById/part-00000-39f7ac12-6038-46ee-
> b5c3-d7a5a06e4425.snappy.parquet
> >> creator:     parquet-mr version 1.8.1 (build
> >> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
> >> extra:       org.apache.spark.sql.parquet.row.metadata =
> >> {"type":"struct","fields":[{"name":"id","type":"long","
> nullable":false,"metadata":{}},{"name":"text","type":"
> string","nullable":false,"metadata":{}}]}
> >>
> >> file schema: spark_schema
> >> ------------------------------------------------------------
> --------------------
> >> id:          REQUIRED INT64 R:0 D:0
> >> text:        REQUIRED BINARY O:UTF8 R:0 D:0
> >>
> >> row group 1: RC:5 TS:133 OFFSET:4
> >> ------------------------------------------------------------
> --------------------
> >> id:           INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
> >> ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 4, num_nulls: 0]
> >> text:         BINARY SNAPPY DO:0 FPO:75 SZ:53/52/0.98 VC:5
> >> ENC:PLAIN,BIT_PACKED
> >>
> >> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
> >> /secret/spark21-sortByText/part-00000-3d7eac74-5ca0-44a0-
> b8a6-d67cc38a2bde.snappy.parquet
> >> file:
> >> file:/secret/spark21-sortByText/part-00000-3d7eac74-5ca0-44a0-b8a6-
> d67cc38a2bde.snappy.parquet
> >> creator:     parquet-mr version 1.8.1 (build
> >> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
> >> extra:       org.apache.spark.sql.parquet.row.metadata =
> >> {"type":"struct","fields":[{"name":"id","type":"long","
> nullable":false,"metadata":{}},{"name":"text","type":"
> string","nullable":false,"metadata":{}}]}
> >>
> >> file schema: spark_schema
> >> ------------------------------------------------------------
> --------------------
> >> id:          REQUIRED INT64 R:0 D:0
> >> text:        REQUIRED BINARY O:UTF8 R:0 D:0
> >>
> >> row group 1: RC:5 TS:140 OFFSET:4
> >> ------------------------------------------------------------
> --------------------
> >> id:           INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
> >> ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 101, num_nulls: 0]
> >> text:         BINARY SNAPPY DO:0 FPO:75 SZ:60/59/0.98 VC:5
> >> ENC:PLAIN,BIT_PACKED
> >>
> >> So the question is why is Spark, particularly, 2.1.0, only generate
> min/max
> >> for numeric columns, but not strings(BINARY) fields, even if the string
> >> field is included in the sort? Maybe I missed a configuraiton?
> >>
> >> The second issue, is how can I confirm Spark is utilizing the min/max?
> >> scala> sc.setLogLevel("INFO")
> >> scala> spark.sql("select * from parquet.`/secret/spark21-sortById`
> where
> >> id=4").show
> >> I got many lines like this:
> >> 17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate:
> >> and(noteq(id, null), eq(id, 4))
> >> 17/01/17 09:23:35 INFO FileScanRDD: Reading File path:
> >> file:///secret/spark21-sortById/part-00000-39f7ac12-
> 6038-46ee-b5c3-d7a5a06e4425.snappy.parquet,
> >> range: 0-558, partition values: [empty row]
> >> ...
> >> 17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate:
> >> and(noteq(id, null), eq(id, 4))
> >> 17/01/17 09:23:35 INFO FileScanRDD: Reading File path:
> >> file:///secret/spark21-sortById/part-00193-39f7ac12-
> 6038-46ee-b5c3-d7a5a06e4425.snappy.parquet,
> >> range: 0-574, partition values: [empty row]
> >> ...
> >>
> >> The question is it looks like Spark is scanning every file, even if
> from the
> >> min/max, Spark should be able to determine only part-00000 has the
> relevant
> >> data. Or maybe I read it wrong, that Spark is skipping the files? Maybe
> >> Spark can only use partition value for data skipping?
> >>
> >> Thanks,
> >>
> >> Dong
> >>
> >>
> >>
> >>
> >> --
> >> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-Parquet-Statistics-question-tp28312.html
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
> >>
> >
> > ---------------------------------------------------------------------
> > To unsubscribe e-mail: user-unsubscribe@spark.apache.org
> >
>
>