You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@arrow.apache.org by 1057445597 <10...@qq.com> on 2022/09/08 14:58:43 UTC
Acero slowly?!help
My code is roughly as follows: read a small file and save it as a table, read another four files, use ConcatenateTables to merge into a table, and then use Acero to do join, project and filter. But when the program gets to plan->StartProducing(), it gets slower. I observed that the machine only has one CPU in use. Is there any way to tune it?
do {
...
auto concat_table_result = arrow::ConcatenateTables(big_tables);
if (!concat_table_result.ok()) {
res = errors::Internal(concat_table_result.status().ToString());
break;
}
big_table = concat_table_result.ValueOrDie();
// exec plan
cp::ExecContext exec_context;
auto plan_result = cp::ExecPlan::Make(&exec_context);
if (!plan_result.ok()) {
res = errors::Internal(plan_result.status().ToString());
break;
}
auto plan = plan_result.ValueOrDie();
arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
// table source node
auto l_table_source_node_option =
cp::TableSourceNodeOptions{small_table, 1};
auto r_table_source_node_option =
cp::TableSourceNodeOptions{big_table, 1};
auto l_source_result = cp::MakeExecNode(
"table_source", plan.get(), {}, l_table_source_node_option);
if (!l_source_result.ok()) {
res = errors::Internal(l_source_result.status().ToString());
break;
}
auto l_source = l_source_result.ValueOrDie();
auto r_source_result = cp::MakeExecNode(
"table_source", plan.get(), {}, r_table_source_node_option);
if (!r_source_result.ok()) {
res = errors::Internal(r_source_result.status().ToString());
break;
}
auto r_source = r_source_result.ValueOrDie();
// hashjoin node
cp::HashJoinNodeOptions join_opts{
cp::JoinType::INNER, {"uuid"}, {"uuid"}};
auto hashjoin_result = cp::MakeExecNode(
"hashjoin", plan.get(), {l_source, r_source}, join_opts);
if (!hashjoin_result.ok()) {
res = errors::Internal(r_source_result.status().ToString());
break;
}
auto hashjoin = hashjoin_result.ValueOrDie();
// projection node
std::vector<cp::Expression> projections;
projections.reserve(dataset()->column_names_.size());
auto hash_join_schema = hashjoin->output_schema();
std::string err_column_names;
for (const auto& name : dataset()->column_names_) {
int fieldIndex = hash_join_schema->GetFieldIndex(name);
if (-1 != fieldIndex) {
projections.push_back(cp::field_ref(fieldIndex));
} else {
err_column_names = err_column_names + " " + name;
}
}
if (err_column_names.length() != 0) {
res = errors::InvalidArgument(
"these column names don't exist after hash join: ",
err_column_names);
break;
}
auto project_result = cp::MakeExecNode(
"project", plan.get(), {hashjoin},
cp::ProjectNodeOptions{projections, dataset()->column_names_});
if (!project_result.ok()) {
res = errors::Internal(project_result.status().ToString());
break;
}
auto project = project_result.ValueOrDie();
// filter node
if (!dataset()->filter_.empty()) {
auto filter_result = cp::MakeExecNode(
"filter", plan.get(), {project},
cp::FilterNodeOptions{dataset()->filter_expr_});
if (!filter_result.ok()) {
res = errors::Internal(filter_result.status().ToString());
break;
}
auto filter = filter_result.ValueOrDie();
auto sink_gen_result = cp::MakeExecNode(
"sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen});
if (!sink_gen_result.ok()) {
res = errors::Internal(filter_result.status().ToString());
break;
}
} else {
auto sink_gen_result = cp::MakeExecNode(
"sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen});
if (!sink_gen_result.ok()) {
res = errors::Internal(sink_gen_result.status().ToString());
break;
}
}
auto sink_reader = cp::MakeGeneratorReader(
project->output_schema(), std::move(sink_gen),
exec_context.memory_pool());
std::cout << project->output_schema()->ToString() << std::endl;
// validate the Execplan
plan->Validate();
// start the Execplan
plan->StartProducing();
auto response_table_result =
arrow::Table::FromRecordBatchReader(sink_reader.get());
if (!response_table_result.ok()) {
res = errors::Internal(response_table_result.status().ToString());
break;
}
auto response_table = response_table_result.ValueOrDie();
// stop producing
plan->StopProducing();
// plan mark finished
auto future = plan->finished();
if (!future.status().ok()) {
res = errors::Internal(future.status().ToString());
break;
}
...
} while(0)
1057445597
1057445597@qq.com
Re: Acero slowly?!help
Posted by Weston Pace <we...@gmail.com>.
It seems a default ExecContext is single-threaded. Do you get more
parallelism if you create your exec context as follows?
cp::ExecContext exec_context(default_memory_pool(),
arrow::internal::GetCpuThreadPool());
On Thu, Sep 8, 2022 at 7:59 AM 1057445597 <10...@qq.com> wrote:
> My code is roughly as follows: read a small file and save it as a table,
> read another four files, use ConcatenateTables to merge into a table, and
> then use Acero to do join, project and filter. But when the program gets to
> plan->StartProducing(), it gets slower. I observed that the machine only
> has one CPU in use. Is there any way to tune it?
>
>
>
>
> do {
> ...
> auto concat_table_result = arrow::ConcatenateTables(big_tables);
> if (!concat_table_result.ok()) {
> res = errors::Internal(concat_table_result.status().ToString());
> break;
> }
> big_table = concat_table_result.ValueOrDie();
>
> // exec plan
> cp::ExecContext exec_context;
> auto plan_result = cp::ExecPlan::Make(&exec_context);
> if (!plan_result.ok()) {
> res = errors::Internal(plan_result.status().ToString());
> break;
> }
> auto plan = plan_result.ValueOrDie();
>
>
> arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
>
> // table source node
> auto l_table_source_node_option =
> cp::TableSourceNodeOptions{small_table, 1};
> auto r_table_source_node_option =
> cp::TableSourceNodeOptions{big_table, 1};
>
> auto l_source_result = cp::MakeExecNode(
> "table_source", plan.get(), {}, l_table_source_node_option);
> if (!l_source_result.ok()) {
> res = errors::Internal(l_source_result.status().ToString());
> break;
> }
> auto l_source = l_source_result.ValueOrDie();
>
> auto r_source_result = cp::MakeExecNode(
> "table_source", plan.get(), {}, r_table_source_node_option);
> if (!r_source_result.ok()) {
> res = errors::Internal(r_source_result.status().ToString());
> break;
> }
> auto r_source = r_source_result.ValueOrDie();
>
> // hashjoin node
> cp::HashJoinNodeOptions join_opts{
> cp::JoinType::INNER, {"uuid"}, {"uuid"}};
> auto hashjoin_result = cp::MakeExecNode(
> "hashjoin", plan.get(), {l_source, r_source}, join_opts);
> if (!hashjoin_result.ok()) {
> res = errors::Internal(r_source_result.status().ToString());
> break;
> }
> auto hashjoin = hashjoin_result.ValueOrDie();
>
> // projection node
> std::vector<cp::Expression> projections;
> projections.reserve(dataset()->column_names_.size());
> auto hash_join_schema = hashjoin->output_schema();
> std::string err_column_names;
> for (const auto& name : dataset()->column_names_) {
> int fieldIndex = hash_join_schema->GetFieldIndex(name);
> if (-1 != fieldIndex) {
> projections.push_back(cp::field_ref(fieldIndex));
> } else {
> err_column_names = err_column_names + " " + name;
> }
> }
>
> if (err_column_names.length() != 0) {
> res = errors::InvalidArgument(
> "these column names don't exist after hash join: ",
> err_column_names);
> break;
> }
>
>
> auto project_result = cp::MakeExecNode(
> "project", plan.get(), {hashjoin},
> cp::ProjectNodeOptions{projections, dataset()->column_names_});
> if (!project_result.ok()) {
> res = errors::Internal(project_result.status().ToString());
> break;
> }
> auto project = project_result.ValueOrDie();
>
> // filter node
> if (!dataset()->filter_.empty()) {
> auto filter_result = cp::MakeExecNode(
> "filter", plan.get(), {project},
> cp::FilterNodeOptions{dataset()->filter_expr_});
> if (!filter_result.ok()) {
> res = errors::Internal(filter_result.status().ToString());
> break;
> }
> auto filter = filter_result.ValueOrDie();
>
> auto sink_gen_result = cp::MakeExecNode(
> "sink", plan.get(), {filter}, cp::SinkNodeOptions{&sink_gen});
> if (!sink_gen_result.ok()) {
> res = errors::Internal(filter_result.status().ToString());
> break;
> }
>
> } else {
> auto sink_gen_result = cp::MakeExecNode(
> "sink", plan.get(), {project}, cp::SinkNodeOptions{&sink_gen});
> if (!sink_gen_result.ok()) {
> res = errors::Internal(sink_gen_result.status().ToString());
> break;
> }
> }
>
> auto sink_reader = cp::MakeGeneratorReader(
> project->output_schema(), std::move(sink_gen),
> exec_context.memory_pool());
>
> std::cout << project->output_schema()->ToString() << std::endl;
>
> // validate the Execplan
> plan->Validate();
>
> // start the Execplan
> plan->StartProducing();
>
> auto response_table_result =
> arrow::Table::FromRecordBatchReader(sink_reader.get());
> if (!response_table_result.ok()) {
> res = errors::Internal(response_table_result.status().ToString());
> break;
> }
> auto response_table = response_table_result.ValueOrDie();
>
> // stop producing
> plan->StopProducing();
> // plan mark finished
> auto future = plan->finished();
> if (!future.status().ok()) {
> res = errors::Internal(future.status().ToString());
> break;
> }
> ...
> } while(0)
>
> ------------------------------
> 1057445597
> 1057445597@qq.com
>
> <https://wx.mail.qq.com/home/index?t=readmail_businesscard_midpage&nocheck=true&name=1057445597&icon=http%3A%2F%2Fthirdqq.qlogo.cn%2Fg%3Fb%3Dsdk%26k%3DIlyZtc5eQb1ZfPd0rzpQlQ%26s%3D100%26t%3D1551800738%3Frand%3D1648208978&mail=1057445597%40qq.com&code=>
>
>