You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/04/22 04:08:31 UTC

[arrow] branch master updated: ARROW-16148: [C++] TPC-H generator cleanup

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 24b48083f1 ARROW-16148: [C++] TPC-H generator cleanup
24b48083f1 is described below

commit 24b48083f102ef9366adb0797f5f8fa53c188a7b
Author: Sasha Krassovsky <kr...@gmail.com>
AuthorDate: Thu Apr 21 18:08:21 2022 -1000

    ARROW-16148: [C++] TPC-H generator cleanup
    
    We'd occasionally get a hang in the multi-column generators. This was due to us creating an insufficient number of tasks, so we'd never actually finish outputting. This fixes it.
    This PR also adds a test to exercise it: it generates TPC-H SF 1 all in one plan and verifies the result.
    
    Closes #12843 from save-buffer/sasha_tpch_fix
    
    Authored-by: Sasha Krassovsky <kr...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/compute/exec/tpch_node.cc      | 135 ++++++++++++++--------
 cpp/src/arrow/compute/exec/tpch_node_test.cc | 163 +++++++++++++++++++--------
 2 files changed, 203 insertions(+), 95 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/tpch_node.cc b/cpp/src/arrow/compute/exec/tpch_node.cc
index c7397970d7..711af0cbd2 100644
--- a/cpp/src/arrow/compute/exec/tpch_node.cc
+++ b/cpp/src/arrow/compute/exec/tpch_node.cc
@@ -1092,12 +1092,23 @@ class PartAndPartSupplierGenerator {
     ThreadLocalData& tld = thread_local_data_[thread_index];
     int32_t byte_width = arrow::internal::GetByteWidth(*kPartsuppTypes[column]);
     ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> buff,
-                          AllocateBuffer(batch_size_ * byte_width));
+                          AllocateResizableBuffer(batch_size_ * byte_width));
     ArrayData ad(kPartsuppTypes[column], batch_size_, {nullptr, std::move(buff)});
     tld.partsupp[ibatch][column] = std::move(ad);
     return Status::OK();
   }
 
+  Status SetPartSuppColumnSize(size_t thread_index, size_t ibatch, int column,
+                               size_t new_size) {
+    ThreadLocalData& tld = thread_local_data_[thread_index];
+    int32_t byte_width = arrow::internal::GetByteWidth(*kPartsuppTypes[column]);
+    tld.partsupp[ibatch][column].array()->length = static_cast<int64_t>(new_size);
+    ResizableBuffer* buff = checked_cast<ResizableBuffer*>(
+        tld.partsupp[ibatch][column].array()->buffers[1].get());
+    return buff->Resize(static_cast<int64_t>(new_size * byte_width),
+                        /*shrink_to_fit=*/false);
+  }
+
   Status PS_PARTKEY(size_t thread_index) {
     ThreadLocalData& tld = thread_local_data_[thread_index];
     if (!tld.generated_partsupp[PARTSUPP::PS_PARTKEY]) {
@@ -1129,8 +1140,9 @@ class PartAndPartSupplierGenerator {
             ipart++;
           }
         }
+        RETURN_NOT_OK(
+            SetPartSuppColumnSize(thread_index, ibatch, PARTSUPP::PS_PARTKEY, next_run));
         irow += next_run;
-        tld.partsupp[ibatch][PARTSUPP::PS_PARTKEY].array()->length = batch_offset;
       }
     }
     return Status::OK();
@@ -1173,8 +1185,9 @@ class PartAndPartSupplierGenerator {
             ipart++;
           }
         }
+        RETURN_NOT_OK(
+            SetPartSuppColumnSize(thread_index, ibatch, PARTSUPP::PS_SUPPKEY, next_run));
         irow += next_run;
-        tld.partsupp[ibatch][PARTSUPP::PS_SUPPKEY].array()->length = batch_offset;
       }
     }
     return Status::OK();
