Posted to user@arrow.apache.org by 1057445597 <10...@qq.com> on 2022/07/27 06:51:03 UTC

Re: how to filter a table to select rows

I changed my filtering method as follows and hit the same core dump; it seems filter_expression_ loses effect. I use my function ParseExpression to parse a string into an Expression, then I use ReadFile to read a parquet file into RecordBatches. The error only occurs when I call ReadFile on the background thread; if I call ParseExpression on the background thread to create a new Expression, the error goes away.
And if I don't use the background thread at all, the filter works and there is no core dump. It's very strange.




This is how I call ReadFile on the background thread:
background_worker_->Schedule(std::bind(&Iterator::ReadFile, this, current_file_idx_ + 1, true));
I expected filter_expression_ to be captured, so why doesn't it seem to work?
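
As far as I understand, std::bind(&Iterator::ReadFile, this, ...) binds only the this pointer, so filter_expression_ is still read from the member variable when the worker runs; it is never captured by value. A sketch of a variant that hands the task its own copy instead (the extra ReadFile parameter is hypothetical; my actual ReadFile reads the member):

        // Sketch only: copy the expression on the scheduling thread and pass
        // it into the task by value, so the worker never touches the member.
        auto filter = filter_expression_;  // copy taken while holding mu_
        background_worker_->Schedule(
            [this, filter, idx = current_file_idx_ + 1]() {
              ReadFile(idx, /*background=*/true, filter);  // hypothetical overload
            });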



  




code:


    class Iterator : public ArrowBaseIterator<Dataset> {
     public:
      explicit Iterator(const Params& params)
          : ArrowBaseIterator<Dataset>(params) {}

     private:
      Status SetupStreamsLocked(Env* env)
          TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
        if (!s3fs_) {
          arrow::fs::EnsureS3Initialized();
          auto s3Options = arrow::fs::S3Options::FromAccessKey(
              dataset()->aws_access_key_, dataset()->aws_secret_key_);
          s3Options.endpoint_override = dataset()->aws_endpoint_override_;
          s3fs_ = arrow::fs::S3FileSystem::Make(s3Options).ValueOrDie();
          if (!dataset()->filter_.empty()) {
            std::cout << "before parse expression" << std::endl;
            TF_RETURN_IF_ERROR(ArrowUtil::ParseExpression(dataset()->filter_, filter_expression_));
            std::cout << "after parse expression" << std::endl;
          }
        }
        TF_RETURN_IF_ERROR(ReadFile(current_file_idx_));
#if 0
        if (current_batch_idx_ < record_batches_.size()) {
          current_batch_ = record_batches_[current_batch_idx_];
        }
        else {
          current_batch_ = nullptr;
        }
#endif

#if 1
        if (!background_worker_) {
          background_worker_ =
              std::make_shared<BackgroundWorker>(env, "download_next_worker");
        }

        if (current_batch_idx_ < record_batches_.size()) {
          current_batch_ = record_batches_[current_batch_idx_];
        }

        if (current_file_idx_ + 1 < dataset()->parquet_files_.size()) {
          background_worker_->Schedule(std::bind(&Iterator::ReadFile, this,
                                                 current_file_idx_ + 1, true));
        }
#endif
        return Status::OK();
      }

      Status NextStreamLocked(Env* env)
          TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
        ArrowBaseIterator<Dataset>::NextStreamLocked(env);
        if (++current_batch_idx_ < record_batches_.size()) {
          current_batch_ = record_batches_[current_batch_idx_];
        } else if (++current_file_idx_ < dataset()->parquet_files_.size()) {
          current_batch_idx_ = 0;

#if 0
          record_batches_.clear();
          return SetupStreamsLocked(env);
#endif

#if 1
          {
            mutex_lock lk(cv_mu_);
            while (!background_thread_finished_) {
              cv_.wait(lk);
            }
          }

          record_batches_.swap(next_record_batches_);
          if (!record_batches_.empty()) {
            current_batch_ = record_batches_[current_batch_idx_];
          } else {
            current_batch_ = nullptr;
          }
          background_thread_finished_ = false;
          if (current_file_idx_ + 1 < dataset()->parquet_files_.size()) {
            background_worker_->Schedule(std::bind(
                &Iterator::ReadFile, this, current_file_idx_ + 1, true));
          }
#endif
        }
        return Status::OK();
      }

      void ResetStreamsLocked() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
        ArrowBaseIterator<Dataset>::ResetStreamsLocked();
        current_file_idx_ = 0;
        current_batch_idx_ = 0;
        record_batches_.clear();
        next_record_batches_.clear();
      }

      Status ReadFile(int file_index, bool background = false)
          TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
        auto access_file =
            s3fs_->OpenInputFile(dataset()->parquet_files_[file_index])
                .ValueOrDie();

        parquet::ArrowReaderProperties properties;
        properties.set_use_threads(true);
        properties.set_pre_buffer(true);
        parquet::ReaderProperties parquet_properties =
            parquet::default_reader_properties();

        std::shared_ptr<parquet::arrow::FileReaderBuilder> builder =
            std::make_shared<parquet::arrow::FileReaderBuilder>();
        builder->Open(access_file, parquet_properties);

        std::unique_ptr<parquet::arrow::FileReader> reader;
        builder->properties(properties)->Build(&reader);

        if (column_indices_.empty()) {
          std::shared_ptr<arrow::Schema> schema;
          reader->GetSchema(&schema);
          // check that each requested column name exists
          std::string err_column_names;
          for (const auto& name : dataset()->column_names_) {
            int fieldIndex = schema->GetFieldIndex(name);
            column_indices_.push_back(fieldIndex);
            if (-1 == fieldIndex) {
              err_column_names = err_column_names + " " + name;
            }
          }

          if (err_column_names.length() != 0) {
            return errors::InvalidArgument("these column names don't exist: ",
                                           err_column_names);
          }
        }
        // Read file columns and build a table
        std::shared_ptr<::arrow::Table> table;
        CHECK_ARROW(reader->ReadTable(column_indices_, &table));
        // Convert the table to a sequence of batches
        auto tr = std::make_shared<arrow::TableBatchReader>(*table.get());

        using namespace arrow::compute;
        // ensure arrow::dataset node factories are in the registry
        // arrow::dataset::internal::Initialize();
        // execution context
        ExecContext exec_context;
        auto plan = ExecPlan::Make(&exec_context).ValueOrDie();
        arrow::AsyncGenerator<arrow::util::optional<ExecBatch>> sink_gen;
        auto source_node_options = arrow::compute::SourceNodeOptions{
            table->schema(), [tr]() {
              using ExecBatch = arrow::compute::ExecBatch;
              using ExecBatchOptional = arrow::util::optional<ExecBatch>;
              auto arrow_record_batch_result = tr->Next();
              if (!arrow_record_batch_result.ok()) {
                std::cout << "end1" << std::endl;
                return arrow::AsyncGeneratorEnd<ExecBatchOptional>();
              }
              auto arrow_record_batch = std::move(*arrow_record_batch_result);
              if (!arrow_record_batch) {
                std::cout << "end2" << std::endl;
                return arrow::AsyncGeneratorEnd<ExecBatchOptional>();
              }
              std::cout << "num rows: " << arrow_record_batch->num_rows() << std::endl;
              return arrow::Future<ExecBatchOptional>::MakeFinished(
                  ExecBatch(*arrow_record_batch));
            }};
        auto source_node = MakeExecNode("source", plan.get(), {}, source_node_options).ValueOrDie();
        // std::cout << "filter:   " << filter_expression_.ToString() << std::endl;
        // arrow::compute::Expression filter_expr;

        // TF_RETURN_IF_ERROR(ArrowUtil::ParseExpression(dataset()->filter_, filter_expr));

        auto filter_node = MakeExecNode("filter", plan.get(), {source_node}, FilterNodeOptions{this->filter_expression_}).ValueOrDie();

        MakeExecNode("sink", plan.get(), {filter_node}, SinkNodeOptions{&sink_gen});

        std::shared_ptr<::arrow::Schema> schema = table->schema();
        auto sink_reader = MakeGeneratorReader(schema, std::move(sink_gen), exec_context.memory_pool());
        plan->Validate();
        std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
        // start the ExecPlan
        plan->StartProducing();

        std::shared_ptr<arrow::Table> response_table;

        response_table = arrow::Table::FromRecordBatchReader(sink_reader.get()).ValueOrDie();

        std::cout << "Results : " << response_table->ToString() << std::endl;

        // stop producing
        plan->StopProducing();
        // mark the plan finished
        auto future = plan->finished();

        auto ttr = std::make_shared<arrow::TableBatchReader>(*response_table.get());

        // filter
        // auto scanner_builder = arrow::dataset::ScannerBuilder::FromRecordBatchReader(tr);
        // scanner_builder->UseThreads(false);
        // if (!dataset()->filter_.empty()) {
        //   std::cout << filter_expression_.ToString() << std::endl;
        //   scanner_builder->Filter(filter_expression_);
        // }
        std::shared_ptr<arrow::RecordBatch> batch = nullptr;
        // auto scanner = scanner_builder->Finish().ValueOrDie();
        // auto batch_reader = scanner->ToRecordBatchReader().ValueOrDie();
        CHECK_ARROW(ttr->ReadNext(&batch));
        TF_RETURN_IF_ERROR(CheckBatchColumnTypes(batch));
        next_record_batches_.clear();
        while (batch != nullptr) {
          if (!background) {
            record_batches_.emplace_back(batch);
          } else {
            next_record_batches_.emplace_back(batch);
          }
          CHECK_ARROW(ttr->ReadNext(&batch));
        }

        if (background) {
          mutex_lock lk(cv_mu_);
          background_thread_finished_ = true;
          cv_.notify_all();
        }

        return Status::OK();
      }

      size_t current_file_idx_ TF_GUARDED_BY(mu_) = 0;
      size_t current_batch_idx_ TF_GUARDED_BY(mu_) = 0;
      std::vector<std::shared_ptr<arrow::RecordBatch>> record_batches_
          TF_GUARDED_BY(mu_);
      std::vector<std::shared_ptr<arrow::RecordBatch>> next_record_batches_
          TF_GUARDED_BY(mu_);
      std::shared_ptr<arrow::fs::S3FileSystem> s3fs_ TF_GUARDED_BY(mu_) =
          nullptr;
      std::vector<int> column_indices_ TF_GUARDED_BY(mu_);
      std::shared_ptr<BackgroundWorker> background_worker_ = nullptr;
      mutex cv_mu_;
      condition_variable cv_;
      bool background_thread_finished_ = false;
      arrow::compute::Expression filter_expression_;
    };
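
As an aside, several Arrow calls in ReadFile drop their Status. A minimal sketch of surfacing them with the CHECK_ARROW macro the code already uses (each of these returns arrow::Status in this Arrow version):

        // Sketch: propagate Arrow errors instead of discarding them.
        CHECK_ARROW(builder->Open(access_file, parquet_properties));
        CHECK_ARROW(builder->properties(properties)->Build(&reader));
        CHECK_ARROW(reader->GetSchema(&schema));
        CHECK_ARROW(plan->Validate());
        CHECK_ARROW(plan->StartProducing());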





bt:


#0  0x00007ffff7e0c18b in raise () from /usr/lib/x86_64-linux-gnu/libc.so.6
#1  0x00007ffff7deb859 in abort () from /usr/lib/x86_64-linux-gnu/libc.so.6
#2  0x00007ff82929e2b0 in arrow::util::CerrLog::~CerrLog (this=0x7ffad40d40b0, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:72
#3  0x00007ff82929e2d0 in arrow::util::CerrLog::~CerrLog (this=0x7ffad40d40b0, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:74
#4  0x00007ff82929e0ef in arrow::util::ArrowLog::~ArrowLog (this=0x7ff804f83d60, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:250
#5  0x00007ff828f949cb in arrow::compute::CallNotNull (expr=...) at external/arrow/cpp/src/arrow/compute/exec/expression_internal.h:40
#6  0x00007ff828f8f3e0 in arrow::compute::(anonymous namespace)::BindImpl<arrow::Schema> (expr=..., in=..., shape=arrow::ValueDescr::ARRAY, exec_context=0x7ff804f84420) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:458
#7  0x00007ff828f8f4bb in arrow::compute::(anonymous namespace)::BindImpl<arrow::Schema> (expr=..., in=..., shape=arrow::ValueDescr::ARRAY, exec_context=0x7ff804f84420) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:460
#8  0x00007ff828f8efe7 in arrow::compute::(anonymous namespace)::BindImpl<arrow::Schema> (expr=..., in=..., shape=arrow::ValueDescr::ARRAY, exec_context=0x0) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:441
#9  0x00007ff828f820bd in arrow::compute::Expression::Bind (this=0x7ff804f846a0, in_schema=..., exec_context=0x0) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:476
#10 0x00007ff828fc3693 in arrow::compute::(anonymous namespace)::FilterNode::Make (plan=0x7ffad40d63f0, inputs=std::vector of length 1, capacity 1 = {...}, options=...) at external/arrow/cpp/src/arrow/compute/exec/filter_node.cc:53
#11 0x00007ff828fc5b06 in std::_Function_handler<arrow::Result<arrow::compute::ExecNode*> (arrow::compute::ExecPlan*, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >, arrow::compute::ExecNodeOptions const&), arrow::Result<arrow::compute::ExecNode*> (*)(arrow::compute::ExecPlan*, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >, arrow::compute::ExecNodeOptions const&)>::_M_invoke(std::_Any_data const&, arrow::compute::ExecPlan*&&, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >&&, arrow::compute::ExecNodeOptions const&) (__functor=...,
    __args#0=@0x7ff804f847c8: 0x7ffad40d63f0, __args#1=..., __args#2=...) at /usr/include/c++/9/bits/std_function.h:286
#12 0x00007ff828be00d7 in std::function<arrow::Result<arrow::compute::ExecNode*> (arrow::compute::ExecPlan*, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >, arrow::compute::ExecNodeOptions const&)>::operator()(arrow::compute::ExecPlan*, std::vector<arrow::compute::ExecNode*, std::allocator<arrow::compute::ExecNode*> >, arrow::compute::ExecNodeOptions const&) const (this=0x7ff804f848a0, __args#0=0x7ffad40d63f0, __args#1=std::vector of length 0, capacity 0, __args#2=...) at /usr/include/c++/9/bits/std_function.h:688
#13 0x00007ff828bd251c in arrow::compute::MakeExecNode (Python Exception <class 'gdb.error'> No type named class std::basic_string<char, std::char_traits<char>, std::allocator<char> >::_Rep.:
factory_name=, plan=0x7ffad40d63f0, inputs=std::vector of length 0, capacity 0, options=..., registry=0x7ff82d1ac960 <arrow::compute::default_exec_factory_registry()::instance>) at external/arrow/cpp/src/arrow/compute/exec/exec_plan.h:360
#14 0x00007ff828bde6e2 in tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile (this=0x18471300, file_index=1, background=true) at tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc:1248
#15 0x00007ff828bedcbc in std::__invoke_impl<tensorflow::Status, tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*&)(int, bool), tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*&, unsigned long&, bool&> (__f=
    @0x19161d80: (class tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*)(class tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator * const, int, bool)) 0x7ff828bdddfa <tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile(int, bool)>, __t=@0x19161da0: 0x18471300)
    at /usr/include/c++/9/bits/invoke.h:73
#16 0x00007ff828bec6b2 in std::__invoke<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*&)(int, bool), tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*&, unsigned long&, bool&> (__fn=
    @0x19161d80: (class tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*)(class tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator * const, int, bool)) 0x7ff828bdddfa <tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile(int, bool)>) at /usr/include/c++/9/bits/invoke.h:96
#17 0x00007ff828beadb2 in std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)>::__call<tensorflow::Status, , 0ul, 1ul, 2ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul, 2ul>) (this=0x19161d80,
    __args=...) at /usr/include/c++/9/functional:402
#18 0x00007ff828be916c in std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)>::operator()<, tensorflow::Status>() (this=0x19161d80) at /usr/include/c++/9/functional:484
#19 0x00007ff828be6633 in std::_Function_handler<void (), std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)> >::_M_invoke(std::_Any_data const&) (__functor=...)
    at /usr/include/c++/9/bits/std_function.h:300
#20 0x00007ffd92bd68ea in tensorflow::data::BackgroundWorker::WorkerLoop() () from /usr/local/lib/python3.8/dist-packages/tensorflow/python/../libtensorflow_framework.so.2
#21 0x00007ffd9338f5d8 in tensorflow::(anonymous namespace)::PThread::ThreadFn(void*) () from /usr/local/lib/python3.8/dist-packages/tensorflow/python/../libtensorflow_framework.so.2
#22 0x00007ffff7dac609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#23 0x00007ffff7ee8293 in clone () from /usr/lib/x86_64-linux-gnu/libc.so.6









1057445597
1057445597@qq.com




------------------ Original Message ------------------
From: "user" <krassovskysasha@gmail.com>;
Sent: Wednesday, July 27, 2022, 00:57
To: "user" <user@arrow.apache.org>;

Subject: Re: how to filter a table to select rows



Hi 1057445597,

Could you provide more information about your core dump? What backtrace does it give? I notice you're not checking the Status returned by scanner_builder->Filter. That could be a place to start.
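
For example, a minimal sketch of checking that Status, using the same names as the posted code:

        // ScannerBuilder::Filter returns arrow::Status in this Arrow version;
        // surfacing it here avoids a crash further down the pipeline.
        arrow::Status filter_status = scanner_builder->Filter(filter_expression_);
        if (!filter_status.ok()) {
          std::cerr << "Filter failed: " << filter_status.ToString() << std::endl;
        }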


Sasha Krassovsky


On Jul 26, 2022, at 8:27 AM, Aldrin <akmontan@ucsc.edu> wrote:

You can create an InMemoryDataset from a RecordBatch. See [1] for docs and [2] for example code. You may be able to find something similar for filtering tables.



[1]: https://arrow.apache.org/docs/cpp/api/dataset.html#_CPPv4N5arrow7dataset15InMemoryDataset15InMemoryDatasetENSt10shared_ptrI6SchemaEE17RecordBatchVector
[2]: https://gitlab.com/skyhookdm/skytether-singlecell/-/blob/mainline/src/cpp/processing/operators.cpp#L50
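
A minimal sketch of [1], assuming a std::shared_ptr<arrow::RecordBatch> named batch and an arrow::compute::Expression named filter_expression are already in hand, inside a function that can return an arrow::Status or arrow::Result:

        // Requires <arrow/dataset/api.h>. Wrap the batch in an
        // InMemoryDataset, then scan it with a filter.
        auto in_memory = std::make_shared<arrow::dataset::InMemoryDataset>(
            batch->schema(), arrow::RecordBatchVector{batch});
        ARROW_ASSIGN_OR_RAISE(auto scan_builder, in_memory->NewScan());
        ARROW_RETURN_NOT_OK(scan_builder->Filter(filter_expression));
        ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
        ARROW_ASSIGN_OR_RAISE(auto filtered_table, scanner->ToTable());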


Aldrin Montana
Computer Science PhD Student
UC Santa Cruz

On Mon, Jul 25, 2022 at 8:49 PM 1057445597 <1057445597@qq.com> wrote:

I use the following code to filter a table, but it always core dumps at scanner_builder->Filter(filter_expression_). Is there a better way to filter a table, or a RecordBatch?

By the way, dataset::ScannerBuilder always core dumps when I use it in tfio to create a TensorFlow dataset; it's most likely buggy.




        // Read file columns and build a table
        std::shared_ptr<::arrow::Table> table;
        CHECK_ARROW(reader->ReadTable(column_indices_, &table));
        // Convert the table to a sequence of batches
        auto tr = std::make_shared<arrow::TableBatchReader>(*table.get());

        // filter
        auto scanner_builder = arrow::dataset::ScannerBuilder::FromRecordBatchReader(tr);
        if (!dataset()->filter_.empty()) {
          std::cout << filter_expression_.ToString() << std::endl;
          scanner_builder->Filter(filter_expression_);
        }
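
One alternative that avoids the dataset layer entirely is to build a boolean mask with a compute kernel and then apply arrow::compute::Filter to the whole table. A sketch, where the column name "x" and the threshold are made-up placeholders:

        // Requires <arrow/compute/api.h>. Computes `x > 5` as a boolean
        // mask, then keeps only the matching rows of the table.
        ARROW_ASSIGN_OR_RAISE(
            arrow::Datum mask,
            arrow::compute::CallFunction(
                "greater", {table->GetColumnByName("x"), arrow::Datum(int64_t{5})}));
        ARROW_ASSIGN_OR_RAISE(arrow::Datum filtered,
                              arrow::compute::Filter(arrow::Datum(table), mask));
        std::shared_ptr<arrow::Table> result = filtered.table();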






1057445597
1057445597@qq.com

Re: how to filter a table to select rows

Posted by 1057445597 <10...@qq.com>.
When I use ScannerBuilder instead:


code:
        // filter
        auto scanner_builder = arrow::dataset::ScannerBuilder::FromRecordBatchReader(tr);
        if (!dataset()->filter_.empty()) {
          // TF_RETURN_IF_ERROR(ArrowUtil::ParseExpression(dataset()->filter_, filter_expression_));
          scanner_builder->Filter(filter_expression_);
        }
        std::shared_ptr<arrow::RecordBatch> batch = nullptr;
        auto scanner = scanner_builder->Finish().ValueOrDie();
        auto batch_reader = scanner->ToRecordBatchReader().ValueOrDie();
        // CHECK_ARROW(ttr->ReadNext(&batch));
        CHECK_ARROW(batch_reader->ReadNext(&batch));
        TF_RETURN_IF_ERROR(CheckBatchColumnTypes(batch));
        next_record_batches_.clear();
        while (batch != nullptr) {
          if (!background) {
            record_batches_.emplace_back(batch);
          } else {
            next_record_batches_.emplace_back(batch);
          }
          CHECK_ARROW(batch_reader->ReadNext(&batch));
        }






(gdb) bt
#0  0x00007ffff7e0c18b in raise () from /usr/lib/x86_64-linux-gnu/libc.so.6
#1  0x00007ffff7deb859 in abort () from /usr/lib/x86_64-linux-gnu/libc.so.6
#2  0x00007ff8292531a2 in arrow::util::CerrLog::~CerrLog (this=0x7ffae00d6540, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:72
#3  0x00007ff8292531c2 in arrow::util::CerrLog::~CerrLog (this=0x7ffae00d6540, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:74
#4  0x00007ff829252fe1 in arrow::util::ArrowLog::~ArrowLog (this=0x7ff804dde7f0, __in_chrg=<optimized out>) at external/arrow/cpp/src/arrow/util/logging.cc:250
#5  0x00007ff828e1a83f in arrow::compute::CallNotNull (expr=...) at external/arrow/cpp/src/arrow/compute/exec/expression_internal.h:40
#6  0x00007ff828e0a081 in arrow::compute::FieldsInExpression (expr=...) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:638
#7  0x00007ff828e0a0ec in arrow::compute::FieldsInExpression (expr=...) at external/arrow/cpp/src/arrow/compute/exec/expression.cc:639
#8  0x00007ff828fce6da in arrow::dataset::ScannerBuilder::Filter (this=0x7ffae00d43e0, filter=...) at external/arrow/cpp/src/arrow/dataset/scanner.cc:805
#9  0x00007ff828a9d8ce in tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile (this=0x18471320, file_index=1, background=true) at tensorflow_io/core/kernels/arrow/arrow_dataset_ops.cc:1278
#10 0x00007ff828aa9b20 in std::__invoke_impl<tensorflow::Status, tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*&)(int, bool), tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*&, unsigned long&, bool&> (__f=
    @0x1943bfd0: (class tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*)(class tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator * const, int, bool)) 0x7ff828a9d2b6 <tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile(int, bool)>, __t=@0x1943bff0: 0x18471320)
    at /usr/include/c++/9/bits/invoke.h:73
#11 0x00007ff828aa8830 in std::__invoke<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*&)(int, bool), tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*&, unsigned long&, bool&> (__fn=
    @0x1943bfd0: (class tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*)(class tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator * const, int, bool)) 0x7ff828a9d2b6 <tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::ReadFile(int, bool)>) at /usr/include/c++/9/bits/invoke.h:96
#12 0x00007ff828aa7428 in std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)>::__call<tensorflow::Status, , 0ul, 1ul, 2ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul, 2ul>) (this=0x1943bfd0,
    __args=...) at /usr/include/c++/9/functional:402
#13 0x00007ff828aa6012 in std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)>::operator()<, tensorflow::Status>() (this=0x1943bfd0) at /usr/include/c++/9/functional:484
#14 0x00007ff828aa3e93 in std::_Function_handler<void (), std::_Bind<tensorflow::Status (tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator::*(tensorflow::data::ArrowS3DatasetOp::Dataset::Iterator*, unsigned long, bool))(int, bool)> >::_M_invoke(std::_Any_data const&) (__functor=...)
    at /usr/include/c++/9/bits/std_function.h:300
#15 0x00007ffd92bd68ea in tensorflow::data::BackgroundWorker::WorkerLoop() () from /usr/local/lib/python3.8/dist-packages/tensorflow/python/../libtensorflow_framework.so.2
#16 0x00007ffd9338f5d8 in tensorflow::(anonymous namespace)::PThread::ThreadFn(void*) () from /usr/local/lib/python3.8/dist-packages/tensorflow/python/../libtensorflow_framework.so.2
#17 0x00007ffff7dac609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#18 0x00007ffff7ee8293 in clone () from /usr/lib/x86_64-linux-gnu/libc.so.6





1057445597
1057445597@qq.com



