Posted to user@spark.apache.org by "Zheng, Xudong" <do...@gmail.com> on 2015/03/31 08:47:51 UTC

Parquet Hive table becomes very slow on 1.3?

Hi all,

We are using Parquet Hive tables, and we are upgrading to Spark 1.3. But we
find that even a simple COUNT(*) query is much slower (100x) than on Spark
1.2.

I find that most of the time is spent on the driver getting HDFS blocks. A
large number of log lines like the ones below are printed:

15/03/30 23:03:43 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 2097ms
15/03/30 23:03:43 DEBUG DFSClient: newInfo = LocatedBlocks{
  fileLength=77153436
  underConstruction=false
  blocks=[LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275;
getBlockSize()=77153436; corrupt=false; offset=0;
locs=[10.152.116.172:50010, 10.152.116.169:50010,
10.153.125.184:50010]}]
  lastLocatedBlock=LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275;
getBlockSize()=77153436; corrupt=false; offset=0;
locs=[10.152.116.169:50010, 10.153.125.184:50010,
10.152.116.172:50010]}
  isLastBlockComplete=true}
15/03/30 23:03:43 DEBUG DFSClient: Connecting to datanode 10.152.116.172:50010


Comparing the printed logs with Spark 1.2: although the number of
getBlockLocations calls is similar, each such call took only 20~30 ms there
(it is 2000ms~3000ms now), and Spark 1.2 did not print the detailed
LocatedBlocks info.

Another finding is that if I read the Parquet files via Scala code from
spark-shell as below, it looks fine, and the computation returns the result
as quickly as before.

sqlContext.parquetFile("data/myparquettable")
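
For comparison, the slow path is the query that goes through the Hive
metastore table. A minimal sketch of both paths from spark-shell (the table
name below is just a placeholder, not our real table):

// Direct read of the Parquet directory -- the fast case described above.
val direct = sqlContext.parquetFile("data/myparquettable")
direct.count()

// Query against the Hive metastore table -- the case that became ~100x slower.
// ("my_parquet_table" is a placeholder, not our real table name.)
sqlContext.sql("SELECT COUNT(*) FROM my_parquet_table").collect()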


Any idea about it? Thank you!


-- 
郑旭东
Zheng, Xudong

Re: Parquet Hive table becomes very slow on 1.3?

Posted by Rex Xiong <by...@gmail.com>.
Yin,

Thanks for your reply.
We already patched this PR into our 1.3.0 build.
As Xudong mentioned, we have thousands of Parquet files, so the first read
is very, very slow, and another app adds more files and refreshes the table
regularly.
Cheng Lian's PR 5334 seems like it can resolve this issue: it will skip
reading all the footers if we set auto merge to false.
But it's not done yet.

Thanks


Re: Parquet Hive table becomes very slow on 1.3?

Posted by Yin Huai <yh...@databricks.com>.
Xudong and Rex,

Can you try 1.3.1? With PR 5339 <http://github.com/apache/spark/pull/5339>,
after we get a Hive Parquet table from the metastore and convert it to our
native Parquet code path, we cache the converted relation. For now, the
first access to that Hive Parquet table reads all of the footers (when you
first refer to that table in a query or call
sqlContext.table("hiveParquetTable")). All of your later accesses will hit
the metadata cache.
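
A rough illustration of what that means in practice (the table name is just
a placeholder):

// The first reference to the table triggers the footer scan and caches the
// converted relation.
sqlContext.table("hiveParquetTable").count()

// Later queries against the same table hit the metadata cache and skip the scan.
sqlContext.sql("SELECT COUNT(*) FROM hiveParquetTable").collect()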

Thanks,

Yin


Re: Parquet Hive table becomes very slow on 1.3?

Posted by Rex Xiong <by...@gmail.com>.
We have a similar issue with massive numbers of Parquet files. Cheng Lian,
could you have a look?


Re: Parquet Hive table becomes very slow on 1.3?

Posted by "Zheng, Xudong" <do...@gmail.com>.
Hi Cheng,

I tried both of these patches, and they still do not seem to resolve my
issue. I found that most of the time is spent on this line in newParquet.scala:

ParquetFileReader.readAllFootersInParallel(
  sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData)

This needs to read all the files under the Parquet folder, but our Parquet
folder has a lot of Parquet files (nearly 2000), and reading one file takes
about 2 seconds, so it becomes very slow ... And PR 5231 did not skip this
step, so it does not resolve my issue.

As our Parquet files are generated by a Spark job, the number of .parquet
files is the same as the number of tasks, which is why we have so many
files. But these files actually all have the same schema. Is there any way
to merge these files into one, or to avoid scanning each of them?
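
One idea we are considering on the writing side (just a sketch, not
something we have verified) is to repartition the DataFrame before saving,
so the job writes far fewer .parquet part-files:

// Hypothetical writer job: collapse to e.g. 32 partitions before writing,
// so the output folder holds ~32 part-files instead of ~2000.
val df = sqlContext.parquetFile("data/original_input")  // placeholder path
df.repartition(32).saveAsParquetFile("data/myparquettable_compacted")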



-- 
郑旭东
Zheng, Xudong

Re: Parquet Hive table becomes very slow on 1.3?

Posted by Cheng Lian <li...@gmail.com>.
Hey Xudong,

We have been digging into this issue for a while, and believe PR 5339
<http://github.com/apache/spark/pull/5339> and PR 5334
<http://github.com/apache/spark/pull/5334> should fix it.

There are two problems:

1. Normally we cache Parquet table metadata for better performance, but
when converting Hive metastore Parquet tables, the cache is not used. Thus
heavy operations like schema discovery are done every time a metastore
Parquet table is converted.
2. With Parquet task-side metadata reading (which is turned on by
default), we can actually skip the row group information in the footer.
However, we accidentally called a Parquet function which doesn't skip
row group information.

For your question about schema merging: Parquet allows different
part-files to have different but compatible schemas. For example,
part-00001.parquet may have columns a and b, while part-00002.parquet has
columns a and c. In some cases the summary files (_metadata and
_common_metadata) contain the merged schema (a, b, and c), but this is not
guaranteed. For example, when the user-defined metadata stored in different
part-files contains different values for the same key, Parquet simply
gives up writing summary files. That's why all part-files must be
touched to get a precise merged schema.
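
As a small illustration of that kind of layout, here is a spark-shell
sketch (paths and values are made up):

// Two Parquet datasets with different but compatible schemas.
import sqlContext.implicits._
sc.parallelize(Seq((1, "x"))).toDF("a", "b").saveAsParquetFile("data/t/part1")
sc.parallelize(Seq((2, 3.0))).toDF("a", "c").saveAsParquetFile("data/t/part2")

// Reading both paths merges the schemas into (a, b, c); missing columns are null.
sqlContext.parquetFile("data/t/part1", "data/t/part2").printSchema()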

However, in scenarios where a centralized, authoritative schema is
available (e.g. the Hive metastore schema, or the schema provided by the
user via data source DDL), we don't need to do schema merging on the driver
side; we can defer it to the executor side, where each task only needs to
reconcile the part-files it actually touches. This is also what the Parquet
developers did recently for parquet-hadoop
<https://github.com/apache/incubator-parquet-mr/pull/45>.

Cheng



Re: Parquet Hive table becomes very slow on 1.3?

Posted by "Zheng, Xudong" <do...@gmail.com>.
Thanks Cheng!

Setting 'spark.sql.parquet.useDataSourceApi' to false resolves my issue,
but PR 5231 does not seem to. Not sure what else I did wrong ...

BTW, we are actually very interested in the schema merging feature in Spark
1.3, and both of these solutions will disable this feature, right? It seems
that Parquet metadata is stored in a file named _metadata in the Parquet
file folder (each folder is a partition, as we use a partitioned table), so
why do we need to scan all the Parquet part-files? Is there any other
solution that could keep the schema merging feature at the same time? We
really like this feature :)


-- 
郑旭东
Zheng, Xudong

Re: Parquet Hive table becomes very slow on 1.3?

Posted by Cheng Lian <li...@gmail.com>.
Hi Xudong,

This is probably because Parquet schema merging is turned on by default.
Schema merging is generally useful for Parquet files with different but
compatible schemas, but it needs to read metadata from all Parquet
part-files. This can be problematic when reading Parquet data with lots
of part-files, especially when the user doesn't need schema merging.

This issue is tracked by SPARK-6575, and here is a PR for it: 
https://github.com/apache/spark/pull/5231. This PR adds a configuration 
to disable schema merging by default when doing Hive metastore Parquet 
table conversion.

Another workaround is to fall back to the old Parquet code path by setting
spark.sql.parquet.useDataSourceApi to false.
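
For example, from spark-shell (just one way of setting it; a minimal sketch):

// Fall back to the pre-1.3 Parquet code path for Hive metastore tables.
sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")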

Cheng
