Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/20 14:12:17 UTC

[GitHub] [arrow] LinGeLin opened a new issue, #13403: How to speed up arrow's reading of S3 Parquet files?

LinGeLin opened a new issue, #13403:
URL: https://github.com/apache/arrow/issues/13403

   I am using Arrow to build a TensorFlow dataset for training. The structured data is stored on S3 as Parquet files. I used Arrow to construct a TFIO dataset, but in end-to-end tests its read speed was slower than Alluxio's. The test data are as follows:
   
   With Alluxio, the data is stored as TFRecord.
   dataset based on Alluxio: 47700 rows/s, global_steps/sec: 6.5
   dataset based on Arrow:   15100 rows/s, global_steps/sec: 3.7
   
   Reading with Arrow alone, without converting to tensors, I measured 31000 rows/s.
   Is this slower than it should be? Is there any way to speed it up?
   
   While reading, network throughput peaks at about 150 MB/s, but the machine's maximum is 2500 MB/s, so the bandwidth is far from fully utilized.
   
   My code looks something like this. For more detail, see this PR: [PR](https://github.com/tensorflow/io/pull/1685/files#diff-7133d540dc86c9bb9e552655025061798314e226695c00b4e1d8cecb178a2920)
   
   
   ```
   // Assumes <chrono>, <iostream> and the Arrow dataset headers are included.
   // Wall-clock timer: clock() counts CPU time, which is misleading with threads.
   auto start = std::chrono::steady_clock::now();
   auto dataset = GetDatasetFromS3(K_ACCESS_KEY1, K_SECRET_KEY1, K_ENDPOINT_OVERRIDE1, K_BZZP);
   auto arrow_thread_pool_ = arrow::internal::ThreadPool::MakeEternal(16).ValueOrDie();
   auto scan_options_ = std::make_shared<arrow::dataset::ScanOptions>();
   scan_options_->use_threads = true;
   scan_options_->io_context = arrow::io::IOContext(arrow_thread_pool_.get());
   auto scanner_builder = std::make_shared<arrow::dataset::ScannerBuilder>(dataset, scan_options_);

   scanner_builder->Project(column_names);
   scanner_builder->BatchSize(60*1024);
   scanner_builder->UseThreads(true);
   auto scanner = scanner_builder->Finish().ValueOrDie();
   auto reader_ = scanner->ToRecordBatchReader().ValueOrDie();

   std::shared_ptr<arrow::RecordBatch> current_batch_;
   long total_count = 0;
   while (true) {
     auto status = reader_->ReadNext(&current_batch_);
     if (!status.ok()) {
       std::cerr << "Read failed: " << status.ToString() << std::endl;
       break;
     }
     if (!current_batch_) break;  // a null batch marks the end of the stream
     total_count += current_batch_->num_rows();
     std::cout << "row: " << current_batch_->num_rows();
   }
   std::cout << std::endl;
   std::cout << "Total rows: " << total_count << std::endl;
   std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - start;
   double endtime = elapsed.count();
   std::cout << "Total time: " << endtime << "s" << std::endl;
   std::cout << "Speed: " << total_count / endtime << " rows/s" << std::endl;
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] LinGeLin commented on issue #13403: [C++] How to speed up arrow's reading of S3 Parquet files?

Posted by GitBox <gi...@apache.org>.
LinGeLin commented on issue #13403:
URL: https://github.com/apache/arrow/issues/13403#issuecomment-1206003819

   My [PR](https://github.com/tensorflow/io/pull/1685) to tfio is here. Any suggestions?


[GitHub] [arrow] LinGeLin commented on issue #13403: [C++] How to speed up arrow's reading of S3 Parquet files?

Posted by GitBox <gi...@apache.org>.
LinGeLin commented on issue #13403:
URL: https://github.com/apache/arrow/issues/13403#issuecomment-1206001646

   Thanks, I made a simple mistake: I tested with a Debug build, so of course it was slow. The parameters you mentioned might be useful, but I now read the S3 Parquet files in a different way, because the dataset API always dumps core when the program exits:
   ```
         Status ReadFile(int file_index, bool background = false)
             TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
           auto access_file =
               s3fs_->OpenInputFile(dataset()->parquet_files_[file_index])
                   .ValueOrDie();
   
           parquet::ArrowReaderProperties properties;
           properties.set_use_threads(true);
           properties.set_pre_buffer(true);
           parquet::ReaderProperties parquet_properties =
               parquet::default_reader_properties();
   
           // Open() and Build() return an arrow::Status, so check it.
           std::shared_ptr<parquet::arrow::FileReaderBuilder> builder =
               std::make_shared<parquet::arrow::FileReaderBuilder>();
           CHECK_ARROW(builder->Open(access_file, parquet_properties));
   
           std::unique_ptr<parquet::arrow::FileReader> reader;
           CHECK_ARROW(builder->properties(properties)->Build(&reader));
   
           if (column_indices_.empty()) {
             std::shared_ptr<arrow::Schema> schema;
             CHECK_ARROW(reader->GetSchema(&schema));
             // Resolve names into a local list so that a failed lookup
             // does not leave -1 entries in column_indices_.
             std::vector<int> indices;
             std::string err_column_names;
             for (const auto& name : dataset()->column_names_) {
               int fieldIndex = schema->GetFieldIndex(name);
               if (-1 == fieldIndex) {
                 err_column_names = err_column_names + " " + name;
               } else {
                 indices.push_back(fieldIndex);
               }
             }
   
             if (err_column_names.length() != 0) {
               return errors::InvalidArgument("these column names don't exist: ",
                                              err_column_names);
             }
             column_indices_ = std::move(indices);
           }
           // Read file columns and build a table
           std::shared_ptr<::arrow::Table> table;
           CHECK_ARROW(reader->ReadTable(column_indices_, &table));
           // Convert the table to a sequence of batches
           std::shared_ptr<arrow::RecordBatchReader> batch_reader =
               std::make_shared<arrow::TableBatchReader>(table);
           std::shared_ptr<arrow::RecordBatch> batch = nullptr;
   
           // Optionally apply the user-supplied filter expression
           if (!dataset()->filter_.empty()) {
             auto scanner_builder =
                 arrow::dataset::ScannerBuilder::FromRecordBatchReader(
                     batch_reader);
             arrow::compute::Expression filter_expr;
             TF_RETURN_IF_ERROR(
                 ArrowUtil::ParseExpression(dataset()->filter_, filter_expr));
             scanner_builder->Filter(filter_expr);
             auto scanner = scanner_builder->Finish().ValueOrDie();
             batch_reader = scanner->ToRecordBatchReader().ValueOrDie();
           }
   
           CHECK_ARROW(batch_reader->ReadNext(&batch));
           TF_RETURN_IF_ERROR(CheckBatchColumnTypes(batch));
           next_record_batches_.clear();
           // Drain the reader; a null batch marks the end of the stream
           while (batch != nullptr) {
             if (!background) {
               record_batches_.emplace_back(batch);
             } else {
               next_record_batches_.emplace_back(batch);
             }
             CHECK_ARROW(batch_reader->ReadNext(&batch));
           }
   
           // Wake up the consumer waiting on the background read
           if (background) {
             mutex_lock lk(cv_mu_);
             background_thread_finished_ = true;
             cv_.notify_all();
           }
   
           return Status::OK();
         }
   ```
   
   Any other suggestions are welcome.


[GitHub] [arrow] LinGeLin closed issue #13403: [C++] How to speed up arrow's reading of S3 Parquet files?

Posted by GitBox <gi...@apache.org>.
LinGeLin closed issue #13403: [C++] How to speed up arrow's reading of S3 Parquet files?
URL: https://github.com/apache/arrow/issues/13403


[GitHub] [arrow] pitrou commented on issue #13403: [C++] How to speed up arrow's reading of S3 Parquet files?

Posted by GitBox <gi...@apache.org>.
pitrou commented on issue #13403:
URL: https://github.com/apache/arrow/issues/13403#issuecomment-1204880092

   @westonpace 


[GitHub] [arrow] westonpace commented on issue #13403: [C++] How to speed up arrow's reading of S3 Parquet files?

Posted by GitBox <gi...@apache.org>.
westonpace commented on issue #13403:
URL: https://github.com/apache/arrow/issues/13403#issuecomment-1205693533

   S3 recommends [one concurrent read for every 85-90MB/s of bandwidth](https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance-design-patterns.html).  If your aim is to saturate that 2500MB/s link then you probably want ~30 concurrent reads.
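
As a quick check of the ~30 figure: dividing the 2500 MB/s link by the 85-90 MB/s that one request is expected to sustain gives roughly 28 to 30 concurrent reads. A minimal sketch of that arithmetic follows; `ConcurrentReadsNeeded` is a hypothetical helper name, not an Arrow or AWS API.

```cpp
#include <cassert>
#include <cmath>

// Hypothetical helper: number of concurrent reads needed to fill a link,
// given the throughput a single request is expected to sustain.
int ConcurrentReadsNeeded(double link_mb_per_s, double per_read_mb_per_s) {
  assert(link_mb_per_s >= 0.0);
  assert(per_read_mb_per_s > 0.0);
  return static_cast<int>(std::ceil(link_mb_per_s / per_read_mb_per_s));
}
```

With the numbers from this thread, `ConcurrentReadsNeeded(2500.0, 90.0)` gives 28 and `ConcurrentReadsNeeded(2500.0, 85.0)` gives 30.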
   
   In Arrow the factors that control your concurrent reads are ScanOptions::fragment_readahead, ScanOptions::batch_readahead, ScanOptions::batch_size, the size of your row groups, and the size of your I/O thread pool.
   
   Some things to try:
   
   * Are you reading all 340 columns?  If not, you could probably set a larger batch size.  I do most of my testing around 1 million rows / 20 columns.
   `scanner_builder->BatchSize(60*1024);`
   
   * What size row groups do you have in your files?  Arrow currently only reads 1 entire row group at a time.  So if each file has a single row group you will need at least 30 files to achieve max throughput (though this might use a lot of RAM if the row groups are large).
   
   * You might want to increase the fragment readahead.  It defaults to 4 files in 8.0.
   
   * You probably want to increase the size of the I/O thread pool.  It defaults to 8.
   ```
   arrow::io::SetIOThreadPoolCapacity(32);
   ```
   
   Do any of these configuration changes improve performance?
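
The readahead and batch-size knobs listed above can be set together in one place. The sketch below is written as a template so that it compiles without Arrow. `ReadTuning`, `ApplyReadTuning`, and `FakeScanOptions` are hypothetical names, and the sketch assumes the options type exposes `fragment_readahead`, `batch_readahead`, and `batch_size` as fields, which is how this comment refers to them on `ScanOptions`. The per-field descriptions in the comments are likewise an assumption to check against your Arrow version.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical bundle of the knobs discussed in this comment.
struct ReadTuning {
  int32_t fragment_readahead;  // assumed meaning: files read ahead
  int32_t batch_readahead;     // assumed meaning: batches read ahead per file
  int64_t batch_size;          // rows per batch
};

// Copies the tuning values onto any options object that has the three
// fields above (intended for arrow::dataset::ScanOptions).
template <typename ScanOptionsLike>
void ApplyReadTuning(const ReadTuning& tuning, ScanOptionsLike* options) {
  assert(options != nullptr);
  assert(tuning.fragment_readahead > 0);
  assert(tuning.batch_readahead > 0);
  assert(tuning.batch_size > 0);
  options->fragment_readahead = tuning.fragment_readahead;
  options->batch_readahead = tuning.batch_readahead;
  options->batch_size = tuning.batch_size;
}

// Stand-in so the sketch compiles without Arrow; initial values are
// placeholders, not Arrow's defaults.
struct FakeScanOptions {
  int32_t fragment_readahead = 1;
  int32_t batch_readahead = 1;
  int64_t batch_size = 1;
};
```

A value such as `ReadTuning{32, 16, 1024 * 1024}` is only an illustration: a fragment readahead near 30 matches the concurrency estimate above when each file holds a single row group, and the other two numbers are arbitrary starting points to benchmark.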


[GitHub] [arrow] westonpace commented on issue #13403: [C++] How to speed up arrow's reading of S3 Parquet files?

Posted by GitBox <gi...@apache.org>.
westonpace commented on issue #13403:
URL: https://github.com/apache/arrow/issues/13403#issuecomment-1206622326

   > because the dataset always core dump when the program exits
   
   Hmm, that's too bad.  Any chance you can share a reproducible example of this?
   
   > but now I read S3 parquet in a different way
   
   If you're only reading a single file and the file fits comfortably in memory, then the code you posted is probably OK.  It won't overlap compute with I/O (i.e. it will fully finish the I/O before it does any compute), but if your file has only a single row group then that is inevitable at the moment.
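
When several files are read one after another, some overlap can be recovered at the file level by loading the next file while the current one is processed. The sketch below shows that pattern with the standard library only. `Chunk`, `ProcessWithPrefetch`, `load`, and `process` are hypothetical placeholders for the blocking S3/Parquet read and the compute step; none of them are Arrow APIs.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <string>
#include <vector>

// Hypothetical stand-in for one file's worth of decoded data.
using Chunk = std::vector<float>;

// Processes files in order while the next file loads on another thread.
double ProcessWithPrefetch(
    const std::vector<std::string>& files,
    const std::function<Chunk(const std::string&)>& load,
    const std::function<double(const Chunk&)>& process) {
  assert(load && process);
  double total = 0.0;
  if (files.empty()) return total;

  // Start loading the first file before entering the loop.
  std::future<Chunk> next = std::async(std::launch::async, load, files[0]);
  for (std::size_t i = 0; i < files.size(); ++i) {
    Chunk current = next.get();  // wait until file i is loaded
    if (i + 1 < files.size()) {
      // Kick off the load of file i+1 before computing on file i.
      next = std::async(std::launch::async, load, files[i + 1]);
    }
    total += process(current);  // runs while file i+1 is loading
  }
  return total;
}
```

This keeps at most two files in memory at a time, which matters when row groups are large.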


[GitHub] [arrow] LinGeLin commented on issue #13403: How to speed up arrow's reading of S3 Parquet files?

Posted by GitBox <gi...@apache.org>.
LinGeLin commented on issue #13403:
URL: https://github.com/apache/arrow/issues/13403#issuecomment-1161698167

   The Parquet files have 340 columns, some of which are array columns; in total there are about a thousand floats per row.
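
For a rough sense of scale, the row rates reported in this issue can be converted to a data rate. The sketch below assumes each float takes 4 bytes and ignores Parquet compression and encoding, so it estimates decoded bytes rather than bytes on the wire. `ApproxMegabytesPerSecond` is a hypothetical helper.

```cpp
#include <cassert>

// Rough decoded data rate in MB/s (1 MB = 1e6 bytes).
// bytes_per_value = 4 is an assumption (32-bit floats).
double ApproxMegabytesPerSecond(double rows_per_s, double values_per_row,
                                double bytes_per_value) {
  assert(rows_per_s >= 0.0);
  assert(values_per_row >= 0.0);
  assert(bytes_per_value > 0.0);
  return rows_per_s * values_per_row * bytes_per_value / 1e6;
}
```

Under those assumptions, 31000 rows/s at about 1000 floats per row works out to roughly 124 MB/s, the same order of magnitude as the ~150 MB/s network rate reported in the issue.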

