You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/04/22 04:08:31 UTC
[arrow] branch master updated: ARROW-16148: [C++] TPC-H generator cleanup
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 24b48083f1 ARROW-16148: [C++] TPC-H generator cleanup
24b48083f1 is described below
commit 24b48083f102ef9366adb0797f5f8fa53c188a7b
Author: Sasha Krassovsky <kr...@gmail.com>
AuthorDate: Thu Apr 21 18:08:21 2022 -1000
ARROW-16148: [C++] TPC-H generator cleanup
The multi-column generators would occasionally hang: we created too few tasks, so output never finished. This commit fixes that.
This PR also adds a test that exercises the fix: it generates all eight TPC-H tables in a single plan (at scale factor 0.05) and verifies the results.
Closes #12843 from save-buffer/sasha_tpch_fix
Authored-by: Sasha Krassovsky <kr...@gmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/compute/exec/tpch_node.cc | 135 ++++++++++++++--------
cpp/src/arrow/compute/exec/tpch_node_test.cc | 163 +++++++++++++++++++--------
2 files changed, 203 insertions(+), 95 deletions(-)
diff --git a/cpp/src/arrow/compute/exec/tpch_node.cc b/cpp/src/arrow/compute/exec/tpch_node.cc
index c7397970d7..711af0cbd2 100644
--- a/cpp/src/arrow/compute/exec/tpch_node.cc
+++ b/cpp/src/arrow/compute/exec/tpch_node.cc
@@ -1092,12 +1092,23 @@ class PartAndPartSupplierGenerator {
ThreadLocalData& tld = thread_local_data_[thread_index];
int32_t byte_width = arrow::internal::GetByteWidth(*kPartsuppTypes[column]);
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> buff,
- AllocateBuffer(batch_size_ * byte_width));
+ AllocateResizableBuffer(batch_size_ * byte_width));
ArrayData ad(kPartsuppTypes[column], batch_size_, {nullptr, std::move(buff)});
tld.partsupp[ibatch][column] = std::move(ad);
return Status::OK();
}
+ Status SetPartSuppColumnSize(size_t thread_index, size_t ibatch, int column,
+ size_t new_size) {
+ ThreadLocalData& tld = thread_local_data_[thread_index];
+ int32_t byte_width = arrow::internal::GetByteWidth(*kPartsuppTypes[column]);
+ tld.partsupp[ibatch][column].array()->length = static_cast<int64_t>(new_size);
+ ResizableBuffer* buff = checked_cast<ResizableBuffer*>(
+ tld.partsupp[ibatch][column].array()->buffers[1].get());
+ return buff->Resize(static_cast<int64_t>(new_size * byte_width),
+ /*shrink_to_fit=*/false);
+ }
+
Status PS_PARTKEY(size_t thread_index) {
ThreadLocalData& tld = thread_local_data_[thread_index];
if (!tld.generated_partsupp[PARTSUPP::PS_PARTKEY]) {
@@ -1129,8 +1140,9 @@ class PartAndPartSupplierGenerator {
ipart++;
}
}
+ RETURN_NOT_OK(
+ SetPartSuppColumnSize(thread_index, ibatch, PARTSUPP::PS_PARTKEY, next_run));
irow += next_run;
- tld.partsupp[ibatch][PARTSUPP::PS_PARTKEY].array()->length = batch_offset;
}
}
return Status::OK();
@@ -1173,8 +1185,9 @@ class PartAndPartSupplierGenerator {
ipart++;
}
}
+ RETURN_NOT_OK(
+ SetPartSuppColumnSize(thread_index, ibatch, PARTSUPP::PS_SUPPKEY, next_run));
irow += next_run;
- tld.partsupp[ibatch][PARTSUPP::PS_SUPPKEY].array()->length = batch_offset;
}
}
return Status::OK();
@@ -1197,7 +1210,8 @@ class PartAndPartSupplierGenerator {
int64_t next_run = std::min(batch_size_, ps_to_generate - irow);
for (int64_t irun = 0; irun < next_run; irun++) ps_availqty[irun] = dist(tld.rng);
- tld.partsupp[ibatch][PARTSUPP::PS_AVAILQTY].array()->length = next_run;
+ RETURN_NOT_OK(
+ SetPartSuppColumnSize(thread_index, ibatch, PARTSUPP::PS_AVAILQTY, next_run));
irow += next_run;
}
}
@@ -1223,7 +1237,8 @@ class PartAndPartSupplierGenerator {
for (int64_t irun = 0; irun < next_run; irun++)
ps_supplycost[irun] = {dist(tld.rng)};
- tld.partsupp[ibatch][PARTSUPP::PS_SUPPLYCOST].array()->length = next_run;
+ RETURN_NOT_OK(SetPartSuppColumnSize(thread_index, ibatch, PARTSUPP::PS_SUPPLYCOST,
+ next_run));
irow += next_run;
}
}
@@ -1232,7 +1247,8 @@ class PartAndPartSupplierGenerator {
Status PS_COMMENT(size_t thread_index) {
ThreadLocalData& tld = thread_local_data_[thread_index];
- if (tld.part[PARTSUPP::PS_COMMENT].kind() == Datum::NONE) {
+ if (!tld.generated_partsupp[PARTSUPP::PS_COMMENT]) {
+ tld.generated_partsupp[PARTSUPP::PS_COMMENT] = true;
int64_t irow = 0;
int64_t ps_to_generate = kPartSuppRowsPerPart * tld.part_to_generate;
for (size_t ibatch = 0; ibatch < tld.partsupp.size(); ibatch++) {
@@ -1327,13 +1343,14 @@ class OrdersAndLineItemGenerator {
std::min(batch_size_, orders_rows_to_generate_ - orders_rows_generated_);
orders_rows_generated_ += tld.orders_to_generate;
orders_batches_generated_.fetch_add(1);
+ tld.first_batch_offset = 0;
+ RETURN_NOT_OK(GenerateRowCounts(thread_index));
+ lineitem_batches_generated_.fetch_add(static_cast<int64_t>(tld.lineitem.size()));
ARROW_DCHECK(orders_rows_generated_ <= orders_rows_to_generate_);
}
}
tld.orders.resize(ORDERS::kNumCols);
std::fill(tld.orders.begin(), tld.orders.end(), Datum());
- RETURN_NOT_OK(GenerateRowCounts(thread_index));
- tld.first_batch_offset = 0;
tld.generated_lineitem.reset();
for (int col : orders_cols_) RETURN_NOT_OK(kOrdersGenerators[col](thread_index));
@@ -1395,15 +1412,16 @@ class OrdersAndLineItemGenerator {
tld.orders_to_generate =
std::min(batch_size_, orders_rows_to_generate_ - orders_rows_generated_);
orders_rows_generated_ += tld.orders_to_generate;
- orders_batches_generated_.fetch_add(1ll);
+ orders_batches_generated_.fetch_add(1);
+ RETURN_NOT_OK(GenerateRowCounts(thread_index));
+ lineitem_batches_generated_.fetch_add(
+ static_cast<int64_t>(tld.lineitem.size() - from_queue));
ARROW_DCHECK(orders_rows_generated_ <= orders_rows_to_generate_);
}
tld.orders.resize(ORDERS::kNumCols);
std::fill(tld.orders.begin(), tld.orders.end(), Datum());
- RETURN_NOT_OK(GenerateRowCounts(thread_index));
tld.generated_lineitem.reset();
if (from_queue) {
- lineitem_batches_generated_.fetch_sub(1);
for (size_t i = 0; i < lineitem_cols_.size(); i++)
if (tld.lineitem[0][lineitem_cols_[i]].kind() == Datum::NONE)
tld.lineitem[0][lineitem_cols_[i]] = std::move(queued[i]);
@@ -1435,7 +1453,6 @@ class OrdersAndLineItemGenerator {
ARROW_ASSIGN_OR_RAISE(ExecBatch eb, ExecBatch::Make(std::move(lineitem_result)));
lineitem_results.emplace_back(std::move(eb));
}
- lineitem_batches_generated_.fetch_add(static_cast<int64_t>(lineitem_results.size()));
// Return the first batch, enqueue the rest.
{
std::lock_guard<std::mutex> lock(lineitem_output_queue_mutex_);
@@ -1774,9 +1791,10 @@ class OrdersAndLineItemGenerator {
size_t& out_batch_offset) {
ThreadLocalData& tld = thread_local_data_[thread_index];
if (tld.lineitem[ibatch][column].kind() == Datum::NONE) {
+ ARROW_DCHECK(ibatch != 0 || tld.first_batch_offset == 0);
int32_t byte_width = arrow::internal::GetByteWidth(*kLineitemTypes[column]);
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> buff,
- AllocateBuffer(batch_size_ * byte_width));
+ AllocateResizableBuffer(batch_size_ * byte_width));
ArrayData ad(kLineitemTypes[column], batch_size_, {nullptr, std::move(buff)});
tld.lineitem[ibatch][column] = std::move(ad);
out_batch_offset = 0;
@@ -1786,6 +1804,17 @@ class OrdersAndLineItemGenerator {
return Status::OK();
}
+ Status SetLineItemColumnSize(size_t thread_index, size_t ibatch, int column,
+ size_t new_size) {
+ ThreadLocalData& tld = thread_local_data_[thread_index];
+ int32_t byte_width = arrow::internal::GetByteWidth(*kLineitemTypes[column]);
+ tld.lineitem[ibatch][column].array()->length = static_cast<int64_t>(new_size);
+ ResizableBuffer* buff = checked_cast<ResizableBuffer*>(
+ tld.lineitem[ibatch][column].array()->buffers[1].get());
+ return buff->Resize(static_cast<int64_t>(new_size * byte_width),
+ /*shrink_to_fit=*/false);
+ }
+
Status L_ORDERKEY(size_t thread_index) {
ThreadLocalData& tld = thread_local_data_[thread_index];
if (!tld.generated_lineitem[LINEITEM::L_ORDERKEY]) {
@@ -1817,8 +1846,8 @@ class OrdersAndLineItemGenerator {
}
}
irow += next_run;
- tld.lineitem[ibatch][LINEITEM::L_ORDERKEY].array()->length =
- static_cast<int64_t>(batch_offset);
+ RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_ORDERKEY,
+ batch_offset));
}
}
return Status::OK();
@@ -1847,8 +1876,8 @@ class OrdersAndLineItemGenerator {
l_partkey[batch_offset] = dist(tld.rng);
irow += next_run;
- tld.lineitem[ibatch][LINEITEM::L_PARTKEY].array()->length =
- static_cast<int64_t>(batch_offset);
+ RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_PARTKEY,
+ batch_offset));
}
}
return Status::OK();
@@ -1885,8 +1914,8 @@ class OrdersAndLineItemGenerator {
(partkey + (supplier * ((S / 4) + (partkey - 1) / S))) % S + 1;
}
irow += next_run;
- tld.lineitem[ibatch][LINEITEM::L_SUPPKEY].array()->length =
- static_cast<int64_t>(batch_offset);
+ RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_SUPPKEY,
+ batch_offset));
}
}
return Status::OK();
@@ -1922,8 +1951,8 @@ class OrdersAndLineItemGenerator {
}
}
irow += next_run;
- tld.lineitem[ibatch][LINEITEM::L_LINENUMBER].array()->length =
- static_cast<int64_t>(batch_offset);
+ RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_LINENUMBER,
+ batch_offset));
}
}
return Status::OK();
@@ -1954,8 +1983,8 @@ class OrdersAndLineItemGenerator {
l_quantity[batch_offset++] = {quantity};
}
irow += next_run;
- tld.lineitem[ibatch][LINEITEM::L_QUANTITY].array()->length =
- static_cast<int64_t>(batch_offset);
+ RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_QUANTITY,
+ batch_offset));
}
}
return Status::OK();
@@ -1998,8 +2027,8 @@ class OrdersAndLineItemGenerator {
l_extendedprice[batch_offset] = {extended_price};
}
irow += next_run;
- tld.lineitem[ibatch][LINEITEM::L_EXTENDEDPRICE].array()->length =
- static_cast<int64_t>(batch_offset);
+ RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch,
+ LINEITEM::L_EXTENDEDPRICE, batch_offset));
}
}
return Status::OK();
@@ -2027,8 +2056,8 @@ class OrdersAndLineItemGenerator {
for (int64_t i = 0; i < next_run; i++, batch_offset++)
l_discount[batch_offset] = {dist(tld.rng)};
irow += next_run;
- tld.lineitem[ibatch][LINEITEM::L_DISCOUNT].array()->length =
- static_cast<int64_t>(batch_offset);
+ RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_DISCOUNT,
+ batch_offset));
}
}
return Status::OK();
@@ -2052,8 +2081,8 @@ class OrdersAndLineItemGenerator {
for (int64_t i = 0; i < next_run; i++, batch_offset++)
l_tax[batch_offset] = {dist(tld.rng)};
irow += next_run;
- tld.lineitem[ibatch][LINEITEM::L_TAX].array()->length =
- static_cast<int64_t>(batch_offset);
+ RETURN_NOT_OK(
+ SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_TAX, batch_offset));
}
}
return Status::OK();
@@ -2093,8 +2122,8 @@ class OrdersAndLineItemGenerator {
}
}
irow += next_run;
- tld.lineitem[ibatch][LINEITEM::L_RETURNFLAG].array()->length =
- static_cast<int64_t>(batch_offset);
+ RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_RETURNFLAG,
+ batch_offset));
}
}
return Status::OK();
@@ -2131,8 +2160,8 @@ class OrdersAndLineItemGenerator {
l_linestatus[batch_offset] = 'F';
}
irow += next_run;
- tld.lineitem[ibatch][LINEITEM::L_LINESTATUS].array()->length =
- static_cast<int64_t>(batch_offset);
+ RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_LINESTATUS,
+ batch_offset));
}
}
return Status::OK();
@@ -2169,8 +2198,8 @@ class OrdersAndLineItemGenerator {
}
}
irow += next_run;
- tld.lineitem[ibatch][LINEITEM::L_SHIPDATE].array()->length =
- static_cast<int64_t>(batch_offset);
+ RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_SHIPDATE,
+ batch_offset));
}
}
return Status::OK();
@@ -2207,8 +2236,8 @@ class OrdersAndLineItemGenerator {
}
}
irow += next_run;
- tld.lineitem[ibatch][LINEITEM::L_COMMITDATE].array()->length =
- static_cast<int64_t>(batch_offset);
+ RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_COMMITDATE,
+ batch_offset));
}
}
return Status::OK();
@@ -2243,8 +2272,8 @@ class OrdersAndLineItemGenerator {
l_receiptdate[batch_offset] = l_shipdate[batch_offset] + dist(tld.rng);
irow += next_run;
- tld.lineitem[ibatch][LINEITEM::L_RECEIPTDATE].array()->length =
- static_cast<int64_t>(batch_offset);
+ RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_RECEIPTDATE,
+ batch_offset));
}
}
return Status::OK();
@@ -2278,8 +2307,8 @@ class OrdersAndLineItemGenerator {
std::strncpy(l_shipinstruct + batch_offset * byte_width, str, byte_width);
}
irow += next_run;
- tld.lineitem[ibatch][LINEITEM::L_SHIPINSTRUCT].array()->length =
- static_cast<int64_t>(batch_offset);
+ RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch,
+ LINEITEM::L_SHIPINSTRUCT, batch_offset));
}
}
return Status::OK();
@@ -2311,8 +2340,8 @@ class OrdersAndLineItemGenerator {
std::strncpy(l_shipmode + batch_offset * byte_width, str, byte_width);
}
irow += next_run;
- tld.lineitem[ibatch][LINEITEM::L_SHIPMODE].array()->length =
- static_cast<int64_t>(batch_offset);
+ RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_SHIPMODE,
+ batch_offset));
}
}
return Status::OK();
@@ -2323,16 +2352,17 @@ class OrdersAndLineItemGenerator {
if (!tld.generated_lineitem[LINEITEM::L_COMMENT]) {
tld.generated_lineitem[LINEITEM::L_COMMENT] = true;
- size_t batch_offset = tld.first_batch_offset;
size_t ibatch = 0;
for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
// Comments are kind of sneaky: we always generate the full batch and then just
// bump the length
+ size_t batch_offset = 0;
if (tld.lineitem[ibatch][LINEITEM::L_COMMENT].kind() == Datum::NONE) {
ARROW_ASSIGN_OR_RAISE(tld.lineitem[ibatch][LINEITEM::L_COMMENT],
g_text.GenerateComments(batch_size_, 10, 43, tld.rng));
batch_offset = 0;
}
+ if (irow == 0) batch_offset = tld.first_batch_offset;
int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
@@ -2694,8 +2724,10 @@ class PartGenerator : public TpchTableGenerator {
bool expected = false;
if (done_.compare_exchange_strong(expected, true))
finished_callback_(batches_outputted_.load());
+ return Status::OK();
}
- return Status::OK();
+ return schedule_callback_(
+ [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
}
ExecBatch batch = std::move(*maybe_batch);
output_callback_(std::move(batch));
@@ -2754,8 +2786,10 @@ class PartSuppGenerator : public TpchTableGenerator {
bool expected = false;
if (done_.compare_exchange_strong(expected, true))
finished_callback_(batches_outputted_.load());
+ return Status::OK();
}
- return Status::OK();
+ return schedule_callback_(
+ [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
}
ExecBatch batch = std::move(*maybe_batch);
output_callback_(std::move(batch));
@@ -3070,8 +3104,10 @@ class OrdersGenerator : public TpchTableGenerator {
bool expected = false;
if (done_.compare_exchange_strong(expected, true))
finished_callback_(batches_outputted_.load());
+ return Status::OK();
}
- return Status::OK();
+ return schedule_callback_(
+ [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
}
ExecBatch batch = std::move(*maybe_batch);
output_callback_(std::move(batch));
@@ -3130,8 +3166,11 @@ class LineitemGenerator : public TpchTableGenerator {
bool expected = false;
if (done_.compare_exchange_strong(expected, true))
finished_callback_(batches_outputted_.load());
+ return Status::OK();
}
- return Status::OK();
+ // We may have generated but not outputted all of the batches.
+ return schedule_callback_(
+ [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
}
ExecBatch batch = std::move(*maybe_batch);
output_callback_(std::move(batch));
diff --git a/cpp/src/arrow/compute/exec/tpch_node_test.cc b/cpp/src/arrow/compute/exec/tpch_node_test.cc
index 8face53eeb..fc26ce90c2 100644
--- a/cpp/src/arrow/compute/exec/tpch_node_test.cc
+++ b/cpp/src/arrow/compute/exec/tpch_node_test.cc
@@ -45,17 +45,27 @@ static constexpr uint32_t kStartDate =
static constexpr uint32_t kEndDate =
10591; // December 12, 1998 is 10591 days after January 1, 1970
-Result<std::vector<ExecBatch>> GenerateTable(
- Result<ExecNode*> (TpchGen::*table)(std::vector<std::string>),
- double scale_factor = 1.0) {
+using TableNodeFn = Result<ExecNode*> (TpchGen::*)(std::vector<std::string>);
+
+constexpr double kDefaultScaleFactor = 0.1;
+
+Status AddTableAndSinkToPlan(ExecPlan& plan, TpchGen& gen,
+ AsyncGenerator<util::optional<ExecBatch>>& sink_gen,
+ TableNodeFn table) {
+ ARROW_ASSIGN_OR_RAISE(ExecNode * table_node, ((gen.*table)({})));
+ Declaration sink("sink", {Declaration::Input(table_node)}, SinkNodeOptions{&sink_gen});
+ ARROW_RETURN_NOT_OK(sink.AddToPlan(&plan));
+ return Status::OK();
+}
+
+Result<std::vector<ExecBatch>> GenerateTable(TableNodeFn table,
+ double scale_factor = kDefaultScaleFactor) {
ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan, ExecPlan::Make(&ctx));
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<TpchGen> gen,
TpchGen::Make(plan.get(), scale_factor));
- ARROW_ASSIGN_OR_RAISE(ExecNode * table_node, ((gen.get()->*table)({})));
AsyncGenerator<util::optional<ExecBatch>> sink_gen;
- Declaration sink("sink", {Declaration::Input(table_node)}, SinkNodeOptions{&sink_gen});
- ARROW_RETURN_NOT_OK(sink.AddToPlan(plan.get()));
+ ARROW_RETURN_NOT_OK(AddTableAndSinkToPlan(*plan, *gen, sink_gen, table));
auto fut = StartAndCollect(plan.get(), sink_gen);
return fut.MoveResult();
}
@@ -333,24 +343,15 @@ void CountModifiedComments(const Datum& d, int* good_count, int* bad_count) {
}
}
-TEST(TpchNode, ScaleFactor) {
- ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Supplier, 0.25));
-
- int64_t kExpectedRows = 2500;
- int64_t num_rows = 0;
- for (auto& batch : res) num_rows += batch.length;
- ASSERT_EQ(num_rows, kExpectedRows);
-}
-
-TEST(TpchNode, Supplier) {
- ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Supplier));
- int64_t kExpectedRows = 10000;
+void VerifySupplier(const std::vector<ExecBatch>& batches,
+ double scale_factor = kDefaultScaleFactor) {
+ int64_t kExpectedRows = static_cast<int64_t>(10000 * scale_factor);
int64_t num_rows = 0;
std::unordered_set<int32_t> seen_suppkey;
int good_count = 0;
int bad_count = 0;
- for (auto& batch : res) {
+ for (auto& batch : batches) {
ValidateBatch(batch);
VerifyUniqueKey(&seen_suppkey, batch[0],
/*min=*/1,
@@ -365,18 +366,22 @@ TEST(TpchNode, Supplier) {
}
ASSERT_EQ(seen_suppkey.size(), kExpectedRows);
ASSERT_EQ(num_rows, kExpectedRows);
- ASSERT_EQ(good_count, 5);
- ASSERT_EQ(bad_count, 5);
+ ASSERT_EQ(good_count, static_cast<int64_t>(5 * scale_factor));
+ ASSERT_EQ(bad_count, static_cast<int64_t>(5 * scale_factor));
}
-TEST(TpchNode, Part) {
- ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Part));
+TEST(TpchNode, Supplier) {
+ ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Supplier));
+ VerifySupplier(res);
+}
- int64_t kExpectedRows = 200000;
+void VerifyPart(const std::vector<ExecBatch>& batches,
+ double scale_factor = kDefaultScaleFactor) {
+ int64_t kExpectedRows = static_cast<int64_t>(200000 * scale_factor);
int64_t num_rows = 0;
std::unordered_set<int32_t> seen_partkey;
- for (auto& batch : res) {
+ for (auto& batch : batches) {
ValidateBatch(batch);
VerifyUniqueKey(&seen_partkey, batch[0],
/*min=*/1,
@@ -401,14 +406,18 @@ TEST(TpchNode, Part) {
ASSERT_EQ(num_rows, kExpectedRows);
}
-TEST(TpchNode, PartSupp) {
- ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::PartSupp));
+TEST(TpchNode, Part) {
+ ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Part));
+ VerifyPart(res);
+}
- constexpr int64_t kExpectedRows = 800000;
+void VerifyPartSupp(const std::vector<ExecBatch>& batches,
+ double scale_factor = kDefaultScaleFactor) {
+ const int64_t kExpectedRows = static_cast<int64_t>(800000 * scale_factor);
int64_t num_rows = 0;
std::unordered_map<int32_t, int32_t> counts;
- for (auto& batch : res) {
+ for (auto& batch : batches) {
ValidateBatch(batch);
CountInstances(&counts, batch[0]);
VerifyAllBetween(batch[2], 1, 9999);
@@ -423,14 +432,18 @@ TEST(TpchNode, PartSupp) {
ASSERT_EQ(num_rows, kExpectedRows);
}
-TEST(TpchNode, Customer) {
- ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Customer));
+TEST(TpchNode, PartSupp) {
+ ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::PartSupp));
+ VerifyPartSupp(res);
+}
- const int64_t kExpectedRows = 150000;
+void VerifyCustomer(const std::vector<ExecBatch>& batches,
+ double scale_factor = kDefaultScaleFactor) {
+ const int64_t kExpectedRows = static_cast<int64_t>(150000 * scale_factor);
int64_t num_rows = 0;
std::unordered_set<int32_t> seen_custkey;
- for (auto& batch : res) {
+ for (auto& batch : batches) {
ValidateBatch(batch);
VerifyUniqueKey(&seen_custkey, batch[0],
/*min=*/1,
@@ -449,14 +462,18 @@ TEST(TpchNode, Customer) {
ASSERT_EQ(num_rows, kExpectedRows);
}
-TEST(TpchNode, Orders) {
- ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Orders));
+TEST(TpchNode, Customer) {
+ ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Customer));
+ VerifyCustomer(res);
+}
- constexpr int64_t kExpectedRows = 1500000;
+void VerifyOrders(const std::vector<ExecBatch>& batches,
+ double scale_factor = kDefaultScaleFactor) {
+ const int64_t kExpectedRows = static_cast<int64_t>(1500000 * scale_factor);
int64_t num_rows = 0;
std::unordered_set<int32_t> seen_orderkey;
- for (auto& batch : res) {
+ for (auto& batch : batches) {
ValidateBatch(batch);
VerifyUniqueKey(&seen_orderkey, batch[0],
/*min=*/1,
@@ -484,14 +501,19 @@ TEST(TpchNode, Orders) {
ASSERT_EQ(num_rows, kExpectedRows);
}
-TEST(TpchNode, Lineitem) {
- ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Lineitem));
+TEST(TpchNode, Orders) {
+ ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Orders));
+ VerifyOrders(res);
+}
+void VerifyLineitem(const std::vector<ExecBatch>& batches,
+ double scale_factor = kDefaultScaleFactor) {
std::unordered_map<int32_t, int32_t> counts;
- for (auto& batch : res) {
+ for (auto& batch : batches) {
ValidateBatch(batch);
CountInstances(&counts, batch[0]);
- VerifyAllBetween(batch[1], /*min=*/1, /*max=*/200000);
+ VerifyAllBetween(batch[1], /*min=*/1,
+ /*max=*/static_cast<int32_t>(200000 * scale_factor));
VerifyAllBetween(batch[3], /*min=*/1, /*max=*/7);
VerifyDecimalsBetween(batch[4], /*min=*/100, /*max=*/5000);
VerifyDecimalsBetween(batch[6], /*min=*/0, /*max=*/10);
@@ -527,14 +549,18 @@ TEST(TpchNode, Lineitem) {
}
}
-TEST(TpchNode, Nation) {
- ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Nation));
+TEST(TpchNode, Lineitem) {
+ ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Lineitem));
+ VerifyLineitem(res);
+}
+void VerifyNation(const std::vector<ExecBatch>& batches,
+ double scale_factor = kDefaultScaleFactor) {
constexpr int64_t kExpectedRows = 25;
int64_t num_rows = 0;
std::unordered_set<int32_t> seen_nationkey;
- for (auto& batch : res) {
+ for (auto& batch : batches) {
ValidateBatch(batch);
VerifyUniqueKey(&seen_nationkey, batch[0], 0, kExpectedRows - 1);
VerifyOneOf(
@@ -551,14 +577,18 @@ TEST(TpchNode, Nation) {
ASSERT_EQ(num_rows, kExpectedRows);
}
-TEST(TpchNode, Region) {
- ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Region));
+TEST(TpchNode, Nation) {
+ ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Nation));
+ VerifyNation(res);
+}
+void VerifyRegion(const std::vector<ExecBatch>& batches,
+ double scale_factor = kDefaultScaleFactor) {
constexpr int64_t kExpectedRows = 5;
int64_t num_rows = 0;
std::unordered_set<int32_t> seen_regionkey;
- for (auto& batch : res) {
+ for (auto& batch : batches) {
ValidateBatch(batch);
VerifyUniqueKey(&seen_regionkey, batch[0], 0, kExpectedRows - 1);
VerifyOneOf(batch[1],
@@ -570,6 +600,45 @@ TEST(TpchNode, Region) {
ASSERT_EQ(num_rows, 5);
}
+TEST(TpchNode, Region) {
+ ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Region));
+ VerifyRegion(res);
+}
+
+TEST(TpchNode, AllTables) {
+ constexpr double kScaleFactor = 0.05;
+ constexpr int kNumTables = 8;
+ std::array<TableNodeFn, kNumTables> tables = {
+ &TpchGen::Supplier, &TpchGen::Part, &TpchGen::PartSupp, &TpchGen::Customer,
+ &TpchGen::Orders, &TpchGen::Lineitem, &TpchGen::Nation, &TpchGen::Region,
+ };
+ using VerifyFn = void(const std::vector<ExecBatch>&, double);
+ std::array<VerifyFn*, kNumTables> verify_fns = {
+ &VerifySupplier, &VerifyPart, &VerifyPartSupp, &VerifyCustomer,
+ &VerifyOrders, &VerifyLineitem, &VerifyNation, &VerifyRegion,
+ };
+
+ std::array<AsyncGenerator<util::optional<ExecBatch>>, kNumTables> gens;
+ ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<ExecPlan> plan, ExecPlan::Make(&ctx));
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<TpchGen> gen,
+ TpchGen::Make(plan.get(), kScaleFactor));
+ for (int i = 0; i < kNumTables; i++) {
+ ASSERT_OK(AddTableAndSinkToPlan(*plan, *gen, gens[i], tables[i]));
+ }
+
+ ASSERT_OK(plan->Validate());
+ ASSERT_OK(plan->StartProducing());
+ ASSERT_OK(plan->finished().status());
+ for (int i = 0; i < kNumTables; i++) {
+ auto fut = CollectAsyncGenerator(gens[i]);
+ ASSERT_OK_AND_ASSIGN(auto maybe_batches, fut.MoveResult());
+ std::vector<ExecBatch> batches;
+ for (auto& maybe_batch : maybe_batches) batches.emplace_back(std::move(*maybe_batch));
+ verify_fns[i](batches, kScaleFactor);
+ }
+}
+
} // namespace internal
} // namespace compute
} // namespace arrow