@@ -1197,7 +1210,8 @@ class PartAndPartSupplierGenerator {
         int64_t next_run = std::min(batch_size_, ps_to_generate - irow);
         for (int64_t irun = 0; irun < next_run; irun++) ps_availqty[irun] = dist(tld.rng);
 
-        tld.partsupp[ibatch][PARTSUPP::PS_AVAILQTY].array()->length = next_run;
+        RETURN_NOT_OK(
+            SetPartSuppColumnSize(thread_index, ibatch, PARTSUPP::PS_AVAILQTY, next_run));
         irow += next_run;
       }
     }
@@ -1223,7 +1237,8 @@ class PartAndPartSupplierGenerator {
         for (int64_t irun = 0; irun < next_run; irun++)
           ps_supplycost[irun] = {dist(tld.rng)};
 
-        tld.partsupp[ibatch][PARTSUPP::PS_SUPPLYCOST].array()->length = next_run;
+        RETURN_NOT_OK(SetPartSuppColumnSize(thread_index, ibatch, PARTSUPP::PS_SUPPLYCOST,
+                                            next_run));
         irow += next_run;
       }
     }
@@ -1232,7 +1247,8 @@ class PartAndPartSupplierGenerator {
 
   Status PS_COMMENT(size_t thread_index) {
     ThreadLocalData& tld = thread_local_data_[thread_index];
-    if (tld.part[PARTSUPP::PS_COMMENT].kind() == Datum::NONE) {
+    if (!tld.generated_partsupp[PARTSUPP::PS_COMMENT]) {
+      tld.generated_partsupp[PARTSUPP::PS_COMMENT] = true;
       int64_t irow = 0;
       int64_t ps_to_generate = kPartSuppRowsPerPart * tld.part_to_generate;
       for (size_t ibatch = 0; ibatch < tld.partsupp.size(); ibatch++) {
@@ -1327,13 +1343,14 @@ class OrdersAndLineItemGenerator {
             std::min(batch_size_, orders_rows_to_generate_ - orders_rows_generated_);
         orders_rows_generated_ += tld.orders_to_generate;
         orders_batches_generated_.fetch_add(1);
+        tld.first_batch_offset = 0;
+        RETURN_NOT_OK(GenerateRowCounts(thread_index));
+        lineitem_batches_generated_.fetch_add(static_cast<int64_t>(tld.lineitem.size()));
         ARROW_DCHECK(orders_rows_generated_ <= orders_rows_to_generate_);
       }
     }
     tld.orders.resize(ORDERS::kNumCols);
     std::fill(tld.orders.begin(), tld.orders.end(), Datum());
-    RETURN_NOT_OK(GenerateRowCounts(thread_index));
-    tld.first_batch_offset = 0;
     tld.generated_lineitem.reset();
 
     for (int col : orders_cols_) RETURN_NOT_OK(kOrdersGenerators[col](thread_index));
@@ -1395,15 +1412,16 @@ class OrdersAndLineItemGenerator {
       tld.orders_to_generate =
           std::min(batch_size_, orders_rows_to_generate_ - orders_rows_generated_);
       orders_rows_generated_ += tld.orders_to_generate;
-      orders_batches_generated_.fetch_add(1ll);
+      orders_batches_generated_.fetch_add(1);
+      RETURN_NOT_OK(GenerateRowCounts(thread_index));
+      lineitem_batches_generated_.fetch_add(
+          static_cast<int64_t>(tld.lineitem.size() - from_queue));
       ARROW_DCHECK(orders_rows_generated_ <= orders_rows_to_generate_);
     }
     tld.orders.resize(ORDERS::kNumCols);
     std::fill(tld.orders.begin(), tld.orders.end(), Datum());
-    RETURN_NOT_OK(GenerateRowCounts(thread_index));
     tld.generated_lineitem.reset();
     if (from_queue) {
-      lineitem_batches_generated_.fetch_sub(1);
       for (size_t i = 0; i < lineitem_cols_.size(); i++)
         if (tld.lineitem[0][lineitem_cols_[i]].kind() == Datum::NONE)
           tld.lineitem[0][lineitem_cols_[i]] = std::move(queued[i]);
@@ -1435,7 +1453,6 @@ class OrdersAndLineItemGenerator {
       ARROW_ASSIGN_OR_RAISE(ExecBatch eb, ExecBatch::Make(std::move(lineitem_result)));
       lineitem_results.emplace_back(std::move(eb));
     }
-    lineitem_batches_generated_.fetch_add(static_cast<int64_t>(lineitem_results.size()));
     // Return the first batch, enqueue the rest.
     {
       std::lock_guard<std::mutex> lock(lineitem_output_queue_mutex_);
@@ -1774,9 +1791,10 @@ class OrdersAndLineItemGenerator {
                                         size_t& out_batch_offset) {
     ThreadLocalData& tld = thread_local_data_[thread_index];
     if (tld.lineitem[ibatch][column].kind() == Datum::NONE) {
+      ARROW_DCHECK(ibatch != 0 || tld.first_batch_offset == 0);
       int32_t byte_width = arrow::internal::GetByteWidth(*kLineitemTypes[column]);
       ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> buff,
-                            AllocateBuffer(batch_size_ * byte_width));
+                            AllocateResizableBuffer(batch_size_ * byte_width));
       ArrayData ad(kLineitemTypes[column], batch_size_, {nullptr, std::move(buff)});
       tld.lineitem[ibatch][column] = std::move(ad);
       out_batch_offset = 0;
@@ -1786,6 +1804,17 @@ class OrdersAndLineItemGenerator {
     return Status::OK();
   }
 
+  Status SetLineItemColumnSize(size_t thread_index, size_t ibatch, int column,
+                               size_t new_size) {
+    ThreadLocalData& tld = thread_local_data_[thread_index];
+    int32_t byte_width = arrow::internal::GetByteWidth(*kLineitemTypes[column]);
+    tld.lineitem[ibatch][column].array()->length = static_cast<int64_t>(new_size);
+    ResizableBuffer* buff = checked_cast<ResizableBuffer*>(
+        tld.lineitem[ibatch][column].array()->buffers[1].get());
+    return buff->Resize(static_cast<int64_t>(new_size * byte_width),
+                        /*shrink_to_fit=*/false);
+  }
+
   Status L_ORDERKEY(size_t thread_index) {
     ThreadLocalData& tld = thread_local_data_[thread_index];
     if (!tld.generated_lineitem[LINEITEM::L_ORDERKEY]) {
@@ -1817,8 +1846,8 @@ class OrdersAndLineItemGenerator {
           }
         }
         irow += next_run;
-        tld.lineitem[ibatch][LINEITEM::L_ORDERKEY].array()->length =
-            static_cast<int64_t>(batch_offset);
+        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_ORDERKEY,
+                                            batch_offset));
       }
     }
     return Status::OK();
@@ -1847,8 +1876,8 @@ class OrdersAndLineItemGenerator {
           l_partkey[batch_offset] = dist(tld.rng);
 
         irow += next_run;
-        tld.lineitem[ibatch][LINEITEM::L_PARTKEY].array()->length =
-            static_cast<int64_t>(batch_offset);
+        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_PARTKEY,
+                                            batch_offset));
       }
     }
     return Status::OK();
