Posted to user@arrow.apache.org by 1057445597 <10...@qq.com> on 2022/06/21 12:44:11 UTC

Re: How to speed up arrow's reading of S3 Parquet files?

The Parquet files have 340 columns, some of which are array columns; in total there are about a thousand floats per row.




1057445597
1057445597@qq.com







------------------ Original Message ------------------
From: "user" <1057445597@qq.com>;
Date: Monday, June 20, 2022, 10:12 PM
To: "user" <user@arrow.apache.org>;

Subject: How to speed up arrow's reading of S3 Parquet files?



I am using Arrow to build a TensorFlow dataset for training. The structured data is stored on S3 as Parquet files. I used Arrow to construct a TFIO dataset, but end-to-end testing showed that reading through Arrow is slower than reading through Alluxio. The specific test results are as follows:


Alluxio stores the data as TFRecord:
dataset based on Alluxio: 47700 rows/s, global_steps/sec: 6.5
dataset based on Arrow:   15100 rows/s, global_steps/sec: 3.7


Reading with Arrow alone, without converting to tensors, the test gives 31000 rows/s.

Does this seem slow, or is it expected? Is there any way to speed it up?



When reading data, network throughput peaks at about 150 MB/s, while the machine's network interface supports up to 2500 MB/s, so the available bandwidth is not being fully utilized.
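One thing I suspect is that Arrow's default I/O parallelism is too low to keep the link busy. A minimal sketch of what I plan to try, assuming arrow::io::SetIOThreadPoolCapacity is the right knob for this (the value 32 is a guess, not something I have measured); the scan itself is in the code further down:

  #include <arrow/io/interfaces.h>

  arrow::Status BumpIoParallelism() {
    // Raise Arrow's global I/O thread pool (default 8, I believe, unless
    // ARROW_IO_THREADS is set) so more S3 range requests are in flight.
    return arrow::io::SetIOThreadPoolCapacity(32);
  }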
My code looks something like this. If you want to go into more detail, you can look at this PR: PR


  #include <ctime>
  #include <iostream>
  #include <memory>

  #include <arrow/dataset/scanner.h>
  #include <arrow/io/interfaces.h>
  #include <arrow/record_batch.h>
  #include <arrow/util/thread_pool.h>

  // GetDatasetFromS3 is my own helper that builds an arrow::dataset::Dataset
  // over the S3 files.
  auto dataset = GetDatasetFromS3(K_ACCESS_KEY1, K_SECRET_KEY1, K_ENDPOINT_OVERRIDE1, K_BZZP);

  // Dedicated 16-thread pool used as the scan's I/O context.
  auto arrow_thread_pool_ = arrow::internal::ThreadPool::MakeEternal(16).ValueOrDie();
  auto scan_options_ = std::make_shared<arrow::dataset::ScanOptions>();
  scan_options_->use_threads = true;
  scan_options_->io_context = arrow::io::IOContext(arrow_thread_pool_.get());

  auto scanner_builder = std::make_shared<arrow::dataset::ScannerBuilder>(dataset, scan_options_);
  scanner_builder->Project(column_names);
  scanner_builder->BatchSize(60 * 1024);
  scanner_builder->UseThreads(true);
  auto scanner = scanner_builder->Finish().ValueOrDie();
  auto reader_ = scanner->ToRecordBatchReader().ValueOrDie();

  // NB: clock() measures process CPU time, not wall time; with a
  // multi-threaded scan this can differ noticeably from elapsed time.
  clock_t start = clock();

  std::shared_ptr<arrow::RecordBatch> current_batch_ = nullptr;
  reader_->ReadNext(&current_batch_);
  long total_count = 0;
  total_count += current_batch_->num_rows();
  while (current_batch_) {
    reader_->ReadNext(&current_batch_);
    if (current_batch_) {
      total_count += current_batch_->num_rows();
      std::cout << "row: " << current_batch_->num_rows();
    }
  }
  std::cout << std::endl;
  std::cout << "Total rows: " << total_count << std::endl;

  clock_t end = clock();
  double endtime = (double)(end - start) / CLOCKS_PER_SEC;
  std::cout << "Total time: " << endtime << "s" << std::endl;
  std::cout << "Speed: " << total_count / endtime << " rows/s" << std::endl;







1057445597
1057445597@qq.com


