You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/04/15 16:57:52 UTC

[impala] 01/02: IMPALA-9422: Re-visit and improve join node and builder's counters

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit bd4458b7a92910178a3f92cec888e83172899408
Author: Bikramjeet Vig <bi...@gmail.com>
AuthorDate: Mon Apr 6 13:40:15 2020 -0700

    IMPALA-9422: Re-visit and improve join node and builder's counters
    
    This patch makes the following changes:
    - All code executed inside the builder that gets executed exactly once
    during query execution is attributed to the builder. This also includes
    public calls to the builder that are used as synchronization points
    for shared builds. The serial execution phase in these methods is
    always executed once regardless of the builder execution mode (namely
    single-threaded, parallel execution, separate build sink).
    - Also makes sure there is no double counting of total time in builder.
    - BuildTime counter has been removed from the join node's profile in
    favor of the builder's total time.
    - BuildRowsPartitioned from the builder is equivalent to BuildRows in
    the join node and hence that counter has been moved to the builders.
    
    - Also fixed a bug in RuntimeProfile where 'non-child' and '%non-child'
    were not computed after a profile object was created from its thrift
    representation.
    
    An example of the new profiles:
    
    HASH_JOIN_NODE (id=2):(Total: 147.531ms, non-child: 2.282ms, % non-child: 0.80%)
      ExecOption: Codegen Disabled: disabled due to optimization hints, Join Build-Side Prepared Asynchronously
      Node Lifecycle Event Timeline: 148.284ms
         - Open Started: 1.511ms (1.511ms)
         - Open Finished: 147.923ms (146.411ms)
         - First Batch Requested: 147.946ms (23.488us)
         - First Batch Returned: 148.137ms (190.470us)
         - Last Batch Returned: 148.207ms (69.869us)
         - Closed: 148.284ms (77.131us)
       - PeakMemoryUsage: 1.98 MB (2074880)
       - ProbeRows: 31 (31)
       - ProbeRowsPartitioned: 0 (0)
       - ProbeTime: 25.579us
       - RowsReturned: 31 (31)
       - RowsReturnedRate: 210.00 /sec
      Buffer pool:
         - AllocTime: 31.986us
         - CompressionTime: 0.000ns
         - CumulativeAllocationBytes: 1.00 MB (1048576)
         - CumulativeAllocations: 16 (16)
         - EncryptionTime: 0.000ns
         - PeakReservation: 1.94 MB (2031616)
         - PeakUnpinnedBytes: 0
         - PeakUsedReservation: 1.00 MB (1048576)
         - ReadIoBytes: 0
         - ReadIoOps: 0 (0)
         - ReadIoWaitTime: 0.000ns
         - SystemAllocTime: 23.944us
         - WriteIoBytes: 0
         - WriteIoOps: 0 (0)
         - WriteIoWaitTime: 0.000ns
      Hash Join Builder (join_node_id=2):(Total: 1.617ms, non-child: 1.617ms, % non-child: 100.00%)
        ExecOption: Codegen Disabled: disabled due to optimization hints
        Runtime filters: 1 of 1 Runtime Filter Published
         - BuildRows: 31 (31)
         - BuildRowsPartitionTime: 153.742us
         - HashTablesBuildTime: 361.306us
         - LargestPartitionPercent: 9 (9)
         - MaxPartitionLevel: 0 (0)
         - NumHashTableBuildsSkipped: 0 (0)
         - NumRepartitions: 0 (0)
         - PartitionsCreated: 16 (16)
         - PeakMemoryUsage: 17.12 KB (17536)
         - RepartitionTime: 0.000ns
         - SpilledPartitions: 0 (0)
        Hash Table:
           - HashBuckets: 48 (48)
           - HashCollisions: 0 (0)
           - Probes: 62 (62)
           - Resizes: 0 (0)
           - Travel: 42 (42)
      EXCHANGE_NODE (id=4):(Total: 138.859ms, non-child: 81.310us, % non-child: 0.06%)
    
    NESTED_LOOP_JOIN_NODE (id=3):(Total: 1s915ms, non-child: 906.707ms, % non-child: 47.34%)
      ExecOption: Join Build-Side Prepared Asynchronously
      Node Lifecycle Event Timeline: 2s190ms
         - Open Started: 254.677ms (254.677ms)
         - Open Finished: 2s016ms (1s762ms)
         - First Batch Requested: 2s016ms (5.231us)
         - First Batch Returned: 2s017ms (497.212us)
         - Last Batch Returned: 2s186ms (169.010ms)
         - Closed: 2s190ms (3.570ms)
       - PeakMemoryUsage: 51.75 MB (54263808)
       - ProbeRows: 3 (3)
       - ProbeTime: 0.000ns
       - RowsReturned: 301.15K (301146)
       - RowsReturnedRate: 157.24 K/sec
      Nested Loop Join Builder:(Total: 753.036ms, non-child: 753.036ms, % non-child: 100.00%)
         - BuildRows: 100.38K (100382)
         - PeakMemoryUsage: 51.72 MB (54235136)
    
    Change-Id: I604075a2c8efcff26705fb39672f29f309b2ed97
    Reviewed-on: http://gerrit.cloudera.org:8080/15663
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/blocking-join-node.cc            | 24 ++----------
 be/src/exec/blocking-join-node.h             |  2 -
 be/src/exec/join-builder.h                   |  4 ++
 be/src/exec/nested-loop-join-builder.cc      |  3 ++
 be/src/exec/partitioned-hash-join-builder.cc | 57 +++++++++++++++++++++-------
 be/src/exec/partitioned-hash-join-builder.h  | 37 +++++++++++-------
 be/src/exec/partitioned-hash-join-node.cc    | 16 ++++----
 be/src/util/runtime-profile.cc               |  2 +
 8 files changed, 87 insertions(+), 58 deletions(-)

diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index 97c6fe6..7447c93 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -75,9 +75,7 @@ Status BlockingJoinNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
 
-  build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime");
   probe_timer_ = ADD_TIMER(runtime_profile(), "ProbeTime");
-  build_row_counter_ = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT);
   probe_row_counter_ = ADD_COUNTER(runtime_profile(), "ProbeRows", TUnit::UNIT);
 
   // The right child (if present) must match the build row layout.
@@ -245,7 +243,7 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
     RETURN_IF_ERROR(AcquireResourcesForBuild(state));
     {
       SCOPED_TIMER(runtime_profile_->inactive_timer());
-      events_->MarkEvent("Waiting for builder");
+      events_->MarkEvent("Waiting for initial build");
       RETURN_IF_ERROR(build_sink->WaitForInitialBuild(state));
       events_->MarkEvent("Initial build available");
     }
@@ -333,35 +331,21 @@ template <bool ASYNC_BUILD>
 Status BlockingJoinNode::SendBuildInputToSink(
     RuntimeState* state, JoinBuilder* build_sink) {
   DCHECK(!UseSeparateBuild(state->query_options()));
-  {
-    SCOPED_TIMER(build_timer_);
-    RETURN_IF_ERROR(build_sink->Open(state));
-  }
-
+  RETURN_IF_ERROR(build_sink->Open(state));
   DCHECK_EQ(build_batch_->num_rows(), 0);
   bool eos = false;
   do {
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(QueryMaintenance(state));
-
     {
       CONDITIONAL_SCOPED_CONCURRENT_STOP_WATCH(
           &built_probe_overlap_stop_watch_, ASYNC_BUILD);
       RETURN_IF_ERROR(child(1)->GetNext(state, build_batch_.get(), &eos));
     }
-    COUNTER_ADD(build_row_counter_, build_batch_->num_rows());
-
-    {
-      SCOPED_TIMER(build_timer_);
-      RETURN_IF_ERROR(build_sink->Send(state, build_batch_.get()));
-    }
+    RETURN_IF_ERROR(build_sink->Send(state, build_batch_.get()));
     build_batch_->Reset();
   } while (!eos);
-
-  {
-    SCOPED_TIMER(build_timer_);
-    RETURN_IF_ERROR(build_sink->FlushFinal(state));
-  }
+  RETURN_IF_ERROR(build_sink->FlushFinal(state));
   return Status::OK();
 }
 
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index 39211de..eb4f53d 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -142,9 +142,7 @@ class BlockingJoinNode : public ExecNode {
   /// so this tuple is temporarily assembled for evaluating the conjuncts.
   TupleRow* semi_join_staging_row_;
 
-  RuntimeProfile::Counter* build_timer_;   // time to prepare build side
   RuntimeProfile::Counter* probe_timer_;   // time to process the probe (left child) batch
-  RuntimeProfile::Counter* build_row_counter_;   // num build rows
   RuntimeProfile::Counter* probe_row_counter_;   // num probe (left child) rows
 
   /// Stopwatch that measures the build child's Open/GetNext time that overlaps
diff --git a/be/src/exec/join-builder.h b/be/src/exec/join-builder.h
index ef7b7a8..c4ed213 100644
--- a/be/src/exec/join-builder.h
+++ b/be/src/exec/join-builder.h
@@ -151,6 +151,10 @@ class JoinBuilder : public DataSink {
   /// is embedded in a PartitionedHashJoinNode.
   const bool is_separate_build_;
 
+
+  /// Number of build rows. Initialized in Prepare().
+  RuntimeProfile::Counter* num_build_rows_ = nullptr;
+
   /////////////////////////////////////////////////////////////////////
   /// BEGIN: Members that are used only when is_separate_build_ is true
 
diff --git a/be/src/exec/nested-loop-join-builder.cc b/be/src/exec/nested-loop-join-builder.cc
index 3b9cd11..56fdeaa 100644
--- a/be/src/exec/nested-loop-join-builder.cc
+++ b/be/src/exec/nested-loop-join-builder.cc
@@ -71,6 +71,7 @@ NljBuilder::~NljBuilder() {}
 
 Status NljBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
+  num_build_rows_ = ADD_COUNTER(profile(), "BuildRows", TUnit::UNIT);
   return Status::OK();
 }
 
@@ -81,6 +82,7 @@ Status NljBuilder::Open(RuntimeState* state) {
 
 Status NljBuilder::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
+  int num_input_rows = batch->num_rows();
   // Swap the contents of the batch into a batch owned by the builder.
   RowBatch* build_batch = GetNextEmptyBatch();
   build_batch->AcquireState(batch);
@@ -99,6 +101,7 @@ Status NljBuilder::Send(RuntimeState* state, RowBatch* batch) {
     // is fixed.
     RETURN_IF_ERROR(DeepCopyBuildBatches(state));
   }
+  COUNTER_ADD(num_build_rows_, num_input_rows);
   return Status::OK();
 }
 
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 2459db2..6666ede 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -257,8 +257,7 @@ Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker)
       profile()->AddHighWaterMarkCounter("LargestPartitionPercent", TUnit::UNIT);
   max_partition_level_ =
       profile()->AddHighWaterMarkCounter("MaxPartitionLevel", TUnit::UNIT);
