You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@arrow.apache.org by 1057445597 <10...@qq.com> on 2022/07/27 06:51:03 UTC
回复: how to filter a table to select rows
I changed my filtering method as follows and hit the same core dump; it seems filter_expression_ loses its effect. I use my function ParseExpression to parse a string into an Expression, then I use ReadFile to read a parquet file into RecordBatches. The error only occurs when I call ReadFile on the background thread; when I instead call ParseExpression on the background thread to create a new Expression, the error goes away.
If I don't use background threads at all, there is no error: the filter works and there is no core dump. It's very strange.
this is the way i call ReadFile in background thread:
background_worker_->Schedule(std::bind(&Iterator::ReadFile, this, current_file_idx_ + 1, true));
I should have captured filter_expression_, why doesn't it seem to work?
code:
// Iterator for a TF dataset that streams Arrow RecordBatches out of a list of
// parquet files stored on S3, prefetching the next file on a background
// worker thread while the current file's batches are consumed.
//
// NOTE(review): ReadFile() is declared TF_EXCLUSIVE_LOCKS_REQUIRED(mu_), but
// it is also run via background_worker_->Schedule(std::bind(...)), where the
// worker thread does NOT hold mu_. Every mu_-guarded member it touches
// (s3fs_, column_indices_, record_batches_, next_record_batches_) — and the
// unguarded filter_expression_ — is therefore raced between the iterator
// thread and the worker thread. This is the most plausible reason the
// reported crash reproduces only when ReadFile runs in the background.
class Iterator : public ArrowBaseIterator<Dataset> {
public:
explicit Iterator(const Params& params)
: ArrowBaseIterator<Dataset>(params) {}
private:
// One-time setup: initialize the S3 filesystem and parse the optional filter
// string into filter_expression_, then synchronously read the first parquet
// file and kick off a background prefetch of the next one.
Status SetupStreamsLocked(Env* env)
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
if (!s3fs_) {
arrow::fs::EnsureS3Initialized();
auto s3Options = arrow::fs::S3Options::FromAccessKey(
dataset()->aws_access_key_, dataset()->aws_secret_key_);
s3Options.endpoint_override = dataset()->aws_endpoint_override_;
// NOTE(review): ValueOrDie() aborts the whole process on failure;
// prefer propagating a Status to the caller.
s3fs_ = arrow::fs::S3FileSystem::Make(s3Options).ValueOrDie();
if (!dataset()->filter_.empty()) {
std::cout << "before parse expression" << std::endl;
TF_RETURN_IF_ERROR(ArrowUtil::ParseExpression(dataset()->filter_, filter_expression_));
std::cout << "after parse expression" << std::endl;
}
// NOTE(review): when filter_ IS empty, filter_expression_ stays
// default-constructed, yet ReadFile() below unconditionally builds a
// FilterNode from it; binding a default Expression is consistent with
// the posted backtrace (CallNotNull -> Expression::Bind ->
// FilterNode::Make -> abort).
}
// Read the first file on the calling thread (mu_ is held here, so the
// TF_EXCLUSIVE_LOCKS_REQUIRED contract is satisfied for this call).
TF_RETURN_IF_ERROR(ReadFile(current_file_idx_));
#if 0
if (current_batch_idx_ < record_batches_.size()) {
current_batch_ = record_batches_[current_batch_idx_];
}
else {
current_batch_ = nullptr;
}
#endif
#if 1
if (!background_worker_) {
background_worker_ =
std::make_shared<BackgroundWorker>(env, "download_next_worker");
}
if (current_batch_idx_ < record_batches_.size()) {
current_batch_ = record_batches_[current_batch_idx_];
}
if (current_file_idx_ + 1 < dataset()->parquet_files_.size()) {
// NOTE(review): the worker invokes ReadFile WITHOUT holding mu_ (data
// race, see class comment) and the Status it returns is silently
// dropped by the void Schedule() callback.
background_worker_->Schedule(std::bind(&Iterator::ReadFile, this,
current_file_idx_ + 1, true));
}
#endif
return Status::OK();
}
// Advance to the next batch; when the current file is exhausted, wait for
// the background prefetch to finish, adopt its batches, and schedule the
// next prefetch.
Status NextStreamLocked(Env* env)
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
ArrowBaseIterator<Dataset>::NextStreamLocked(env);
if (++current_batch_idx_ < record_batches_.size()) {
current_batch_ = record_batches_[current_batch_idx_];
} else if (++current_file_idx_ < dataset()->parquet_files_.size()) {
current_batch_idx_ = 0;
#if 0
record_batches_.clear();
return SetupStreamsLocked(env);
#endif
#if 1
// Block until the background ReadFile for the next file has signaled
// completion via background_thread_finished_.
{
mutex_lock lk(cv_mu_);
while (!background_thread_finished_) {
cv_.wait(lk);
}
}
// Adopt the prefetched batches as the new current file's batches.
record_batches_.swap(next_record_batches_);
if (!record_batches_.empty()) {
current_batch_ = record_batches_[current_batch_idx_];
} else {
current_batch_ = nullptr;
}
// NOTE(review): background_thread_finished_ is reset here without
// holding cv_mu_, racing the writer at the end of ReadFile().
background_thread_finished_ = false;
if (current_file_idx_ + 1 < dataset()->parquet_files_.size()) {
background_worker_->Schedule(std::bind(
&Iterator::ReadFile, this, current_file_idx_ + 1, true));
}
#endif
}
return Status::OK();
}
// Reset all per-stream state so iteration can restart from file 0.
// NOTE(review): does not wait for an in-flight background ReadFile, which
// may still be appending to next_record_batches_ while it is cleared here.
void ResetStreamsLocked() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
ArrowBaseIterator<Dataset>::ResetStreamsLocked();
current_file_idx_ = 0;
current_batch_idx_ = 0;
record_batches_.clear();
next_record_batches_.clear();
}
// Reads parquet_files_[file_index] from S3, pushes its rows through an Arrow
// ExecPlan (source -> filter -> sink), and stores the resulting batches in
// record_batches_ (foreground call) or next_record_batches_ (background ==
// true, i.e. prefetch).
Status ReadFile(int file_index, bool background = false)
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
auto access_file =
s3fs_->OpenInputFile(dataset()->parquet_files_[file_index])
.ValueOrDie();
parquet::ArrowReaderProperties properties;
properties.set_use_threads(true);
properties.set_pre_buffer(true);
parquet::ReaderProperties parquet_properties =
parquet::default_reader_properties();
std::shared_ptr<parquet::arrow::FileReaderBuilder> builder =
std::make_shared<parquet::arrow::FileReaderBuilder>();
// NOTE(review): the Statuses returned by Open() and Build() are ignored;
// a failed open leaves `reader` null and the next call crashes.
builder->Open(access_file, parquet_properties);
std::unique_ptr<parquet::arrow::FileReader> reader;
builder->properties(properties)->Build(&reader);
// Lazily resolve the requested column names to indices on first use.
if (column_indices_.empty()) {
std::shared_ptr<arrow::Schema> schema;
reader->GetSchema(&schema);
// check column name exist
std::string err_column_names;
for (const auto& name : dataset()->column_names_) {
int fieldIndex = schema->GetFieldIndex(name);
// NOTE(review): -1 (not found) is still pushed into column_indices_
// before the error check below rejects it.
column_indices_.push_back(fieldIndex);
if (-1 == fieldIndex) {
err_column_names = err_column_names + " " + name;
}
}
if (err_column_names.length() != 0) {
return errors::InvalidArgument("these column names don't exist: ",
err_column_names);
}
}
// Read file columns and build a table
std::shared_ptr<::arrow::Table> table;
CHECK_ARROW(reader->ReadTable(column_indices_, &table));
// Convert the table to a sequence of batches
auto tr = std::make_shared<arrow::TableBatchReader>(*table.get());
using namespace arrow::compute;
// ensure arrow::dataset node factories are in the registry
// arrow::dataset::internal::Initialize();
// execution context
ExecContext exec_context;
auto plan = ExecPlan::Make(&exec_context).ValueOrDie();
arrow::AsyncGenerator<arrow::util::optional<ExecBatch>> sink_gen;
// Source node: pulls batches synchronously from the TableBatchReader and
// hands each to the plan as an already-finished future.
auto source_node_options = arrow::compute::SourceNodeOptions{table->schema(),
[tr]() {
using ExecBatch = arrow::compute::ExecBatch;
using ExecBatchOptional = arrow::util::optional<ExecBatch>;
auto arrow_record_batch_result = tr->Next();
if (!arrow_record_batch_result.ok()) {
std::cout << "end1" << std::endl;
return arrow::AsyncGeneratorEnd<ExecBatchOptional>();
}
auto arrow_record_batch = std::move(*arrow_record_batch_result);
if (!arrow_record_batch) {
std::cout << "end2" << std::endl;
return arrow::AsyncGeneratorEnd<ExecBatchOptional>();
}
std::cout << "num rows: " << arrow_record_batch->num_rows() << std::endl;
return arrow::Future<ExecBatchOptional>::MakeFinished(
ExecBatch(*arrow_record_batch));
}
};
auto source_node = MakeExecNode("source", plan.get(), {}, source_node_options).ValueOrDie();
// std::cout << "filter: "<< filter_expression_.ToString() << std::endl;
// arrow::compute::Expression filter_expr;
// TF_RETURN_IF_ERROR(ArrowUtil::ParseExpression(dataset()->filter_, filter_expr));
// NOTE(review): the filter node is built unconditionally — if
// dataset()->filter_ was empty, filter_expression_ is a default-constructed
// Expression and FilterNode::Make aborts (matches the posted backtrace).
// Guard this on !dataset()->filter_.empty(), or presumably use
// arrow::compute::literal(true) as a pass-through filter — TODO confirm.
auto filter_node = MakeExecNode("filter", plan.get(), {source_node}, FilterNodeOptions{this->filter_expression_}).ValueOrDie();
// NOTE(review): the Result returned by MakeExecNode("sink", ...) is dropped.
MakeExecNode("sink", plan.get(), {filter_node}, SinkNodeOptions{&sink_gen});
std::shared_ptr<::arrow::Schema> schema;
schema = table->schema();;
auto sink_reader = MakeGeneratorReader(schema, std::move(sink_gen), exec_context.memory_pool());
// NOTE(review): Validate() and StartProducing() return Statuses that are
// silently dropped here.
plan->Validate();
std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
// // start the ExecPlan
plan->StartProducing();
std::shared_ptr<arrow::Table> response_table;
// Drain the sink into an in-memory table (blocks until the plan is done).
response_table = arrow::Table::FromRecordBatchReader(sink_reader.get()).ValueOrDie();
std::cout << "Results : " << response_table->ToString() << std::endl;
// // stop producing
plan->StopProducing();
// // plan mark finished
auto future = plan->finished();
auto ttr = std::make_shared<arrow::TableBatchReader>(*response_table.get());
// filter
// auto scanner_builder = arrow::dataset::ScannerBuilder::FromRecordBatchReader(tr);
// scanner_builder->UseThreads(false);
// if (!dataset()->filter_.empty()) {
// std::cout << filter_expression_.ToString() << std::endl;
// scanner_builder->Filter(filter_expression_);
// }
std::shared_ptr<arrow::RecordBatch> batch = nullptr;
// auto scanner = scanner_builder->Finish().ValueOrDie();
// auto batch_reader = scanner->ToRecordBatchReader().ValueOrDie();
CHECK_ARROW(ttr->ReadNext(&batch));
TF_RETURN_IF_ERROR(CheckBatchColumnTypes(batch));
next_record_batches_.clear();
// Route batches to the current buffer (foreground) or prefetch buffer
// (background).
while (batch != nullptr) {
if (!background) {
record_batches_.emplace_back(batch);
} else {
next_record_batches_.emplace_back(batch);
}
CHECK_ARROW(ttr->ReadNext(&batch));
}
// Signal NextStreamLocked() that the prefetch is complete.
if (background) {
mutex_lock lk(cv_mu_);
background_thread_finished_ = true;
cv_.notify_all();
}
return Status::OK();
}
// --- state: the mu_-guarded members are only safe if ReadFile actually
// runs under mu_; see the class comment for the background-thread race. ---
size_t current_file_idx_ TF_GUARDED_BY(mu_) = 0;
size_t current_batch_idx_ TF_GUARDED_BY(mu_) = 0;
// Batches of the file currently being iterated.
std::vector<std::shared_ptr<arrow::RecordBatch>> record_batches_
TF_GUARDED_BY(mu_);
// Batches prefetched from the next file by the background worker.
std::vector<std::shared_ptr<arrow::RecordBatch>> next_record_batches_
TF_GUARDED_BY(mu_);
std::shared_ptr<arrow::fs::S3FileSystem> s3fs_ TF_GUARDED_BY(mu_) =
nullptr;
std::vector<int> column_indices_ TF_GUARDED_BY(mu_);
std::shared_ptr<BackgroundWorker> background_worker_ = nullptr;
// Handshake between NextStreamLocked() and a background ReadFile().
mutex cv_mu_;
condition_variable cv_;
bool background_thread_finished_ = false;
// Parsed filter. NOTE(review): not TF_GUARDED_BY(mu_) although it is
// written in SetupStreamsLocked and read from the worker thread.
arrow::compute::Expression filter_expression_;
};
bt:
#0 0x00007ffff7e0c18b in raise () from /usr/lib/x86_64-linux-gnu/libc.so.6
#1 0x00007ffff7deb859 in abort () from /usr/lib/x86_64-linux-gnu/libc.so.6
#2 0x00007ff82929e2b0 in arrow::util::CerrLog::~CerrLog (this=0x7ffad40d40b0, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:72
#3 0x00007ff82929e2d0 in arrow::util::CerrLog::~CerrLog (this=0x7ffad40d40b0, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:74
#4 0x00007ff82929e0ef in arrow::util::ArrowLog::~ArrowLog (this=0x7ff804f83d60, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:250
#5 0x00007ff828f949cb in arrow::compute::CallNotNull (expr=...) at external/arrow/cpp/src/arrow/compute/exec/expression_internal.h:40
#6 0x00007ff828f8f3e0 in arrow::compute::(anonymous namespace)::BindImpl<arrow::Schema> (expr=..., in=..., shape=arrow::ValueDescr::ARRAY, exec_context=0x7ff804f84420) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:458
#7 0x00007ff828f8f4bb in arrow::compute::(anonymous namespace)::BindImpl<arrow::Schema> (expr=..., in=..., shape=arrow::ValueDescr::ARRAY, exec_context=0x7ff804f84420) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:460
#8 0x00007ff828f8efe7 in arrow::compute::(anonymous namespace)::BindImpl<arrow::Schema> (expr=..., in=..., shape=arrow::ValueDescr::ARRAY, exec_context=0x0) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:441
#9 0x00007ff828f820bd in arrow::compute::Expression::Bind (this=0x7ff804f846a0, in_schema=..., exec_context=0x0) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:476
#10 0x00007ff828fc3693 in arrow::compute::(anonymous namespace)::FilterNode::Make (plan=0x7ffad40d63f0, inputs=std::vector of length 1, capacity 1 = {...}, options=...) at external/arrow/cpp/src/arrow/compute/exec/filter_node.cc:53
#11 0x00007ff828fc5b06 in std::_Function_handler<arrow::Result<arrow::compute::ExecNode*> (arrow::compute::ExecPlan*, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >, arrow::compute::ExecNodeOptions const&), arrow::Result<arrow::compute::ExecNode*> (*)(arrow::compute::ExecPlan*, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >, arrow::compute::ExecNodeOptions const&)>::_M_invoke(std::_Any_data const&, arrow::compute::ExecPlan*&&, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >&&, arrow::compute::ExecNodeOptions const&) (__functor=...,
__args#0=@0x7ff804f847c8: 0x7ffad40d63f0, __args#1=..., __args#2=...) at /usr/include/c++/9/bits/std_function.h:286
#12 0x00007ff828be00d7 in std::function<arrow::Result<arrow::compute::ExecNode*> (arrow::compute::ExecPlan*, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >, arrow::compute::ExecNodeOptions const&)>::operator()(arrow::compute::ExecPlan*, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >, arrow::compute::ExecNodeOptions const&) const (this=0x7ff804f848a0, __args#0=0x7ffad40d63f0, __args#1=std::vector of length 0, capacity 0, __args#2=...) at /usr/include/c++/9/bits/std_function.h:688
#13 0x00007ff828bd251c in arrow::compute::MakeExecNode (Python Exception <class 'gdb.error'> No type named class std::basic_string<char, std::char_traits<char>, std::allocator<char> >::_Rep.:
factory_name=, plan=0x7ffad40d63f0, inputs=std::vector of length 0, capacity 0, options=..., registry=0x7ff82d1ac960 <arrow::compute::default_exec_factory_registry()::instance>) at external/arrow/cpp/src/arrow/compute/exec/exec_plan.h:360
#14 0x00007ff828bde6e2 in tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile (this=0x18471300, file_index=1, background=true) at tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc:1248
#15 0x00007ff828bedcbc in std::__invoke_impl<tensorflow::Status, tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*&)(int, bool), tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*&, unsigned long&, bool&> (__f=
@0x19161d80: (class tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*)(class tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator * const, int, bool)) 0x7ff828bdddfa <tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile(int, bool)>, __t=@0x19161da0: 0x18471300)
at /usr/include/c++/9/bits/invoke.h:73
#16 0x00007ff828bec6b2 in std::__invoke<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*&)(int, bool), tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*&, unsigned long&, bool&> (__fn=
@0x19161d80: (class tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*)(class tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator * const, int, bool)) 0x7ff828bdddfa <tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile(int, bool)>) at /usr/include/c++/9/bits/invoke.h:96
#17 0x00007ff828beadb2 in std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)>::__call<tensorflow::Status, , 0ul, 1ul, 2ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul, 2ul>) (this=0x19161d80,
__args=...) at /usr/include/c++/9/functional:402
#18 0x00007ff828be916c in std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)>::operator()<, tensorflow::Status>() (this=0x19161d80) at /usr/include/c++/9/functional:484
#19 0x00007ff828be6633 in std::_Function_handler<void (), std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)> >::_M_invoke(std::_Any_data const&) (__functor=...)
at /usr/include/c++/9/bits/std_function.h:300
#20 0x00007ffd92bd68ea in tensorflow::data::BackgroundWorker::WorkerLoop() () from /usr/local/lib/python3.8/dist-packages/tensorflow/python/../libtensorflow_framework.so.2
#21 0x00007ffd9338f5d8 in tensorflow::(anonymous namespace)::PThread::ThreadFn(void*) () from /usr/local/lib/python3.8/dist-packages/tensorflow/python/../libtensorflow_framework.so.2
#22 0x00007ffff7dac609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#23 0x00007ffff7ee8293 in clone () from /usr/lib/x86_64-linux-gnu/libc.so.6
1057445597
1057445597@qq.com
------------------ 原始邮件 ------------------
发件人: "user" <krassovskysasha@gmail.com>;
发送时间: 2022年7月27日(星期三) 凌晨0:57
收件人: "user"<user@arrow.apache.org>;
主题: Re: how to filter a table to select rows
Hi 1057445597,Could you provide more information about your core dump? What backtrace does it give? I notice you’re not checking the Status returned by scanner_builder->Filter. That could be a place to start.
Sasha Krassovsky
On Jul 26, 2022, at 8:27 AM, Aldrin <akmontan@ucsc.edu> wrote:
You can create an InMemoryDataset from a RecordBatch. See [1] for docs and [2] for example code. You may be able to find something similar for filtering tables.
[1]: https://arrow.apache.org/docs/cpp/api/dataset.html#_CPPv4N5arrow7dataset15InMemoryDataset15InMemoryDatasetENSt10shared_ptrI6SchemaEE17RecordBatchVector
[2]: https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/mainline/src/cpp/processing/operators.cpp#L50
Aldrin Montana
Computer Science PhD Student
UC Santa Cruz
On Mon, Jul 25, 2022 at 8:49 PM 1057445597 <1057445597@qq.com> wrote:
I use the following code to filter a table, but it always core dumps at scanner_builder->Filter(filter_expression_). Is there a better way to filter a table, or a RecordBatch?
By the way, dataset::ScannerBuilder always core dumps when I use it in tfio to create a TensorFlow dataset; it is most likely buggy.
// Read file columns and build a table
std::shared_ptr<::arrow::Table> table;
CHECK_ARROW(reader->ReadTable(column_indices_, &table));
// Convert the table to a sequence of batches
auto tr = std::make_shared<arrow::TableBatchReader>(*table.get());
// filter
auto scanner_builder = arrow::dataset::ScannerBuilder::FromRecordBatchReader(tr);
if (!dataset()->filter_.empty()) {
std::cout << filter_expression_.ToString() << std::endl;
scanner_builder->Filter(filter_expression_);
}
1057445597
1057445597@qq.com
回复: how to filter a table to select rows
Posted by 1057445597 <10...@qq.com>.
when I use ScannerBuilder, :
code:
// filter
auto scanner_builder = arrow::dataset::ScannerBuilder::FromRecordBatchReader(tr);
if (!dataset()->filter_.empty()) {
// TF_RETURN_IF_ERROR(ArrowUtil::ParseExpression(dataset()->filter_, filter_expression_));
scanner_builder->Filter(filter_expression_);
}
std::shared_ptr<arrow::RecordBatch> batch = nullptr;
auto scanner = scanner_builder->Finish().ValueOrDie();
auto batch_reader = scanner->ToRecordBatchReader().ValueOrDie();
// CHECK_ARROW(ttr->ReadNext(&batch));
CHECK_ARROW(batch_reader->ReadNext(&batch));
TF_RETURN_IF_ERROR(CheckBatchColumnTypes(batch));
next_record_batches_.clear();
while (batch != nullptr) {
if (!background) {
record_batches_.emplace_back(batch);
} else {
next_record_batches_.emplace_back(batch);
}
CHECK_ARROW(batch_reader->ReadNext(&batch));
}
(gdb) bt
#0 0x00007ffff7e0c18b in raise () from /usr/lib/x86_64-linux-gnu/libc.so.6
#1 0x00007ffff7deb859 in abort () from /usr/lib/x86_64-linux-gnu/libc.so.6
#2 0x00007ff8292531a2 in arrow::util::CerrLog::~CerrLog (this=0x7ffae00d6540, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:72
#3 0x00007ff8292531c2 in arrow::util::CerrLog::~CerrLog (this=0x7ffae00d6540, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:74
#4 0x00007ff829252fe1 in arrow::util::ArrowLog::~ArrowLog (this=0x7ff804dde7f0, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:250
#5 0x00007ff828e1a83f in arrow::compute::CallNotNull (expr=...) at external/arrow/cpp/src/arrow/compute/exec/expression_internal.h:40
#6 0x00007ff828e0a081 in arrow::compute::FieldsInExpression (expr=...) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:638
#7 0x00007ff828e0a0ec in arrow::compute::FieldsInExpression (expr=...) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:639
#8 0x00007ff828fce6da in arrow::dataset::ScannerBuilder::Filter (this=0x7ffae00d43e0, filter=...) at external/arrow/cpp/src/arrow/dataset/scanner.cc:805
#9 0x00007ff828a9d8ce in tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile (this=0x18471320, file_index=1, background=true) at tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc:1278
#10 0x00007ff828aa9b20 in std::__invoke_impl<tensorflow::Status, tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*&)(int, bool), tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*&, unsigned long&, bool&> (__f=
@0x1943bfd0: (class tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*)(class tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator * const, int, bool)) 0x7ff828a9d2b6 <tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile(int, bool)>, __t=@0x1943bff0: 0x18471320)
at /usr/include/c++/9/bits/invoke.h:73
#11 0x00007ff828aa8830 in std::__invoke<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*&)(int, bool), tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*&, unsigned long&, bool&> (__fn=
@0x1943bfd0: (class tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*)(class tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator * const, int, bool)) 0x7ff828a9d2b6 <tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile(int, bool)>) at /usr/include/c++/9/bits/invoke.h:96
#12 0x00007ff828aa7428 in std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)>::__call<tensorflow::Status, , 0ul, 1ul, 2ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul, 2ul>) (this=0x1943bfd0,
__args=...) at /usr/include/c++/9/functional:402
#13 0x00007ff828aa6012 in std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)>::operator()<, tensorflow::Status>() (this=0x1943bfd0) at /usr/include/c++/9/functional:484
#14 0x00007ff828aa3e93 in std::_Function_handler<void (), std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)> >::_M_invoke(std::_Any_data const&) (__functor=...)
at /usr/include/c++/9/bits/std_function.h:300
#15 0x00007ffd92bd68ea in tensorflow::data::BackgroundWorker::WorkerLoop() () from /usr/local/lib/python3.8/dist-packages/tensorflow/python/../libtensorflow_framework.so.2
#16 0x00007ffd9338f5d8 in tensorflow::(anonymous namespace)::PThread::ThreadFn(void*) () from /usr/local/lib/python3.8/dist-packages/tensorflow/python/../libtensorflow_framework.so.2
#17 0x00007ffff7dac609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#18 0x00007ffff7ee8293 in clone () from /usr/lib/x86_64-linux-gnu/libc.so.6
1057445597
1057445597@qq.com
------------------ 原始邮件 ------------------
发件人: "user" <1057445597@qq.com>;
发送时间: 2022年7月27日(星期三) 下午2:51
收件人: "user"<user@arrow.apache.org>;
主题: 回复: how to filter a table to select rows
I changed my filtering method as follows and hit the same core dump; it seems filter_expression_ loses its effect. I use my function ParseExpression to parse a string into an Expression, then I use ReadFile to read a parquet file into RecordBatches. The error only occurs when I call ReadFile on the background thread; when I instead call ParseExpression on the background thread to create a new Expression, the error goes away.
If I don't use background threads at all, there is no error: the filter works and there is no core dump. It's very strange.
this is the way i call ReadFile in background thread:
background_worker_->Schedule(std::bind(&Iterator::ReadFile, this, current_file_idx_ + 1, true));
I should have captured filter_expression_, why doesn't it seem to work?
code:
// Iterator for a TF dataset that streams Arrow RecordBatches out of a list of
// parquet files stored on S3, prefetching the next file on a background
// worker thread while the current file's batches are consumed.
//
// NOTE(review): ReadFile() is declared TF_EXCLUSIVE_LOCKS_REQUIRED(mu_), but
// it is also run via background_worker_->Schedule(std::bind(...)), where the
// worker thread does NOT hold mu_. Every mu_-guarded member it touches
// (s3fs_, column_indices_, record_batches_, next_record_batches_) — and the
// unguarded filter_expression_ — is therefore raced between the iterator
// thread and the worker thread. This is the most plausible reason the
// reported crash reproduces only when ReadFile runs in the background.
class Iterator : public ArrowBaseIterator<Dataset> {
public:
explicit Iterator(const Params& params)
: ArrowBaseIterator<Dataset>(params) {}
private:
// One-time setup: initialize the S3 filesystem and parse the optional filter
// string into filter_expression_, then synchronously read the first parquet
// file and kick off a background prefetch of the next one.
Status SetupStreamsLocked(Env* env)
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
if (!s3fs_) {
arrow::fs::EnsureS3Initialized();
auto s3Options = arrow::fs::S3Options::FromAccessKey(
dataset()->aws_access_key_, dataset()->aws_secret_key_);
s3Options.endpoint_override = dataset()->aws_endpoint_override_;
// NOTE(review): ValueOrDie() aborts the whole process on failure;
// prefer propagating a Status to the caller.
s3fs_ = arrow::fs::S3FileSystem::Make(s3Options).ValueOrDie();
if (!dataset()->filter_.empty()) {
std::cout << "before parse expression" << std::endl;
TF_RETURN_IF_ERROR(ArrowUtil::ParseExpression(dataset()->filter_, filter_expression_));
std::cout << "after parse expression" << std::endl;
}
// NOTE(review): when filter_ IS empty, filter_expression_ stays
// default-constructed, yet ReadFile() below unconditionally builds a
// FilterNode from it; binding a default Expression is consistent with
// the posted backtrace (CallNotNull -> Expression::Bind ->
// FilterNode::Make -> abort).
}
// Read the first file on the calling thread (mu_ is held here, so the
// TF_EXCLUSIVE_LOCKS_REQUIRED contract is satisfied for this call).
TF_RETURN_IF_ERROR(ReadFile(current_file_idx_));
#if 0
if (current_batch_idx_ < record_batches_.size()) {
current_batch_ = record_batches_[current_batch_idx_];
}
else {
current_batch_ = nullptr;
}
#endif
#if 1
if (!background_worker_) {
background_worker_ =
std::make_shared<BackgroundWorker>(env, "download_next_worker");
}
if (current_batch_idx_ < record_batches_.size()) {
current_batch_ = record_batches_[current_batch_idx_];
}
if (current_file_idx_ + 1 < dataset()->parquet_files_.size()) {
// NOTE(review): the worker invokes ReadFile WITHOUT holding mu_ (data
// race, see class comment) and the Status it returns is silently
// dropped by the void Schedule() callback.
background_worker_->Schedule(std::bind(&Iterator::ReadFile, this,
current_file_idx_ + 1, true));
}
#endif
return Status::OK();
}
// Advance to the next batch; when the current file is exhausted, wait for
// the background prefetch to finish, adopt its batches, and schedule the
// next prefetch.
Status NextStreamLocked(Env* env)
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
ArrowBaseIterator<Dataset>::NextStreamLocked(env);
if (++current_batch_idx_ < record_batches_.size()) {
current_batch_ = record_batches_[current_batch_idx_];
} else if (++current_file_idx_ < dataset()->parquet_files_.size()) {
current_batch_idx_ = 0;
#if 0
record_batches_.clear();
return SetupStreamsLocked(env);
#endif
#if 1
// Block until the background ReadFile for the next file has signaled
// completion via background_thread_finished_.
{
mutex_lock lk(cv_mu_);
while (!background_thread_finished_) {
cv_.wait(lk);
}
}
// Adopt the prefetched batches as the new current file's batches.
record_batches_.swap(next_record_batches_);
if (!record_batches_.empty()) {
current_batch_ = record_batches_[current_batch_idx_];
} else {
current_batch_ = nullptr;
}
// NOTE(review): background_thread_finished_ is reset here without
// holding cv_mu_, racing the writer at the end of ReadFile().
background_thread_finished_ = false;
if (current_file_idx_ + 1 < dataset()->parquet_files_.size()) {
background_worker_->Schedule(std::bind(
&Iterator::ReadFile, this, current_file_idx_ + 1, true));
}
#endif
}
return Status::OK();
}
// Reset all per-stream state so iteration can restart from file 0.
// NOTE(review): does not wait for an in-flight background ReadFile, which
// may still be appending to next_record_batches_ while it is cleared here.
void ResetStreamsLocked() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
ArrowBaseIterator<Dataset>::ResetStreamsLocked();
current_file_idx_ = 0;
current_batch_idx_ = 0;
record_batches_.clear();
next_record_batches_.clear();
}
// Reads parquet_files_[file_index] from S3, pushes its rows through an Arrow
// ExecPlan (source -> filter -> sink), and stores the resulting batches in
// record_batches_ (foreground call) or next_record_batches_ (background ==
// true, i.e. prefetch).
Status ReadFile(int file_index, bool background = false)
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
auto access_file =
s3fs_->OpenInputFile(dataset()->parquet_files_[file_index])
.ValueOrDie();
parquet::ArrowReaderProperties properties;
properties.set_use_threads(true);
properties.set_pre_buffer(true);
parquet::ReaderProperties parquet_properties =
parquet::default_reader_properties();
std::shared_ptr<parquet::arrow::FileReaderBuilder> builder =
std::make_shared<parquet::arrow::FileReaderBuilder>();
// NOTE(review): the Statuses returned by Open() and Build() are ignored;
// a failed open leaves `reader` null and the next call crashes.
builder->Open(access_file, parquet_properties);
std::unique_ptr<parquet::arrow::FileReader> reader;
builder->properties(properties)->Build(&reader);
// Lazily resolve the requested column names to indices on first use.
if (column_indices_.empty()) {
std::shared_ptr<arrow::Schema> schema;
reader->GetSchema(&schema);
// check column name exist
std::string err_column_names;
for (const auto& name : dataset()->column_names_) {
int fieldIndex = schema->GetFieldIndex(name);
// NOTE(review): -1 (not found) is still pushed into column_indices_
// before the error check below rejects it.
column_indices_.push_back(fieldIndex);
if (-1 == fieldIndex) {
err_column_names = err_column_names + " " + name;
}
}
if (err_column_names.length() != 0) {
return errors::InvalidArgument("these column names don't exist: ",
err_column_names);
}
}
// Read file columns and build a table
std::shared_ptr<::arrow::Table> table;
CHECK_ARROW(reader->ReadTable(column_indices_, &table));
// Convert the table to a sequence of batches
auto tr = std::make_shared<arrow::TableBatchReader>(*table.get());
using namespace arrow::compute;
// ensure arrow::dataset node factories are in the registry
// arrow::dataset::internal::Initialize();
// execution context
ExecContext exec_context;
auto plan = ExecPlan::Make(&exec_context).ValueOrDie();
arrow::AsyncGenerator<arrow::util::optional<ExecBatch>> sink_gen;
// Source node: pulls batches synchronously from the TableBatchReader and
// hands each to the plan as an already-finished future.
auto source_node_options = arrow::compute::SourceNodeOptions{table->schema(),
[tr]() {
using ExecBatch = arrow::compute::ExecBatch;
using ExecBatchOptional = arrow::util::optional<ExecBatch>;
auto arrow_record_batch_result = tr->Next();
if (!arrow_record_batch_result.ok()) {
std::cout << "end1" << std::endl;
return arrow::AsyncGeneratorEnd<ExecBatchOptional>();
}
auto arrow_record_batch = std::move(*arrow_record_batch_result);
if (!arrow_record_batch) {
std::cout << "end2" << std::endl;
return arrow::AsyncGeneratorEnd<ExecBatchOptional>();
}
std::cout << "num rows: " << arrow_record_batch->num_rows() << std::endl;
return arrow::Future<ExecBatchOptional>::MakeFinished(
ExecBatch(*arrow_record_batch));
}
};
auto source_node = MakeExecNode("source", plan.get(), {}, source_node_options).ValueOrDie();
// std::cout << "filter: "<< filter_expression_.ToString() << std::endl;
// arrow::compute::Expression filter_expr;
// TF_RETURN_IF_ERROR(ArrowUtil::ParseExpression(dataset()->filter_, filter_expr));
// NOTE(review): the filter node is built unconditionally — if
// dataset()->filter_ was empty, filter_expression_ is a default-constructed
// Expression and FilterNode::Make aborts (matches the posted backtrace).
// Guard this on !dataset()->filter_.empty(), or presumably use
// arrow::compute::literal(true) as a pass-through filter — TODO confirm.
auto filter_node = MakeExecNode("filter", plan.get(), {source_node}, FilterNodeOptions{this->filter_expression_}).ValueOrDie();
// NOTE(review): the Result returned by MakeExecNode("sink", ...) is dropped.
MakeExecNode("sink", plan.get(), {filter_node}, SinkNodeOptions{&sink_gen});
std::shared_ptr<::arrow::Schema> schema;
schema = table->schema();;
auto sink_reader = MakeGeneratorReader(schema, std::move(sink_gen), exec_context.memory_pool());
// NOTE(review): Validate() and StartProducing() return Statuses that are
// silently dropped here.
plan->Validate();
std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
// // start the ExecPlan
plan->StartProducing();
std::shared_ptr<arrow::Table> response_table;
// Drain the sink into an in-memory table (blocks until the plan is done).
response_table = arrow::Table::FromRecordBatchReader(sink_reader.get()).ValueOrDie();
std::cout << "Results : " << response_table->ToString() << std::endl;
// // stop producing
plan->StopProducing();
// // plan mark finished
auto future = plan->finished();
auto ttr = std::make_shared<arrow::TableBatchReader>(*response_table.get());
// filter
// auto scanner_builder = arrow::dataset::ScannerBuilder::FromRecordBatchReader(tr);
// scanner_builder->UseThreads(false);
// if (!dataset()->filter_.empty()) {
// std::cout << filter_expression_.ToString() << std::endl;
// scanner_builder->Filter(filter_expression_);
// }
std::shared_ptr<arrow::RecordBatch> batch = nullptr;
// auto scanner = scanner_builder->Finish().ValueOrDie();
// auto batch_reader = scanner->ToRecordBatchReader().ValueOrDie();
CHECK_ARROW(ttr->ReadNext(&batch));
TF_RETURN_IF_ERROR(CheckBatchColumnTypes(batch));
next_record_batches_.clear();
// Route batches to the current buffer (foreground) or prefetch buffer
// (background).
while (batch != nullptr) {
if (!background) {
record_batches_.emplace_back(batch);
} else {
next_record_batches_.emplace_back(batch);
}
CHECK_ARROW(ttr->ReadNext(&batch));
}
// Signal NextStreamLocked() that the prefetch is complete.
if (background) {
mutex_lock lk(cv_mu_);
background_thread_finished_ = true;
cv_.notify_all();
}
return Status::OK();
}
// --- state: the mu_-guarded members are only safe if ReadFile actually
// runs under mu_; see the class comment for the background-thread race. ---
size_t current_file_idx_ TF_GUARDED_BY(mu_) = 0;
size_t current_batch_idx_ TF_GUARDED_BY(mu_) = 0;
// Batches of the file currently being iterated.
std::vector<std::shared_ptr<arrow::RecordBatch>> record_batches_
TF_GUARDED_BY(mu_);
// Batches prefetched from the next file by the background worker.
std::vector<std::shared_ptr<arrow::RecordBatch>> next_record_batches_
TF_GUARDED_BY(mu_);
std::shared_ptr<arrow::fs::S3FileSystem> s3fs_ TF_GUARDED_BY(mu_) =
nullptr;
std::vector<int> column_indices_ TF_GUARDED_BY(mu_);
std::shared_ptr<BackgroundWorker> background_worker_ = nullptr;
// Handshake between NextStreamLocked() and a background ReadFile().
mutex cv_mu_;
condition_variable cv_;
bool background_thread_finished_ = false;
// Parsed filter. NOTE(review): not TF_GUARDED_BY(mu_) although it is
// written in SetupStreamsLocked and read from the worker thread.
arrow::compute::Expression filter_expression_;
};
bt:
#0 0x00007ffff7e0c18b in raise () from /usr/lib/x86_64-linux-gnu/libc.so.6
#1 0x00007ffff7deb859 in abort () from /usr/lib/x86_64-linux-gnu/libc.so.6
#2 0x00007ff82929e2b0 in arrow::util::CerrLog::~CerrLog (this=0x7ffad40d40b0, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:72
#3 0x00007ff82929e2d0 in arrow::util::CerrLog::~CerrLog (this=0x7ffad40d40b0, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:74
#4 0x00007ff82929e0ef in arrow::util::ArrowLog::~ArrowLog (this=0x7ff804f83d60, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:250
#5 0x00007ff828f949cb in arrow::compute::CallNotNull (expr=...) at external/arrow/cpp/src/arrow/compute/exec/expression_internal.h:40
#6 0x00007ff828f8f3e0 in arrow::compute::(anonymous namespace)::BindImpl<arrow::Schema> (expr=..., in=..., shape=arrow::ValueDescr::ARRAY, exec_context=0x7ff804f84420) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:458
#7 0x00007ff828f8f4bb in arrow::compute::(anonymous namespace)::BindImpl<arrow::Schema> (expr=..., in=..., shape=arrow::ValueDescr::ARRAY, exec_context=0x7ff804f84420) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:460
#8 0x00007ff828f8efe7 in arrow::compute::(anonymous namespace)::BindImpl<arrow::Schema> (expr=..., in=..., shape=arrow::ValueDescr::ARRAY, exec_context=0x0) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:441
#9 0x00007ff828f820bd in arrow::compute::Expression::Bind (this=0x7ff804f846a0, in_schema=..., exec_context=0x0) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:476
#10 0x00007ff828fc3693 in arrow::compute::(anonymous namespace)::FilterNode::Make (plan=0x7ffad40d63f0, inputs=std::vector of length 1, capacity 1 = {...}, options=...) at external/arrow/cpp/src/arrow/compute/exec/filter_node.cc:53
#11 0x00007ff828fc5b06 in std::_Function_handler<arrow::Result<arrow::compute::ExecNode*> (arrow::compute::ExecPlan*, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >, arrow::compute::ExecNodeOptions const&), arrow::Result<arrow::compute::ExecNode*> (*)(arrow::compute::ExecPlan*, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >, arrow::compute::ExecNodeOptions const&)>::_M_invoke(std::_Any_data const&, arrow::compute::ExecPlan*&&, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >&&, arrow::compute::ExecNodeOptions const&) (__functor=...,
__args#0=@0x7ff804f847c8: 0x7ffad40d63f0, __args#1=..., __args#2=...) at /usr/include/c++/9/bits/std_function.h:286
#12 0x00007ff828be00d7 in std::function<arrow::Result<arrow::compute::ExecNode*> (arrow::compute::ExecPlan*, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >, arrow::compute::ExecNodeOptions const&)>::operator()(arrow::compute::ExecPlan*, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >, arrow::compute::ExecNodeOptions const&) const (this=0x7ff804f848a0, __args#0=0x7ffad40d63f0, __args#1=std::vector of length 0, capacity 0, __args#2=...) at /usr/include/c++/9/bits/std_function.h:688
#13 0x00007ff828bd251c in arrow::compute::MakeExecNode (Python Exception <class 'gdb.error'> No type named class std::basic_string<char, std::char_traits<char>, std::allocator<char> >::_Rep.:
factory_name=, plan=0x7ffad40d63f0, inputs=std::vector of length 0, capacity 0, options=..., registry=0x7ff82d1ac960 <arrow::compute::default_exec_factory_registry()::instance>) at external/arrow/cpp/src/arrow/compute/exec/exec_plan.h:360
#14 0x00007ff828bde6e2 in tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile (this=0x18471300, file_index=1, background=true) at tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc:1248
#15 0x00007ff828bedcbc in std::__invoke_impl<tensorflow::Status, tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*&)(int, bool), tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*&, unsigned long&, bool&> (__f=
@0x19161d80: (class tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*)(class tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator * const, int, bool)) 0x7ff828bdddfa <tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile(int, bool)>, __t=@0x19161da0: 0x18471300)
at /usr/include/c++/9/bits/invoke.h:73
#16 0x00007ff828bec6b2 in std::__invoke<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*&)(int, bool), tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*&, unsigned long&, bool&> (__fn=
@0x19161d80: (class tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*)(class tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator * const, int, bool)) 0x7ff828bdddfa <tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile(int, bool)>) at /usr/include/c++/9/bits/invoke.h:96
#17 0x00007ff828beadb2 in std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)>::__call<tensorflow::Status, , 0ul, 1ul, 2ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul, 2ul>) (this=0x19161d80,
__args=...) at /usr/include/c++/9/functional:402
#18 0x00007ff828be916c in std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)>::operator()<, tensorflow::Status>() (this=0x19161d80) at /usr/include/c++/9/functional:484
#19 0x00007ff828be6633 in std::_Function_handler<void (), std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)> >::_M_invoke(std::_Any_data const&) (__functor=...)
at /usr/include/c++/9/bits/std_function.h:300
#20 0x00007ffd92bd68ea in tensorflow::data::BackgroundWorker::WorkerLoop() () from /usr/local/lib/python3.8/dist-packages/tensorflow/python/../libtensorflow_framework.so.2
#21 0x00007ffd9338f5d8 in tensorflow::(anonymous namespace)::PThread::ThreadFn(void*) () from /usr/local/lib/python3.8/dist-packages/tensorflow/python/../libtensorflow_framework.so.2
#22 0x00007ffff7dac609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#23 0x00007ffff7ee8293 in clone () from /usr/lib/x86_64-linux-gnu/libc.so.6
1057445597
1057445597@qq.com
------------------ 原始邮件 ------------------
发件人: "user" <krassovskysasha@gmail.com>;
发送时间: 2022年7月27日(星期三) 凌晨0:57
收件人: "user"<user@arrow.apache.org>;
主题: Re: how to filter a table to select rows
Hi 1057445597, Could you provide more information about your core dump? What backtrace does it give? I notice you’re not checking the Status returned by scanner_builder->Filter. That could be a place to start.
Sasha Krassovsky
On Jul 26, 2022, at 8:27 AM, Aldrin <akmontan@ucsc.edu> wrote:
You can create an InMemoryDataset from a RecordBatch. See [1] for docs and [2] for example code. You may be able to find something similar for filtering tables.
[1]: https://arrow.apache.org/docs/cpp/api/dataset.html#_CPPv4N5arrow7dataset15InMemoryDataset15InMemoryDatasetENSt10shared_ptrI6SchemaEE17RecordBatchVector
[2]: https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/mainline/src/cpp/processing/operators.cpp#L50
Aldrin Montana
Computer Science PhD Student
UC Santa Cruz
On Mon, Jul 25, 2022 at 8:49 PM 1057445597 <1057445597@qq.com> wrote:
I use the following code to filter a table, but it always core dumps at scanner_builder->Filter(filter_expression_). Is there a better way to filter a table, or a RecordBatch?
By the way, dataset::ScannerBuilder always core dumps when I use it in tfio to create a TensorFlow dataset; it's most likely buggy.
// Read file columns and build a table
std::shared_ptr<::arrow::Table> table;
CHECK_ARROW(reader->ReadTable(column_indices_, &table));
// Convert the table to a sequence of batches
auto tr = std::make_shared<arrow::TableBatchReader>(*table.get());
// filter
auto scanner_builder = arrow::dataset::ScannerBuilder::FromRecordBatchReader(tr);
if (!dataset()->filter_.empty()) {
std::cout << filter_expression_.ToString() << std::endl;
scanner_builder->Filter(filter_expression_);
}
1057445597
1057445597@qq.com