Posted to user@arrow.apache.org by 1057445597 <10...@qq.com> on 2022/10/26 17:54:43 UTC

Re: Scanner Filter memory leak?

Sorry, I didn't read your reply carefully. I do read the data into a table first, because I only want to read the columns I need. Here is my code:


https://github.com/tensorflow/io/pull/1720/files#diff-7133d540dc86c9bb9e552655025061798314e226695c00b4e1d8cecb178a2920R1181


arrow_dataset_op.cc:1181


I still think there may be a problem. Memory consumption is growing too fast. Any suggestions?


I have attached a monitoring graph of a deep-learning training job that reads Parquet files with the filter. In the attachment you can see that the memory footprint keeps increasing, while the memory usage of the same training task without the filter stays at about 30 GB. Isn't this a memory leak?









1057445597
1057445597@qq.com







------------------ Original Message ------------------
From: "user" <weston.pace@gmail.com>;
Sent: Thursday, October 27, 2022, 1:23 AM
To: "user" <user@arrow.apache.org>;

Subject: Re: Scanner Filter memory leak?



Are you reading the output into a table? My best guess, at the moment, is that this is https://issues.apache.org/jira/browse/ARROW-18160.



We read (typically large) row groups from a parquet file. We then process that large row group and return processed batches. Without a filter those processed batches are simply views into that row group. With a filter we need to reallocate new batches. These new batches are not views into that row group, so additional memory is required. If you are collecting those processed batches into a table then you will need enough memory to hold the table and the source row group (we cannot free this row group until we have processed all the data).


I don't think it's a leak, though I think there are a few ways we could tackle this problem. For example, maybe we could read from the file into smaller buffers that are sized according to how we plan to process the data.
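If the full table is not actually needed downstream, one way to limit this is to consume the filtered output one batch at a time and drop each batch after it is used, instead of accumulating everything. A minimal sketch along those lines (assuming a RecordBatchReader as input; ConsumeBatch is a placeholder for whatever the application does with a batch):

```
// Minimal sketch: stream filtered batches instead of collecting them into a
// table, so each materialized batch can be released as soon as it is consumed.
#include <memory>
#include <arrow/api.h>
#include <arrow/dataset/api.h>

arrow::Status StreamFiltered(std::shared_ptr<arrow::RecordBatchReader> input,
                             arrow::compute::Expression filter) {
  auto builder =
      arrow::dataset::ScannerBuilder::FromRecordBatchReader(std::move(input));
  ARROW_RETURN_NOT_OK(builder->Filter(filter));
  ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
  ARROW_ASSIGN_OR_RAISE(auto reader, scanner->ToRecordBatchReader());

  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
    if (batch == nullptr) break;  // end of stream
    // ConsumeBatch(batch);       // hypothetical: hand the batch to training
    batch.reset();                // drop the reference so memory can be reused
  }
  return arrow::Status::OK();
}
```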




On Wed, Oct 26, 2022 at 6:00 AM 1057445597 <1057445597@qq.com> wrote:

When the scanner uses a filter, memory keeps rising. When the filter is not used, memory usage is stable.










When the filter is not used, memory usage is only about 4%.






          std::shared_ptr<arrow::RecordBatchReader> batch_reader =
              std::make_shared<arrow::TableBatchReader>(table);
          std::shared_ptr<arrow::RecordBatch> batch = nullptr;

          // filter
          if (!dataset()->filter_.empty()) {
            auto scanner_builder =
                arrow::dataset::ScannerBuilder::FromRecordBatchReader(
                    batch_reader);
            auto filter_expr = arrow::compute::call(
                "greater_equal", {arrow::compute::field_ref("rank"),
                                  arrow::compute::literal(0.5)});
            std::cout << "FilterExpr: " << filter_expr.ToString() << std::endl;
            // scanner_builder->Filter(filter_expr);
            // scanner_builder->Filter(dataset()->filter_expr_);

            auto scanner_result = scanner_builder->Finish();
            if (!scanner_result.ok()) {
              res = errors::Internal(scanner_result.status().ToString());
              break;
            }
            auto scanner = scanner_result.ValueOrDie();
            auto batch_reader_result = scanner->ToRecordBatchReader();
            if (!batch_reader_result.ok()) {
              res = errors::Internal(batch_reader_result.status().ToString());
              break;
            }
            batch_reader = batch_reader_result.ValueOrDie();
          }

          arrow_status = batch_reader->ReadNext(&batch);
          if (!arrow_status.ok()) {




1057445597
1057445597@qq.com




Re: Scanner Filter memory leak?

Posted by 1057445597 <10...@qq.com>.
LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2


I'm going to add this when I run the program to try to solve this problem. My Arrow version is 8.0.0.
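One quick way to check which allocator Arrow's default memory pool is actually using (LD_PRELOAD swaps the general malloc/free, but the pool backend is chosen when Arrow is built, or via the ARROW_DEFAULT_MEMORY_POOL environment variable) is to log the pool's backend name. A small sketch, assuming something like this is called at startup:

```
#include <iostream>
#include <arrow/memory_pool.h>

// Log which backend (e.g. "jemalloc", "mimalloc", "system") the default pool
// uses, plus how many bytes it currently has allocated.
void LogArrowAllocator() {
  arrow::MemoryPool* pool = arrow::default_memory_pool();
  std::cout << "Arrow memory pool backend: " << pool->backend_name()
            << ", bytes_allocated: " << pool->bytes_allocated() << std::endl;
}
```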





1057445597
1057445597@qq.com








Re: Scanner Filter memory leak?

Posted by 1057445597 <10...@qq.com>.
There are two ways of using the ReadFile function. One reads a file directly into record_batches_. The other reads batches in a background thread and stores them in next_record_batches_; when they are needed, they are swapped into record_batches_. This lets the background thread pre-read the next file during training. Our dataset is huge: we store close to 100 TB of training data.
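A minimal sketch of that double-buffering scheme with hypothetical names (the real logic is in arrow_dataset_op.cc): a background thread appends to next_record_batches_ for the next file while training drains record_batches_, and the two vectors are swapped once the current file is exhausted.

```
#include <memory>
#include <mutex>
#include <utility>
#include <vector>
#include <arrow/record_batch.h>

// Hypothetical double-buffer holder: one vector serves the training loop while
// a background thread pre-reads the next file into the other vector.
class PrefetchBuffers {
 public:
  void AppendCurrent(std::shared_ptr<arrow::RecordBatch> batch) {
    std::lock_guard<std::mutex> lock(mu_);
    record_batches_.push_back(std::move(batch));
  }
  void AppendNext(std::shared_ptr<arrow::RecordBatch> batch) {
    std::lock_guard<std::mutex> lock(mu_);
    next_record_batches_.push_back(std::move(batch));
  }
  // Called once the current file is consumed: promote the pre-read batches and
  // release the ones from the finished file.
  void SwapToNext() {
    std::lock_guard<std::mutex> lock(mu_);
    record_batches_.swap(next_record_batches_);
    next_record_batches_.clear();
  }

 private:
  std::mutex mu_;
  std::vector<std::shared_ptr<arrow::RecordBatch>> record_batches_;
  std::vector<std::shared_ptr<arrow::RecordBatch>> next_record_batches_;
};
```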





[attached graph: memory usage without the filter]

[attached graph: memory usage with the filter]



It looks like the memory allocation is the same with or without the filter.


1057445597
1057445597@qq.com








Re: Scanner Filter memory leak?

Posted by Weston Pace <we...@gmail.com>.
It appears you are draining the dataset into either record_batches_ or
next_record_batches_:

```

while (batch != nullptr) {
  if (batch->num_rows() != 0) {
    if (!background) {
      record_batches_.emplace_back(batch);
    } else {
      next_record_batches_.emplace_back(batch);
    }
  }
  arrow_status = batch_reader->ReadNext(&batch);
  if (!arrow_status.ok()) {
    res = errors::Internal(arrow_status.ToString());
    break;
  }
}

```

I would expect memory to accumulate while data accumulates in those
buffers.  Is there some other thread that is reading from those buffers?
Or is the memory growth exceeding the total size of the dataset?  I'm not
sure why this accumulation would be different with/without a filter though.

Can you also log/graph:

```
arrow::default_memory_pool()->bytes_allocated()
```
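For example, a minimal sketch of that kind of logging (assuming a dedicated monitoring thread is acceptable), printing the pool's current and peak allocation so they can be compared with the process RSS growth:

```
#include <chrono>
#include <iostream>
#include <thread>
#include <arrow/memory_pool.h>

// Periodically print how much memory Arrow's default pool currently holds and
// its peak, e.g. launched as std::thread(LogArrowMemory).detach() at startup.
void LogArrowMemory() {
  arrow::MemoryPool* pool = arrow::default_memory_pool();
  while (true) {
    std::cout << "arrow bytes_allocated=" << pool->bytes_allocated()
              << " max_memory=" << pool->max_memory() << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(10));
  }
}
```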