-  num_build_rows_partitioned_ =
-      ADD_COUNTER(profile(), "BuildRowsPartitioned", TUnit::UNIT);
+  num_build_rows_ = ADD_COUNTER(profile(), "BuildRows", TUnit::UNIT);
   ht_stats_profile_ = HashTable::AddHashTableCounters(profile());
   num_spilled_partitions_ = ADD_COUNTER(profile(), "SpilledPartitions", TUnit::UNIT);
   num_repartitions_ = ADD_COUNTER(profile(), "NumRepartitions", TUnit::UNIT);
@@ -305,6 +304,12 @@ Status PhjBuilder::Open(RuntimeState* state) {
 Status PhjBuilder::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
   SCOPED_TIMER(partition_build_rows_timer_);
+  RETURN_IF_ERROR(AddBatch(batch));
+  COUNTER_ADD(num_build_rows_, batch->num_rows());
+  return Status::OK();
+}
+
+Status PhjBuilder::AddBatch(RowBatch* batch) {
   bool build_filters = ht_ctx_->level() == 0 && filter_ctxs_.size() > 0;
   if (process_build_batch_fn_ == nullptr) {
     RETURN_IF_ERROR(ProcessBuildBatch(batch, ht_ctx_.get(), build_filters,
@@ -313,23 +318,24 @@ Status PhjBuilder::Send(RuntimeState* state, RowBatch* batch) {
   } else {
     DCHECK(process_build_batch_fn_level0_ != nullptr);
     if (ht_ctx_->level() == 0) {
-      RETURN_IF_ERROR(
-          process_build_batch_fn_level0_(this, batch, ht_ctx_.get(), build_filters,
-              join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN));
+      RETURN_IF_ERROR(process_build_batch_fn_level0_(this, batch, ht_ctx_.get(),
+          build_filters, join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN));
     } else {
       RETURN_IF_ERROR(process_build_batch_fn_(this, batch, ht_ctx_.get(), build_filters,
           join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN));
     }
   }
-
   // Free any expr result allocations made during partitioning.
   expr_results_pool_->Clear();
-  COUNTER_ADD(num_build_rows_partitioned_, batch->num_rows());
   return Status::OK();
 }
 
 Status PhjBuilder::FlushFinal(RuntimeState* state) {
   SCOPED_TIMER(profile()->total_time_counter());
+  return FinalizeBuild(state);
+}
+
+Status PhjBuilder::FinalizeBuild(RuntimeState* state) {
   int64_t num_build_rows = 0;
   for (const unique_ptr<Partition>& partition : hash_partitions_) {
     num_build_rows += partition->build_rows()->num_rows();
@@ -692,7 +698,7 @@ int PhjBuilder::GetNumSpilledPartitions(const vector<unique_ptr<Partition>>& par
 
 Status PhjBuilder::DoneProbingHashPartitions(
     const int64_t num_spilled_probe_rows[PARTITION_FANOUT],
-    BufferPool::ClientHandle* probe_client,
+    BufferPool::ClientHandle* probe_client, RuntimeProfile* probe_profile,
     deque<unique_ptr<Partition>>* output_partitions, RowBatch* batch) {
   DCHECK_EQ(is_separate_build_, probe_client != buffer_pool_client_);
   DCHECK(output_partitions->empty());
@@ -710,6 +716,7 @@ Status PhjBuilder::DoneProbingHashPartitions(
   }
 
   if (num_probe_threads_ > 1) {
+    SCOPED_TIMER(probe_profile->inactive_timer());
     // TODO: IMPALA-9411: consider reworking to attach buffers to all output batches.
     RETURN_IF_ERROR(probe_barrier_->Wait([&]() {
       CleanUpHashPartitions(output_partitions, nullptr);
@@ -717,7 +724,12 @@ Status PhjBuilder::DoneProbingHashPartitions(
           << "Cannot share build for join modes that return rows from build partitions";
       return Status::OK();
     }));
+  } else if (is_separate_build_) {
+    SCOPED_TIMER(probe_profile->inactive_timer());
+    CleanUpHashPartitions(output_partitions, batch);
   } else {
+    // No need to activate probe's inactive timer, since the builder will be a child of
+    // the probe and its time will be subtracted from probe's total time.
     CleanUpHashPartitions(output_partitions, batch);
   }
 
@@ -732,6 +744,7 @@ Status PhjBuilder::DoneProbingHashPartitions(
 
 void PhjBuilder::CleanUpHashPartitions(
     deque<unique_ptr<Partition>>* output_partitions, RowBatch* batch) {
+  SCOPED_TIMER(profile()->total_time_counter());
   if (state_ == HashJoinState::REPARTITIONING_PROBE) {
     // Finished repartitioning this partition. Discard before pushing more spilled
     // partitions onto 'spilled_partitions_'.
@@ -767,13 +780,15 @@ void PhjBuilder::CleanUpHashPartitions(
 }
 
 Status PhjBuilder::DoneProbingSinglePartition(BufferPool::ClientHandle* probe_client,
-    deque<unique_ptr<Partition>>* output_partitions, RowBatch* batch) {
+    RuntimeProfile* probe_profile, deque<unique_ptr<Partition>>* output_partitions,
+    RowBatch* batch) {
   VLOG(3) << "PHJ(node_id=" << join_node_id_ << ") done probing single partition.";
   DCHECK_EQ(is_separate_build_, probe_client != buffer_pool_client_);
   // Calculate before popping off the last 'spilled_partition_'.
   int64_t probe_reservation = CalcProbeStreamReservation(state_);
   DCHECK_GE(probe_client->GetUnusedReservation(), probe_reservation);
   if (num_probe_threads_ > 1) {
+    SCOPED_TIMER(probe_profile->inactive_timer());
     // TODO: IMPALA-9411: consider reworking to attach buffers to all output batches.
     RETURN_IF_ERROR(probe_barrier_->Wait([&]() {
       CleanUpSinglePartition(output_partitions, nullptr);
@@ -781,7 +796,12 @@ Status PhjBuilder::DoneProbingSinglePartition(BufferPool::ClientHandle* probe_cl
           << "Cannot share build for join modes that return rows from build partitions";
       return Status::OK();
     }));
+  } else if (is_separate_build_) {
+    SCOPED_TIMER(probe_profile->inactive_timer());
+    CleanUpSinglePartition(output_partitions, batch);
   } else {
+    // No need to activate probe's inactive timer, since the builder will be a child of
+    // the probe and its time will be subtracted from probe's total time.
     CleanUpSinglePartition(output_partitions, batch);
   }
   if (is_separate_build_) {
@@ -795,6 +815,7 @@ Status PhjBuilder::DoneProbingSinglePartition(BufferPool::ClientHandle* probe_cl
 
 void PhjBuilder::CleanUpSinglePartition(
     deque<unique_ptr<Partition>>* output_partitions, RowBatch* batch) {
+  SCOPED_TIMER(profile()->total_time_counter());
   if (NeedToProcessUnmatchedBuildRows(join_op_)) {
     DCHECK_LE(num_probe_threads_, 1)
         << "Don't support returning build partitions with shared build";
@@ -893,15 +914,22 @@ void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) {
   }
 }
 
-Status PhjBuilder::BeginSpilledProbe(
-    BufferPool::ClientHandle* probe_client, bool* repartitioned,
-     Partition** input_partition, HashPartitions* new_partitions) {
+Status PhjBuilder::BeginSpilledProbe(BufferPool::ClientHandle* probe_client,
+    RuntimeProfile* probe_profile, bool* repartitioned, Partition** input_partition,
+    HashPartitions* new_partitions) {
   DCHECK_EQ(is_separate_build_, probe_client != buffer_pool_client_);
   DCHECK(!spilled_partitions_.empty());
   DCHECK_EQ(0, hash_partitions_.size());
+
   if (num_probe_threads_ > 1) {
+    SCOPED_TIMER(probe_profile->inactive_timer());
     RETURN_IF_ERROR(probe_barrier_->Wait([&]() { return BeginSpilledProbeSerial(); }));
+  } else if (is_separate_build_) {
+    SCOPED_TIMER(probe_profile->inactive_timer());
+    RETURN_IF_ERROR(BeginSpilledProbeSerial());
   } else {
+    // No need to activate probe's inactive timer, since the builder will be a child of
+    // the probe and its time will be subtracted from probe's total time.
     RETURN_IF_ERROR(BeginSpilledProbeSerial());
   }
 
@@ -919,6 +947,7 @@ Status PhjBuilder::BeginSpilledProbe(
 }
 
 Status PhjBuilder::BeginSpilledProbeSerial() {
+  SCOPED_TIMER(profile()->total_time_counter());
   DCHECK_EQ(0, probe_stream_reservation_.GetReservation());
   if (is_separate_build_ || join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     DCHECK_EQ(0, buffer_pool_client_->GetUsedReservation())
@@ -1030,13 +1059,13 @@ Status PhjBuilder::RepartitionBuildInput(Partition* input_partition) {
     RETURN_IF_ERROR(state->CheckQueryState());
 
     RETURN_IF_ERROR(build_rows->GetNext(&build_batch, &eos));
-    RETURN_IF_ERROR(Send(state, &build_batch));
+    RETURN_IF_ERROR(AddBatch(&build_batch));
     build_batch.Reset();
   }
 
   // Done reading the input, we can safely close it now to free memory.
   input_partition->Close(nullptr);
-  RETURN_IF_ERROR(FlushFinal(state));
+  RETURN_IF_ERROR(FinalizeBuild(state));
   return Status::OK();
 }
 
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 22a4127..d3ee104 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -331,10 +331,12 @@ class PhjBuilder : public JoinBuilder {
   /// previous hash partitions must have been cleared with DoneProbingHashPartitions().
   /// The new hash partitions are returned in 'new_partitions'.
   ///
-  /// This is a synchronization point for shared join build. All probe threads must
-  /// call this function before continuing the next phase of the hash join algorithm.
-  Status BeginSpilledProbe(BufferPool::ClientHandle* probe_client, bool* repartitioned,
-      Partition** input_partition, HashPartitions* new_partitions);
+  /// This is a synchronization point for shared join build. The time elapsed during the
+  /// serial execution phase is attributed to the builder. All probe threads must call
+  /// this function before continuing the next phase of the hash join algorithm.
+  Status BeginSpilledProbe(BufferPool::ClientHandle* probe_client,
+      RuntimeProfile* probe_profile, bool* repartitioned, Partition** input_partition,
+      HashPartitions* new_partitions);
 
   /// Called after probing of the hash partitions returned by BeginInitialProbe() or
   /// BeginSpilledProbe() (when *repartitioned is true) is complete, i.e. all of the
@@ -351,10 +353,11 @@ class PhjBuilder : public JoinBuilder {
   ///
   /// Returns an error if an error was encountered or if the query was cancelled.
   ///
-  /// This is a synchronization point for shared join build. All probe threads must
-  /// call this function before continuing the next phase of the hash join algorithm.
+  /// This is a synchronization point for shared join build. The time elapsed during the
+  /// serial execution phase is attributed to the builder. All probe threads must call
+  /// this function before continuing the next phase of the hash join algorithm.
   Status DoneProbingHashPartitions(const int64_t num_spilled_probe_rows[PARTITION_FANOUT],
-      BufferPool::ClientHandle* probe_client,
+      BufferPool::ClientHandle* probe_client, RuntimeProfile* probe_profile,
       std::deque<std::unique_ptr<Partition>>* output_partitions, RowBatch* batch);
 
   /// Called after probing of a single spilled partition returned by
@@ -374,9 +377,11 @@ class PhjBuilder : public JoinBuilder {
   ///
   /// Returns an error if an error was encountered or if the query was cancelled.
   ///
-  /// This is a synchronization point for shared join build. All probe threads must
-  /// call this function before continuing the next phase of the hash join algorithm.
+  /// This is a synchronization point for shared join build. The time elapsed during the
+  /// serial execution phase is attributed to the builder. All probe threads must call
+  /// this function before continuing the next phase of the hash join algorithm.
   Status DoneProbingSinglePartition(BufferPool::ClientHandle* probe_client,
+      RuntimeProfile* probe_profile,
       std::deque<std::unique_ptr<Partition>>* output_partitions, RowBatch* batch);
 
   /// Close the null aware partition (if there is one) and set it to NULL.
@@ -574,8 +579,15 @@ class PhjBuilder : public JoinBuilder {
   /// 'build_filters' is true, runtime filters are populated. 'is_null_aware' is
   /// set to true if the join type is a null aware join.
   Status ProcessBuildBatch(
-      RowBatch* build_batch, HashTableCtx* ctx, bool build_filters,
-      bool is_null_aware) WARN_UNUSED_RESULT;
+      RowBatch* build_batch, HashTableCtx* ctx, bool build_filters, bool is_null_aware);
+
+  /// Helper method for Send() that does the actual work apart from updating the
+  /// counters. Also used by RepartitionBuildInput().
+  Status AddBatch(RowBatch* build_batch);
+
+  /// Helper method for FlushFinal() that does the actual work. Also used by
+  /// RepartitionBuildInput().
+  Status FinalizeBuild(RuntimeState* state);
 
   /// Append 'row' to 'stream'. In the common case, appending the row to the stream
   /// immediately succeeds. Otherwise this function falls back to the slower path of
@@ -757,9 +769,6 @@ class PhjBuilder : public JoinBuilder {
   /// Level of max partition (i.e. number of repartitioning steps).
   RuntimeProfile::HighWaterMarkCounter* max_partition_level_ = nullptr;
 
-  /// Number of build rows that have been partitioned.
-  RuntimeProfile::Counter* num_build_rows_partitioned_ = nullptr;
-
   /// Number of partitions that have been spilled.
   RuntimeProfile::Counter* num_spilled_partitions_ = nullptr;
 
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index bdc5a4e..76fd3a3 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -436,8 +436,8 @@ Status PartitionedHashJoinNode::BeginSpilledProbe() {
 
   PhjBuilder::Partition* build_input_partition;
   bool repartitioned;
-  RETURN_IF_ERROR(builder_->BeginSpilledProbe(buffer_pool_client(), &repartitioned,
-      &build_input_partition, &build_hash_partitions_));
+  RETURN_IF_ERROR(builder_->BeginSpilledProbe(buffer_pool_client(), runtime_profile(),
+      &repartitioned, &build_input_partition, &build_hash_partitions_));
 
   auto it = spilled_partitions_.find(build_input_partition->id());
   DCHECK(it != spilled_partitions_.end())
@@ -1163,9 +1163,9 @@ Status PartitionedHashJoinNode::DoneProbing(RuntimeState* state, RowBatch* batch
   if (builder_->state() == HashJoinState::PROBING_SPILLED_PARTITION) {
     // Need to clean up single in-memory build partition instead of hash partitions.
     DCHECK(build_hash_partitions_.hash_partitions == nullptr);
-    RETURN_IF_ERROR(builder_->DoneProbingSinglePartition(
-          buffer_pool_client(), &output_build_partitions_,
-        IsLeftSemiJoin(join_op_) ? nullptr : batch));
+    RETURN_IF_ERROR(
+        builder_->DoneProbingSinglePartition(buffer_pool_client(), runtime_profile(),
+            &output_build_partitions_, IsLeftSemiJoin(join_op_) ? nullptr : batch));
   } else {
     // Walk the partitions that had hash tables built for the probe phase and either
     // close them or move them to 'spilled_partitions_'.
@@ -1207,9 +1207,9 @@ Status PartitionedHashJoinNode::DoneProbing(RuntimeState* state, RowBatch* batch
     }
     probe_hash_partitions_.clear();
     build_hash_partitions_.Reset();
-    RETURN_IF_ERROR(
-        builder_->DoneProbingHashPartitions(num_spilled_probe_rows, buffer_pool_client(),
-          &output_build_partitions_, IsLeftSemiJoin(join_op_) ? nullptr : batch));
+    RETURN_IF_ERROR(builder_->DoneProbingHashPartitions(num_spilled_probe_rows,
+        buffer_pool_client(), runtime_profile(), &output_build_partitions_,
+        IsLeftSemiJoin(join_op_) ? nullptr : batch));
   }
   if (!output_build_partitions_.empty()) {
     DCHECK(output_unmatched_batch_iter_.get() == nullptr);
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 923a919..c8605a1 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -233,6 +233,8 @@ RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
     bool indent = nodes[*idx].indent;
     profile->AddChild(RuntimeProfile::CreateFromThrift(pool, nodes, idx), indent);
   }
+  // Compute timers that are not serialized to the thrift.
+  profile->ComputeTimeInProfile();
   return profile;
 }