@@ -1885,8 +1914,8 @@ class OrdersAndLineItemGenerator {
               (partkey + (supplier * ((S / 4) + (partkey - 1) / S))) % S + 1;
         }
         irow += next_run;
-        tld.lineitem[ibatch][LINEITEM::L_SUPPKEY].array()->length =
-            static_cast<int64_t>(batch_offset);
+        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_SUPPKEY,
+                                            batch_offset));
       }
     }
     return Status::OK();
@@ -1922,8 +1951,8 @@ class OrdersAndLineItemGenerator {
           }
         }
         irow += next_run;
-        tld.lineitem[ibatch][LINEITEM::L_LINENUMBER].array()->length =
-            static_cast<int64_t>(batch_offset);
+        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_LINENUMBER,
+                                            batch_offset));
       }
     }
     return Status::OK();
@@ -1954,8 +1983,8 @@ class OrdersAndLineItemGenerator {
           l_quantity[batch_offset++] = {quantity};
         }
         irow += next_run;
-        tld.lineitem[ibatch][LINEITEM::L_QUANTITY].array()->length =
-            static_cast<int64_t>(batch_offset);
+        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_QUANTITY,
+                                            batch_offset));
       }
     }
     return Status::OK();
@@ -1998,8 +2027,8 @@ class OrdersAndLineItemGenerator {
           l_extendedprice[batch_offset] = {extended_price};
         }
         irow += next_run;
