Posted to user@spark.apache.org by 莫涛 <mo...@sensetime.com> on 2017/04/17 08:23:56 UTC

Re: Re: How to store 10M records in HDFS to speed up further filtering?

Hi Ryan,


The attachment is a screenshot of the Spark job, and this is the only stage in the job.

I've changed the partition size to 1GB by "--conf spark.sql.files.maxPartitionBytes=1073741824".


1. spark-orc seems not that smart. The input size is almost the whole data. I guess "only for matched ones the binary data is read" is not true, as ORC does not know the offset of each BINARY value, so it cannot seek past the unmatched ones.

2. I've tried ORC and it does skip partitions that have no hit. This could be a solution, but the performance depends on the distribution of the given ID list: in the worst case, no partition can be skipped.


Mo Tao



________________________________
From: Ryan <ry...@gmail.com>
Sent: April 17, 2017 15:42:46
To: 莫涛
Cc: user
Subject: Re: Re: How to store 10M records in HDFS to speed up further filtering?

1. Per my understanding, for ORC files Spark should push down the filter, which means the ID column of every row is scanned but the binary data is read only for matched rows. I haven't dug into the spark-orc reader, though..

2. ORC itself has a row group index and a bloom filter index. You may try configurations like 'orc.bloom.filter.columns' on the source table first. On the Spark side, with mapPartitions, it's possible to build a sort of index for each partition.
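For reference, a hedged sketch of what setting those options could look like when writing from Spark. The option keys are ORC's documented table properties; whether a given Spark version forwards them to the ORC writer should be verified, and `df` and the path are placeholders:

```scala
// Sketch: write the table as ORC with a bloom filter on the ID column.
// "orc.bloom.filter.columns" / "orc.bloom.filter.fpp" are ORC table
// properties; support for forwarding them varies by Spark version.
df.write
  .option("orc.bloom.filter.columns", "ID")  // build bloom filters for ID
  .option("orc.bloom.filter.fpp", "0.05")    // target false-positive rate
  .orc("/path/to/whole/data")
```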

Also, could you check how many tasks the filter stage has? Maybe there are too few partitions..

On Mon, Apr 17, 2017 at 3:01 PM, 莫涛 <mo...@sensetime.com> wrote:

Hi Ryan,


1. "expected qps and response time for the filter request"

I expect that only the requested BINARY values are scanned instead of all records, so the response time should be roughly "10K * 5MB / aggregate disk read speed", or a small multiple of that.

In practice, our cluster has 30 SAS disks, and scanning all of the 10M * 5MB data takes about 6 hours now. It should come down to several minutes, as expected.
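As a sanity check on these numbers, a back-of-envelope calculation; the ~150 MB/s sequential read speed per SAS disk is an assumption, not a measured figure:

```scala
// Back-of-envelope throughput estimate; 150 MB/s per disk is an assumption.
val diskMBps    = 150.0                  // per-disk sequential read speed
val disks       = 30                     // SAS disks in the cluster
val clusterMBps = diskMBps * disks       // ~4.5 GB/s aggregate

val hitMB   = 10000.0 * 5.0              // 10K requested BINARYs at 5 MB each
val totalMB = 10000000.0 * 5.0           // all 10M records, ~50 TB

val idealSeconds  = hitMB / clusterMBps            // seconds if only hits are read
val fullScanHours = totalMB / clusterMBps / 3600   // hours for a full scan
```

Roughly 3 hours for an ideal full scan is the right order of magnitude next to the observed 6 hours, while reading only the hits would take seconds to minutes, so the gap really is the full scan.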


2. "build a search tree using ids within each partition to act like an index, or create a bloom filter to see if current partition would have any hit"

Sounds like the thing I'm looking for!

Could you kindly provide some links for reference? I found nothing in the Spark documentation about an index or bloom filter working inside a partition.


Thanks very much!


Mo Tao

________________________________
From: Ryan <ry...@gmail.com>
Sent: April 17, 2017 14:32:00
To: 莫涛
Cc: user
Subject: Re: How to store 10M records in HDFS to speed up further filtering?

You can build a search tree from the IDs within each partition to act like an index, or create a bloom filter to check whether the current partition would have any hits.
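A rough Spark/Scala sketch of that idea; `wantedIds`, `spark`, and the DataFrame `df` are assumed names, and materializing a whole partition of 5 MB records is itself expensive, so this mainly saves downstream work rather than read I/O:

```scala
// Per-partition index sketch: collect the IDs present in each partition,
// and emit rows only when the partition intersects the wanted ID set.
val wanted = spark.sparkContext.broadcast(wantedIds) // driver-side Set[String]

val hits = df.select($"ID", $"BINARY").rdd.mapPartitions { it =>
  val rows    = it.toArray                       // this partition's rows
  val present = rows.map(_.getString(0)).toSet   // "index" of local IDs
  if (present.exists(wanted.value.contains))     // any hit in this partition?
    rows.iterator.filter(r => wanted.value.contains(r.getString(0)))
  else
    Iterator.empty                               // skip the whole partition
}
```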

What's your expected qps and response time for the filter request?


On Mon, Apr 17, 2017 at 2:23 PM, MoTao <mo...@sensetime.com> wrote:
Hi all,

I have 10M (ID, BINARY) records, and the size of each BINARY is 5MB on
average.
In my daily application, I need to filter out 10K BINARY values according
to an ID list.
How should I store the whole data to make the filtering faster?

I'm using DataFrame in Spark 2.0.0 and I've tried a row-based format (Avro)
and a column-based format (ORC).
However, both of them require scanning almost ALL records, making the
filtering stage very slow.
The code block for filtering looks like:

val IDSet: Set[String] = ...                   // the 10K wanted IDs
val checkID = udf { ID: String => IDSet(ID) }  // set membership as a UDF
spark.read.orc("/path/to/whole/data")
  .filter(checkID($"ID"))
  .select($"ID", $"BINARY")
  .write...

Thanks for any advice!




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-10M-records-in-HDFS-to-speed-up-further-filtering-tp28605.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org




Re: Re: Re: Re: How to store 10M records in HDFS to speed up further filtering?

Posted by Ryan <ry...@gmail.com>.
Hi Mo,

I don't think it needs a shuffle, because the bloom filter only depends on
data within each row group, not the whole dataset. But the HAR solution
seems nice. I've thought of combining small files together and storing the
offsets; I wasn't aware that HDFS provides such functionality. After some
searching, I found that SequenceFile might be a comparable alternative to
HAR that you may be interested in.
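For what it's worth, a minimal sketch of the SequenceFile variant; `records` and the paths are placeholders, and the conversions rely on the implicit Writable converters Spark provides for String and Array[Byte]:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Save (ID, BINARY) pairs as a Hadoop SequenceFile; Spark converts the
// String/Array[Byte] pair to Text/BytesWritable under the hood.
def saveRecords(records: RDD[(String, Array[Byte])]): Unit =
  records.saveAsSequenceFile("/path/to/records.seq")  // placeholder path

// Reading back uses the matching converters.
def loadRecords(sc: SparkContext): RDD[(String, Array[Byte])] =
  sc.sequenceFile[String, Array[Byte]]("/path/to/records.seq")
```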

Thanks to everyone involved. I've learnt a lot too :-)



Re: Re: Re: How to store 10M records in HDFS to speed up further filtering?

Posted by 莫涛 <mo...@sensetime.com>.
Hi Ryan,


The attachment is the event timeline on the executors. They are always busy computing.

More executors would be helpful, but provisioning them is not up to me as a developer.


1. The bad performance could be caused by my poor implementation: "checkID" is a user-defined function, so it is not pushed down.
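On point 1, one hedged alternative worth trying: expressing the same filter with `isin`, which is a Catalyst expression rather than an opaque UDF, so it is at least eligible for pushdown (how well a 10K-element IN list actually pushes down to the ORC reader may vary by Spark version):

```scala
// Same filter without a UDF: Column.isin is visible to the optimizer,
// unlike the checkID UDF, so predicate pushdown becomes possible.
val ids: Seq[String] = IDSet.toSeq   // the 10K wanted IDs
spark.read.orc("/path/to/whole/data")
  .filter($"ID".isin(ids: _*))
  .select($"ID", $"BINARY")
```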

2. To make the row group index work, I would need to sort the data by ID, which leads to a shuffle of 50TB of data. That's somewhat crazy.
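On point 2, a hedged middle ground: a sort within partitions needs no shuffle at all, yet still clusters IDs inside each file so that row-group statistics and bloom filters become more selective (the bloom filter option key is an ORC table property whose forwarding may be version-dependent, and the path is a placeholder):

```scala
// Sort within each partition (no shuffle), then write ORC so that each
// file's row groups cover narrower ID ranges where possible.
df.sortWithinPartitions($"ID")
  .write
  .option("orc.bloom.filter.columns", "ID")  // ORC table property
  .orc("/path/to/sorted/data")
```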


I'm now testing HAR, but this discussion has given me a lot of insight into ORC.

Thanks for your help!



Re: Re: Re: How to store 10M records in HDFS to speed up further filtering?

Posted by Ryan <ry...@gmail.com>.
How about the event timeline on the executors? It seems adding more
executors could help.

1. I found a JIRA (https://issues.apache.org/jira/browse/SPARK-11621) that
states predicate pushdown should work. And I think "only for matched ones
the binary data is read" is true if a proper index is configured: a row
group won't be read if the index shows the predicate can't be satisfied.
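One hedged detail to check alongside that JIRA: in the Spark 2.x line, ORC predicate pushdown sits behind a configuration flag that has defaulted to off in some releases, so it is worth enabling explicitly:

```scala
// Make sure ORC predicate pushdown is actually enabled; in several
// Spark 2.x releases this flag defaulted to false.
spark.conf.set("spark.sql.orc.filterPushdown", "true")
```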

2. It is absolutely true that the performance gain depends on the ID
distribution...
