You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@arrow.apache.org by 1057445597 <10...@qq.com> on 2022/09/08 14:58:43 UTC

Acero runs slowly — help!

My code is roughly as follows: read a small file and save it as a table, read another four files, use ConcatenateTables to merge them into a table, and then use Acero to do a join, projection and filter. But when the program gets to plan->StartProducing(), it becomes very slow. I observed that the machine only has one CPU in use. Is there any way to tune it?




       


     do {
         ...
         auto concat_table_result = arrow::ConcatenateTables(big_tables);
          if (!concat_table_result.ok()) {
            res = errors::Internal(concat_table_result.status().ToString());
            break;
          }
          big_table = concat_table_result.ValueOrDie();

          // exec plan
          cp::ExecContext exec_context;
          auto plan_result = cp::ExecPlan::Make(&exec_context);
          if (!plan_result.ok()) {
            res = errors::Internal(plan_result.status().ToString());
            break;
          }
          auto plan = plan_result.ValueOrDie();


          arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;

          // table source node
          auto l_table_source_node_option =
              cp::TableSourceNodeOptions{small_table, 1};
          auto r_table_source_node_option =
              cp::TableSourceNodeOptions{big_table, 1};

          auto l_source_result = cp::MakeExecNode(
              "table_source", plan.get(), {}, l_table_source_node_option);
          if (!l_source_result.ok()) {
            res = errors::Internal(l_source_result.status().ToString());
            break;
          }
          auto l_source = l_source_result.ValueOrDie();

          auto r_source_result = cp::MakeExecNode(
              "table_source", plan.get(), {}, r_table_source_node_option);
          if (!r_source_result.ok()) {
            res = errors::Internal(r_source_result.status().ToString());
            break;
          }
          auto r_source = r_source_result.ValueOrDie();

          // hashjoin node
          cp::HashJoinNodeOptions join_opts{
              cp::JoinType::INNER, {"uuid"}, {"uuid"}};
          auto hashjoin_result = cp::MakeExecNode(
              "hashjoin", plan.get(), {l_source, r_source}, join_opts);
          if (!hashjoin_result.ok()) {
            res = errors::Internal(r_source_result.status().ToString());
            break;
          }
          auto hashjoin = hashjoin_result.ValueOrDie();

          // projection node
          std::vector<cp::Expression> projections;
          projections.reserve(dataset()->column_names_.size());
          auto hash_join_schema = hashjoin->output_schema();
          std::string err_column_names;
          for (const auto& name : dataset()->column_names_) {
            int fieldIndex = hash_join_schema->GetFieldIndex(name);
            if (-1 != fieldIndex) {
              projections.push_back(cp::field_ref(fieldIndex));
            } else {
              err_column_names = err_column_names + " " + name;
            }
          }

          if (err_column_names.length() != 0) {
            res = errors::InvalidArgument(
                "these column names don't exist after hash join: ",
                err_column_names);
            break;
          }


          auto project_result = cp::MakeExecNode(
              "project", plan.get(), {hashjoin},
              cp::ProjectNodeOptions{projections, dataset()->column_names_});
          if (!project_result.ok()) {
            res = errors::Internal(project_result.status().ToString());
            break;
          }
          auto project = project_result.ValueOrDie();

          // filter node
          if (!dataset()->filter_.empty()) {
            auto filter_result = cp::MakeExecNode(
                "filter", plan.get(), {project},
                cp::FilterNodeOptions{dataset()->filter_expr_});
            if (!filter_result.ok()) {
              res = errors::Internal(filter_result.status().ToString());
              break;
            }
            auto filter = filter_result.ValueOrDie();

            auto sink_gen_result = cp::MakeExecNode(
                "sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen});
            if (!sink_gen_result.ok()) {
              res = errors::Internal(filter_result.status().ToString());
              break;
            }

          } else {
            auto sink_gen_result = cp::MakeExecNode(
                "sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen});
            if (!sink_gen_result.ok()) {
              res = errors::Internal(sink_gen_result.status().ToString());
              break;
            }
          }

          auto sink_reader = cp::MakeGeneratorReader(
              project->output_schema(), std::move(sink_gen),
              exec_context.memory_pool());

          std::cout << project->output_schema()->ToString() << std::endl;

          // validate the Execplan
          plan->Validate();

          // start the Execplan
          plan->StartProducing();

          auto response_table_result =
              arrow::Table::FromRecordBatchReader(sink_reader.get());
          if (!response_table_result.ok()) {
            res = errors::Internal(response_table_result.status().ToString());
            break;
          }
          auto response_table = response_table_result.ValueOrDie();


          // stop producing
          plan->StopProducing();
          // plan mark finished
          auto future = plan->finished();
          if (!future.status().ok()) {
            res = errors::Internal(future.status().ToString());
            break;
          }


                    ...
        } while(0)




1057445597
1057445597@qq.com



&nbsp;

Re: Acero runs slowly — help!

Posted by Weston Pace <we...@gmail.com>.
It seems the default ExecContext is single-threaded.  Do you get more
parallelism if you create your exec context as follows?

cp::ExecContext exec_context(default_memory_pool(),
arrow::internal::GetCpuThreadPool());

On Thu, Sep 8, 2022 at 7:59 AM 1057445597 <10...@qq.com> wrote:

> My code is roughly as follows: read a small file and save it as a table,
> read another four files, use ConcatenateTables to merge into a table, and
> then use Acero to do join, project and filter. But when the program gets to
> plan->StartProducing(), it gets slower. I observed that the machine only
> has one CPU in use. Is there any way to tune it?
>
>
>
>
> do {
> ...
> auto concat_table_result = arrow::ConcatenateTables(big_tables);
> if (!concat_table_result.ok()) {
> res = errors::Internal(concat_table_result.status().ToString());
> break;
> }
> big_table = concat_table_result.ValueOrDie();
>
> // exec plan
> cp::ExecContext exec_context;
> auto plan_result = cp::ExecPlan::Make(&exec_context);
> if (!plan_result.ok()) {
> res = errors::Internal(plan_result.status().ToString());
> break;
> }
> auto plan = plan_result.ValueOrDie();
>
>
> arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
>
> // table source node
> auto l_table_source_node_option =
> cp::TableSourceNodeOptions{small_table, 1};
> auto r_table_source_node_option =
> cp::TableSourceNodeOptions{big_table, 1};
>
> auto l_source_result = cp::MakeExecNode(
> "table_source", plan.get(), {}, l_table_source_node_option);
> if (!l_source_result.ok()) {
> res = errors::Internal(l_source_result.status().ToString());
> break;
> }
> auto l_source = l_source_result.ValueOrDie();
>
> auto r_source_result = cp::MakeExecNode(
> "table_source", plan.get(), {}, r_table_source_node_option);
> if (!r_source_result.ok()) {
> res = errors::Internal(r_source_result.status().ToString());
> break;
> }
> auto r_source = r_source_result.ValueOrDie();
>
> // hashjoin node
> cp::HashJoinNodeOptions join_opts{
> cp::JoinType::INNER, {"uuid"}, {"uuid"}};
> auto hashjoin_result = cp::MakeExecNode(
> "hashjoin", plan.get(), {l_source, r_source}, join_opts);
> if (!hashjoin_result.ok()) {
> res = errors::Internal(r_source_result.status().ToString());
> break;
> }
> auto hashjoin = hashjoin_result.ValueOrDie();
>
> // projection node
> std::vector<cp::Expression> projections;
> projections.reserve(dataset()->column_names_.size());
> auto hash_join_schema = hashjoin->output_schema();
> std::string err_column_names;
> for (const auto& name : dataset()->column_names_) {
> int fieldIndex = hash_join_schema->GetFieldIndex(name);
> if (-1 != fieldIndex) {
> projections.push_back(cp::field_ref(fieldIndex));
> } else {
> err_column_names = err_column_names + " " + name;
> }
> }
>
> if (err_column_names.length() != 0) {
> res = errors::InvalidArgument(
> "these column names don't exist after hash join: ",
> err_column_names);
> break;
> }
>
>
> auto project_result = cp::MakeExecNode(
> "project", plan.get(), {hashjoin},
> cp::ProjectNodeOptions{projections, dataset()->column_names_});
> if (!project_result.ok()) {
> res = errors::Internal(project_result.status().ToString());
> break;
> }
> auto project = project_result.ValueOrDie();
>
> // filter node
> if (!dataset()->filter_.empty()) {
> auto filter_result = cp::MakeExecNode(
> "filter", plan.get(), {project},
> cp::FilterNodeOptions{dataset()->filter_expr_});
> if (!filter_result.ok()) {
> res = errors::Internal(filter_result.status().ToString());
> break;
> }
> auto filter = filter_result.ValueOrDie();
>
> auto sink_gen_result = cp::MakeExecNode(
> "sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen});
> if (!sink_gen_result.ok()) {
> res = errors::Internal(filter_result.status().ToString());
> break;
> }
>
> } else {
> auto sink_gen_result = cp::MakeExecNode(
> "sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen});
> if (!sink_gen_result.ok()) {
> res = errors::Internal(sink_gen_result.status().ToString());
> break;
> }
> }
>
> auto sink_reader = cp::MakeGeneratorReader(
> project->output_schema(), std::move(sink_gen),
> exec_context.memory_pool());
>
> std::cout << project->output_schema()->ToString() << std::endl;
>
> // validate the Execplan
> plan->Validate();
>
> // start the Execplan
> plan->StartProducing();
>
> auto response_table_result =
> arrow::Table::FromRecordBatchReader(sink_reader.get());
> if (!response_table_result.ok()) {
> res = errors::Internal(response_table_result.status().ToString());
> break;
> }
> auto response_table = response_table_result.ValueOrDie();
>
> // stop producing
> plan->StopProducing();
> // plan mark finished
> auto future = plan->finished();
> if (!future.status().ok()) {
> res = errors::Internal(future.status().ToString());
> break;
> }
>                       ...
>         } while(0)
>
> ------------------------------
> 1057445597
> 1057445597@qq.com
>
> <https://wx.mail.qq.com/home/index?t=readmail_businesscard_midpage&nocheck=true&name=1057445597&icon=http%3A%2F%2Fthirdqq.qlogo.cn%2Fg%3Fb%3Dsdk%26k%3DIlyZtc5eQb1ZfPd0rzpQlQ%26s%3D100%26t%3D1551800738%3Frand%3D1648208978&mail=1057445597%40qq.com&code=>
>
>