-        tld.lineitem[ibatch][LINEITEM::L_EXTENDEDPRICE].array()->length =
-            static_cast<int64_t>(batch_offset);
+        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch,
+                                            LINEITEM::L_EXTENDEDPRICE, batch_offset));
       }
     }
     return Status::OK();
@@ -2027,8 +2056,8 @@ class OrdersAndLineItemGenerator {
         for (int64_t i = 0; i < next_run; i++, batch_offset++)
           l_discount[batch_offset] = {dist(tld.rng)};
         irow += next_run;
-        tld.lineitem[ibatch][LINEITEM::L_DISCOUNT].array()->length =
-            static_cast<int64_t>(batch_offset);
+        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_DISCOUNT,
+                                            batch_offset));
       }
     }
     return Status::OK();
@@ -2052,8 +2081,8 @@ class OrdersAndLineItemGenerator {
         for (int64_t i = 0; i < next_run; i++, batch_offset++)
           l_tax[batch_offset] = {dist(tld.rng)};
         irow += next_run;
-        tld.lineitem[ibatch][LINEITEM::L_TAX].array()->length =
-            static_cast<int64_t>(batch_offset);
+        RETURN_NOT_OK(
+            SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_TAX, batch_offset));
       }
     }
     return Status::OK();
@@ -2093,8 +2122,8 @@ class OrdersAndLineItemGenerator {
           }
         }
         irow += next_run;
-        tld.lineitem[ibatch][LINEITEM::L_RETURNFLAG].array()->length =
-            static_cast<int64_t>(batch_offset);
+        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_RETURNFLAG,
+                                            batch_offset));
       }
     }
     return Status::OK();
@@ -2131,8 +2160,8 @@ class OrdersAndLineItemGenerator {
             l_linestatus[batch_offset] = 'F';
         }
         irow += next_run;
-        tld.lineitem[ibatch][LINEITEM::L_LINESTATUS].array()->length =
-            static_cast<int64_t>(batch_offset);
+        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_LINESTATUS,
+                                            batch_offset));
       }
     }
     return Status::OK();
@@ -2169,8 +2198,8 @@ class OrdersAndLineItemGenerator {
           }
         }
         irow += next_run;
-        tld.lineitem[ibatch][LINEITEM::L_SHIPDATE].array()->length =
-            static_cast<int64_t>(batch_offset);
+        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_SHIPDATE,
+                                            batch_offset));
       }
     }
     return Status::OK();
@@ -2207,8 +2236,8 @@ class OrdersAndLineItemGenerator {
           }
         }
         irow += next_run;
-        tld.lineitem[ibatch][LINEITEM::L_COMMITDATE].array()->length =
-            static_cast<int64_t>(batch_offset);
+        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_COMMITDATE,
+                                            batch_offset));
       }
     }
     return Status::OK();
@@ -2243,8 +2272,8 @@ class OrdersAndLineItemGenerator {
           l_receiptdate[batch_offset] = l_shipdate[batch_offset] + dist(tld.rng);
 
         irow += next_run;
-        tld.lineitem[ibatch][LINEITEM::L_RECEIPTDATE].array()->length =
-            static_cast<int64_t>(batch_offset);
+        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_RECEIPTDATE,
+                                            batch_offset));
       }
     }
     return Status::OK();
@@ -2278,8 +2307,8 @@ class OrdersAndLineItemGenerator {
           std::strncpy(l_shipinstruct + batch_offset * byte_width, str, byte_width);
         }
         irow += next_run;
-        tld.lineitem[ibatch][LINEITEM::L_SHIPINSTRUCT].array()->length =
-            static_cast<int64_t>(batch_offset);
+        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch,
+                                            LINEITEM::L_SHIPINSTRUCT, batch_offset));
       }
     }
     return Status::OK();
@@ -2311,8 +2340,8 @@ class OrdersAndLineItemGenerator {
           std::strncpy(l_shipmode + batch_offset * byte_width, str, byte_width);
         }
         irow += next_run;
-        tld.lineitem[ibatch][LINEITEM::L_SHIPMODE].array()->length =
-            static_cast<int64_t>(batch_offset);
+        RETURN_NOT_OK(SetLineItemColumnSize(thread_index, ibatch, LINEITEM::L_SHIPMODE,
+                                            batch_offset));
       }
     }
     return Status::OK();
@@ -2323,16 +2352,17 @@ class OrdersAndLineItemGenerator {
     if (!tld.generated_lineitem[LINEITEM::L_COMMENT]) {
       tld.generated_lineitem[LINEITEM::L_COMMENT] = true;
 
-      size_t batch_offset = tld.first_batch_offset;
       size_t ibatch = 0;
       for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) {
         // Comments are kind of sneaky: we always generate the full batch and then just
         // bump the length
+        size_t batch_offset = 0;
         if (tld.lineitem[ibatch][LINEITEM::L_COMMENT].kind() == Datum::NONE) {
           ARROW_ASSIGN_OR_RAISE(tld.lineitem[ibatch][LINEITEM::L_COMMENT],
                                 g_text.GenerateComments(batch_size_, 10, 43, tld.rng));
           batch_offset = 0;
         }
+        if (irow == 0) batch_offset = tld.first_batch_offset;
 
         int64_t remaining_in_batch = static_cast<int64_t>(batch_size_ - batch_offset);
         int64_t next_run = std::min(tld.lineitem_to_generate - irow, remaining_in_batch);
@@ -2694,8 +2724,10 @@ class PartGenerator : public TpchTableGenerator {
         bool expected = false;
         if (done_.compare_exchange_strong(expected, true))
           finished_callback_(batches_outputted_.load());
+        return Status::OK();
       }
-      return Status::OK();
+      return schedule_callback_(
+          [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
     }
     ExecBatch batch = std::move(*maybe_batch);
     output_callback_(std::move(batch));
@@ -2754,8 +2786,10 @@ class PartSuppGenerator : public TpchTableGenerator {
         bool expected = false;
         if (done_.compare_exchange_strong(expected, true))
           finished_callback_(batches_outputted_.load());
+        return Status::OK();
       }
-      return Status::OK();
+      return schedule_callback_(
+          [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
     }
     ExecBatch batch = std::move(*maybe_batch);
     output_callback_(std::move(batch));
@@ -3070,8 +3104,10 @@ class OrdersGenerator : public TpchTableGenerator {
         bool expected = false;
         if (done_.compare_exchange_strong(expected, true))
           finished_callback_(batches_outputted_.load());
+        return Status::OK();
       }
-      return Status::OK();
+      return schedule_callback_(
+          [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
     }
     ExecBatch batch = std::move(*maybe_batch);
     output_callback_(std::move(batch));
@@ -3130,8 +3166,11 @@ class LineitemGenerator : public TpchTableGenerator {
         bool expected = false;
         if (done_.compare_exchange_strong(expected, true))
           finished_callback_(batches_outputted_.load());
+        return Status::OK();
       }
-      return Status::OK();
+      // We may have generated but not outputted all of the batches.
+      return schedule_callback_(
+          [this](size_t thread_index) { return this->ProduceCallback(thread_index); });
     }
     ExecBatch batch = std::move(*maybe_batch);
     output_callback_(std::move(batch));
diff --git a/cpp/src/arrow/compute/exec/tpch_node_test.cc b/cpp/src/arrow/compute/exec/tpch_node_test.cc
index 8face53eeb..fc26ce90c2 100644
--- a/cpp/src/arrow/compute/exec/tpch_node_test.cc
+++ b/cpp/src/arrow/compute/exec/tpch_node_test.cc
@@ -45,17 +45,27 @@ static constexpr uint32_t kStartDate =
 static constexpr uint32_t kEndDate =
     10591;  // December 12, 1998 is 10591 days after January 1, 1970
 
-Result<std::vector<ExecBatch>> GenerateTable(
-    Result<ExecNode*> (TpchGen::*table)(std::vector<std::string>),
-    double scale_factor = 1.0) {
+using TableNodeFn = Result<ExecNode*> (TpchGen::*)(std::vector<std::string>);
+
+constexpr double kDefaultScaleFactor = 0.1;
+
+Status AddTableAndSinkToPlan(ExecPlan& plan, TpchGen& gen,
+                             AsyncGenerator<util::optional<ExecBatch>>& sink_gen,
+                             TableNodeFn table) {
+  ARROW_ASSIGN_OR_RAISE(ExecNode * table_node, ((gen.*table)({})));
+  Declaration sink("sink", {Declaration::Input(table_node)}, SinkNodeOptions{&sink_gen});
+  ARROW_RETURN_NOT_OK(sink.AddToPlan(&plan));
+  return Status::OK();
+}
+
+Result<std::vector<ExecBatch>> GenerateTable(TableNodeFn table,
+                                             double scale_factor = kDefaultScaleFactor) {
   ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan, ExecPlan::Make(&ctx));
   ARROW_ASSIGN_OR_RAISE(std::unique_ptr<TpchGen> gen,
                         TpchGen::Make(plan.get(), scale_factor));
-  ARROW_ASSIGN_OR_RAISE(ExecNode * table_node, ((gen.get()->*table)({})));
   AsyncGenerator<util::optional<ExecBatch>> sink_gen;
-  Declaration sink("sink", {Declaration::Input(table_node)}, SinkNodeOptions{&sink_gen});
-  ARROW_RETURN_NOT_OK(sink.AddToPlan(plan.get()));
+  ARROW_RETURN_NOT_OK(AddTableAndSinkToPlan(*plan, *gen, sink_gen, table));
   auto fut = StartAndCollect(plan.get(), sink_gen);
   return fut.MoveResult();
 }
@@ -333,24 +343,15 @@ void CountModifiedComments(const Datum& d, int* good_count, int* bad_count) {
   }
 }
 
-TEST(TpchNode, ScaleFactor) {
-  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Supplier, 0.25));
-
-  int64_t kExpectedRows = 2500;
-  int64_t num_rows = 0;
-  for (auto& batch : res) num_rows += batch.length;
-  ASSERT_EQ(num_rows, kExpectedRows);
-}
-
-TEST(TpchNode, Supplier) {
-  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Supplier));
-  int64_t kExpectedRows = 10000;
+void VerifySupplier(const std::vector<ExecBatch>& batches,
+                    double scale_factor = kDefaultScaleFactor) {
+  int64_t kExpectedRows = static_cast<int64_t>(10000 * scale_factor);
   int64_t num_rows = 0;
 
   std::unordered_set<int32_t> seen_suppkey;
   int good_count = 0;
   int bad_count = 0;
-  for (auto& batch : res) {
+  for (auto& batch : batches) {
     ValidateBatch(batch);
     VerifyUniqueKey(&seen_suppkey, batch[0],
                     /*min=*/1,
@@ -365,18 +366,22 @@ TEST(TpchNode, Supplier) {
   }
   ASSERT_EQ(seen_suppkey.size(), kExpectedRows);
   ASSERT_EQ(num_rows, kExpectedRows);
-  ASSERT_EQ(good_count, 5);
-  ASSERT_EQ(bad_count, 5);
+  ASSERT_EQ(good_count, static_cast<int64_t>(5 * scale_factor));
+  ASSERT_EQ(bad_count, static_cast<int64_t>(5 * scale_factor));
 }
 
-TEST(TpchNode, Part) {
-  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Part));
+TEST(TpchNode, Supplier) {
+  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Supplier));
+  VerifySupplier(res);
+}
 
-  int64_t kExpectedRows = 200000;
+void VerifyPart(const std::vector<ExecBatch>& batches,
+                double scale_factor = kDefaultScaleFactor) {
+  int64_t kExpectedRows = static_cast<int64_t>(200000 * scale_factor);
   int64_t num_rows = 0;
 
   std::unordered_set<int32_t> seen_partkey;
-  for (auto& batch : res) {
+  for (auto& batch : batches) {
     ValidateBatch(batch);
     VerifyUniqueKey(&seen_partkey, batch[0],
                     /*min=*/1,
@@ -401,14 +406,18 @@ TEST(TpchNode, Part) {
   ASSERT_EQ(num_rows, kExpectedRows);
 }
 
-TEST(TpchNode, PartSupp) {
-  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::PartSupp));
+TEST(TpchNode, Part) {
+  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Part));
+  VerifyPart(res);
+}
 
-  constexpr int64_t kExpectedRows = 800000;
+void VerifyPartSupp(const std::vector<ExecBatch>& batches,
+                    double scale_factor = kDefaultScaleFactor) {
+  const int64_t kExpectedRows = static_cast<int64_t>(800000 * scale_factor);
   int64_t num_rows = 0;
 
   std::unordered_map<int32_t, int32_t> counts;
-  for (auto& batch : res) {
+  for (auto& batch : batches) {
     ValidateBatch(batch);
     CountInstances(&counts, batch[0]);
     VerifyAllBetween(batch[2], 1, 9999);
@@ -423,14 +432,18 @@ TEST(TpchNode, PartSupp) {
   ASSERT_EQ(num_rows, kExpectedRows);
 }
 
-TEST(TpchNode, Customer) {
-  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Customer));
+TEST(TpchNode, PartSupp) {
+  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::PartSupp));
+  VerifyPartSupp(res);
+}
 
-  const int64_t kExpectedRows = 150000;
+void VerifyCustomer(const std::vector<ExecBatch>& batches,
+                    double scale_factor = kDefaultScaleFactor) {
+  const int64_t kExpectedRows = static_cast<int64_t>(150000 * scale_factor);
   int64_t num_rows = 0;
 
   std::unordered_set<int32_t> seen_custkey;
-  for (auto& batch : res) {
+  for (auto& batch : batches) {
     ValidateBatch(batch);
     VerifyUniqueKey(&seen_custkey, batch[0],
                     /*min=*/1,
@@ -449,14 +462,18 @@ TEST(TpchNode, Customer) {
   ASSERT_EQ(num_rows, kExpectedRows);
 }
 
-TEST(TpchNode, Orders) {
-  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Orders));
+TEST(TpchNode, Customer) {
+  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Customer));
+  VerifyCustomer(res);
+}
 
-  constexpr int64_t kExpectedRows = 1500000;
+void VerifyOrders(const std::vector<ExecBatch>& batches,
+                  double scale_factor = kDefaultScaleFactor) {
+  const int64_t kExpectedRows = static_cast<int64_t>(1500000 * scale_factor);
   int64_t num_rows = 0;
 
   std::unordered_set<int32_t> seen_orderkey;
-  for (auto& batch : res) {
+  for (auto& batch : batches) {
     ValidateBatch(batch);
     VerifyUniqueKey(&seen_orderkey, batch[0],
                     /*min=*/1,
@@ -484,14 +501,19 @@ TEST(TpchNode, Orders) {
   ASSERT_EQ(num_rows, kExpectedRows);
 }
 
-TEST(TpchNode, Lineitem) {
-  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Lineitem));
+TEST(TpchNode, Orders) {
+  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Orders));
+  VerifyOrders(res);
+}
 
+void VerifyLineitem(const std::vector<ExecBatch>& batches,
+                    double scale_factor = kDefaultScaleFactor) {
   std::unordered_map<int32_t, int32_t> counts;
-  for (auto& batch : res) {
+  for (auto& batch : batches) {
     ValidateBatch(batch);
     CountInstances(&counts, batch[0]);
-    VerifyAllBetween(batch[1], /*min=*/1, /*max=*/200000);
+    VerifyAllBetween(batch[1], /*min=*/1,
+                     /*max=*/static_cast<int32_t>(200000 * scale_factor));
     VerifyAllBetween(batch[3], /*min=*/1, /*max=*/7);
     VerifyDecimalsBetween(batch[4], /*min=*/100, /*max=*/5000);
     VerifyDecimalsBetween(batch[6], /*min=*/0, /*max=*/10);
@@ -527,14 +549,18 @@ TEST(TpchNode, Lineitem) {
   }
 }
 
-TEST(TpchNode, Nation) {
-  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Nation));
+TEST(TpchNode, Lineitem) {
+  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Lineitem));
+  VerifyLineitem(res);
+}
 
+void VerifyNation(const std::vector<ExecBatch>& batches,
+                  double scale_factor = kDefaultScaleFactor) {
   constexpr int64_t kExpectedRows = 25;
   int64_t num_rows = 0;
 
   std::unordered_set<int32_t> seen_nationkey;
-  for (auto& batch : res) {
+  for (auto& batch : batches) {
     ValidateBatch(batch);
     VerifyUniqueKey(&seen_nationkey, batch[0], 0, kExpectedRows - 1);
     VerifyOneOf(
@@ -551,14 +577,18 @@ TEST(TpchNode, Nation) {
   ASSERT_EQ(num_rows, kExpectedRows);
 }
 
-TEST(TpchNode, Region) {
-  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Region));
+TEST(TpchNode, Nation) {
+  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Nation));
+  VerifyNation(res);
+}
 
+void VerifyRegion(const std::vector<ExecBatch>& batches,
+                  double scale_factor = kDefaultScaleFactor) {
   constexpr int64_t kExpectedRows = 5;
   int64_t num_rows = 0;
 
   std::unordered_set<int32_t> seen_regionkey;
-  for (auto& batch : res) {
+  for (auto& batch : batches) {
     ValidateBatch(batch);
     VerifyUniqueKey(&seen_regionkey, batch[0], 0, kExpectedRows - 1);
     VerifyOneOf(batch[1],
@@ -570,6 +600,45 @@ TEST(TpchNode, Region) {
   ASSERT_EQ(num_rows, 5);
 }
 
+TEST(TpchNode, Region) {
+  ASSERT_OK_AND_ASSIGN(auto res, GenerateTable(&TpchGen::Region));
+  VerifyRegion(res);
+}
+
+TEST(TpchNode, AllTables) {
+  constexpr double kScaleFactor = 0.05;
+  constexpr int kNumTables = 8;
+  std::array<TableNodeFn, kNumTables> tables = {
+      &TpchGen::Supplier, &TpchGen::Part,     &TpchGen::PartSupp, &TpchGen::Customer,
+      &TpchGen::Orders,   &TpchGen::Lineitem, &TpchGen::Nation,   &TpchGen::Region,
+  };
+  using VerifyFn = void(const std::vector<ExecBatch>&, double);
+  std::array<VerifyFn*, kNumTables> verify_fns = {
+      &VerifySupplier, &VerifyPart,     &VerifyPartSupp, &VerifyCustomer,
+      &VerifyOrders,   &VerifyLineitem, &VerifyNation,   &VerifyRegion,
+  };
+
+  std::array<AsyncGenerator<util::optional<ExecBatch>>, kNumTables> gens;
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<ExecPlan> plan, ExecPlan::Make(&ctx));
+  ASSERT_OK_AND_ASSIGN(std::unique_ptr<TpchGen> gen,
+                       TpchGen::Make(plan.get(), kScaleFactor));
+  for (int i = 0; i < kNumTables; i++) {
+    ASSERT_OK(AddTableAndSinkToPlan(*plan, *gen, gens[i], tables[i]));
+  }
+
+  ASSERT_OK(plan->Validate());
+  ASSERT_OK(plan->StartProducing());
+  ASSERT_OK(plan->finished().status());
+  for (int i = 0; i < kNumTables; i++) {
+    auto fut = CollectAsyncGenerator(gens[i]);
+    ASSERT_OK_AND_ASSIGN(auto maybe_batches, fut.MoveResult());
+    std::vector<ExecBatch> batches;
+    for (auto& maybe_batch : maybe_batches) batches.emplace_back(std::move(*maybe_batch));
+    verify_fns[i](batches, kScaleFactor);
+  }
+}
+
 }  // namespace internal
 }  // namespace compute
 }  // namespace arrow