Posted to commits@impala.apache.org by ta...@apache.org on 2020/01/31 06:41:29 UTC

[impala] branch master updated: IMPALA-4080 [part 1]: Move codegen code from aggregation exec nodes to their plan nodes

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new ba00551  IMPALA-4080 [part 1]: Move codegen code from aggregation exec nodes to their plan nodes
ba00551 is described below

commit ba00551581b9ba87fc478eaa16ecaab043d378f5
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Mon Jan 13 11:52:04 2020 -0800

    IMPALA-4080 [part 1]: Move codegen code from aggregation exec nodes to
    their plan nodes
    
    Refactored code to move codegen code from aggregation exec nodes to
    their plan nodes. Added some TODOs that will be fixed in the next few
    patches.
    
    Testing:
    - Ran queries and confirmed manually that the codegened code works.
    - Ran all e2e tests for agg nodes and partition joins.
    
    Change-Id: I58f52a262ac7d0af259d5bcda72ada93a851d3b2
    Reviewed-on: http://gerrit.cloudera.org:8080/15053
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/aggregation-node-base.cc         |  10 +-
 be/src/exec/aggregator.cc                    |  23 ++--
 be/src/exec/aggregator.h                     |  54 +++++----
 be/src/exec/grouping-aggregator.cc           |  61 ++++++----
 be/src/exec/grouping-aggregator.h            |  71 ++++++++----
 be/src/exec/hash-table.cc                    | 163 +++++++++++++++++++--------
 be/src/exec/hash-table.h                     |  69 ++++++++++--
 be/src/exec/non-grouping-aggregator.cc       |  28 +++--
 be/src/exec/non-grouping-aggregator.h        |  42 +++++--
 be/src/exec/partitioned-hash-join-builder.cc |  44 +++++---
 be/src/exec/partitioned-hash-join-builder.h  |   9 +-
 be/src/exec/partitioned-hash-join-node.cc    |  20 +++-
 be/src/exprs/expr-test.cc                    |  12 +-
 be/src/exprs/scalar-expr.cc                  |  31 +++--
 be/src/exprs/scalar-expr.h                   |  35 +++---
 15 files changed, 448 insertions(+), 224 deletions(-)
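
The heart of the change is a split between an immutable config owned by the plan
node (where codegen now happens) and a per-fragment-instance aggregator that only
consumes the config. A minimal sketch of that shape, with illustrative names
rather than Impala's actual classes:

    // Sketch of the config/instance split this patch introduces (names are
    // illustrative, not Impala's real API).
    struct Status { bool ok = true; };

    class AggregatorConfig {              // owned by the PlanNode, one per TAggregator
     public:
      explicit AggregatorConfig(int agg_idx) : agg_idx_(agg_idx) {}
      virtual ~AggregatorConfig() = default;
      virtual Status Codegen() = 0;       // jitted fn pointers get stored in the config
      const int agg_idx_;                 // index within the parent TAggregationNode
    };

    class Aggregator {                    // one per fragment instance
     public:
      explicit Aggregator(const AggregatorConfig& config) : agg_idx_(config.agg_idx_) {}
      virtual ~Aggregator() = default;
     protected:
      const int agg_idx_;                 // read from the config, no longer a ctor arg
    };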

diff --git a/be/src/exec/aggregation-node-base.cc b/be/src/exec/aggregation-node-base.cc
index c2da44f..3f8512a 100644
--- a/be/src/exec/aggregation-node-base.cc
+++ b/be/src/exec/aggregation-node-base.cc
@@ -35,9 +35,9 @@ Status AggregationPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
     const TAggregator& agg = tnode_->agg_node.aggregators[i];
     AggregatorConfig* node = nullptr;
     if (agg.grouping_exprs.empty()) {
-      node = state->obj_pool()->Add(new AggregatorConfig(agg, state, this));
+      node = state->obj_pool()->Add(new NonGroupingAggregatorConfig(agg, state, this, i));
     } else {
-      node = state->obj_pool()->Add(new GroupingAggregatorConfig(agg, state, this));
+      node = state->obj_pool()->Add(new GroupingAggregatorConfig(agg, state, this, i));
     }
     DCHECK(node != nullptr);
     aggs_.push_back(node);
@@ -67,13 +67,15 @@ AggregationNodeBase::AggregationNodeBase(
     const AggregatorConfig* agg = pnode.aggs_[i];
     unique_ptr<Aggregator> node;
     if (agg->grouping_exprs_.empty()) {
-      node.reset(new NonGroupingAggregator(this, pool_, *agg, i));
+      const NonGroupingAggregatorConfig* non_grouping_config =
+          static_cast<const NonGroupingAggregatorConfig*>(agg);
+      node.reset(new NonGroupingAggregator(this, pool_, *non_grouping_config));
     } else {
       const GroupingAggregatorConfig* grouping_config =
           static_cast<const GroupingAggregatorConfig*>(agg);
       DCHECK(grouping_config != nullptr);
       node.reset(new GroupingAggregator(this, pool_, *grouping_config,
-          pnode.tnode_->agg_node.estimated_input_cardinality, i));
+          pnode.tnode_->agg_node.estimated_input_cardinality));
     }
     aggs_.push_back(std::move(node));
     runtime_profile_->AddChild(aggs_[i]->runtime_profile());
diff --git a/be/src/exec/aggregator.cc b/be/src/exec/aggregator.cc
index 939888f..b5e88c7 100644
--- a/be/src/exec/aggregator.cc
+++ b/be/src/exec/aggregator.cc
@@ -42,8 +42,9 @@
 namespace impala {
 
 AggregatorConfig::AggregatorConfig(
-    const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode)
-  : intermediate_tuple_id_(taggregator.intermediate_tuple_id),
+    const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx)
+  : agg_idx_(agg_idx),
+    intermediate_tuple_id_(taggregator.intermediate_tuple_id),
     intermediate_tuple_desc_(
         state->desc_tbl().GetTupleDescriptor(intermediate_tuple_id_)),
     output_tuple_id_(taggregator.output_tuple_id),
@@ -75,10 +76,10 @@ Status AggregatorConfig::Init(
 const char* Aggregator::LLVM_CLASS_NAME = "class.impala::Aggregator";
 
 Aggregator::Aggregator(ExecNode* exec_node, ObjectPool* pool,
-    const AggregatorConfig& config, const std::string& name, int agg_idx)
+    const AggregatorConfig& config, const std::string& name)
   : id_(exec_node->id()),
     exec_node_(exec_node),
-    agg_idx_(agg_idx),
+    agg_idx_(config.agg_idx_),
     pool_(pool),
     intermediate_tuple_id_(config.intermediate_tuple_id_),
     intermediate_tuple_desc_(config.intermediate_tuple_desc_),
@@ -299,7 +300,7 @@ Status Aggregator::QueryMaintenance(RuntimeState* state) {
 //   ret void
 // }
 //
-Status Aggregator::CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
+Status AggregatorConfig::CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
     SlotDescriptor* slot_desc, llvm::Function** fn) {
   llvm::PointerType* agg_fn_eval_type = codegen->GetStructPtrType<AggFnEvaluator>();
   llvm::StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
@@ -327,7 +328,7 @@ Status Aggregator::CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
       IRFunction::AGG_FN_EVALUATOR_INPUT_EVALUATORS, agg_fn_eval_arg,
       "input_evals_vector");
 
-  AggFn* agg_fn = agg_fns_[agg_fn_idx];
+  AggFn* agg_fn = aggregate_functions_[agg_fn_idx];
   const int num_inputs = agg_fn->GetNumChildren();
   DCHECK_GE(num_inputs, 1);
   vector<CodegenAnyVal> input_vals;
@@ -470,7 +471,7 @@ Status Aggregator::CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
   return Status::OK();
 }
 
-Status Aggregator::CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder,
+Status AggregatorConfig::CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder,
     AggFn* agg_fn, llvm::Value* agg_fn_ctx_val, const vector<CodegenAnyVal>& input_vals,
     const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val) {
   llvm::Function* uda_fn;
@@ -540,7 +541,7 @@ Status Aggregator::CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder,
 //   ret void
 // }
 //
-Status Aggregator::CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) {
+Status AggregatorConfig::CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) {
   for (const SlotDescriptor* slot_desc : intermediate_tuple_desc_->slots()) {
     if (slot_desc->type().type == TYPE_CHAR) {
       return Status::Expected("Aggregator::CodegenUpdateTuple(): cannot "
@@ -582,9 +583,9 @@ Status Aggregator::CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn)
   // Loop over each expr and generate the IR for that slot.  If the expr is not
   // count(*), generate a helper IR function to update the slot and call that.
   int j = GetNumGroupingExprs();
-  for (int i = 0; i < agg_fns_.size(); ++i, ++j) {
+  for (int i = 0; i < aggregate_functions_.size(); ++i, ++j) {
     SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
-    AggFn* agg_fn = agg_fns_[i];
+    AggFn* agg_fn = aggregate_functions_[i];
     if (agg_fn->is_count_star()) {
       // TODO: we should be able to hoist this up to the loop over the batch and just
       // increment the slot by the number of rows in the batch.
@@ -613,7 +614,7 @@ Status Aggregator::CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn)
 
   // Avoid inlining big UpdateTuple function into outer loop - we're unlikely to get
   // any benefit from it since the function call overhead will be amortized.
-  if (agg_fns_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
+  if (aggregate_functions_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
     codegen->SetNoInline(*fn);
   }
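
The loop described in the comment above, one helper IR function per slot with
count(*) handled inline, can be pictured with ordinary function objects (a
standalone sketch; the real code emits LLVM IR rather than calling std::function):

    #include <cstdint>
    #include <functional>
    #include <vector>

    // Simplified picture of the per-slot dispatch in CodegenUpdateTuple().
    struct FakeAggFn { bool is_count_star = false; };

    void UpdateTupleSketch(const std::vector<FakeAggFn>& agg_fns, int64_t* slots,
                           const std::function<void(int)>& update_slot_helper) {
      for (int i = 0; i < static_cast<int>(agg_fns.size()); ++i) {
        if (agg_fns[i].is_count_star) {
          ++slots[i];              // count(*): just bump the slot, no helper needed
        } else {
          update_slot_helper(i);   // other aggregates go through a per-slot helper
        }
      }
    }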
 
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
index e2fd326..15f43cd 100644
--- a/be/src/exec/aggregator.h
+++ b/be/src/exec/aggregator.h
@@ -58,12 +58,19 @@ class TupleRow;
 /// structure. It serves as an input for creating instances of the Aggregator class.
 class AggregatorConfig {
  public:
+  /// 'agg_idx' is the index of 'TAggregator' in the parent TAggregationNode.
   AggregatorConfig(
-      const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode);
+      const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx);
   virtual Status Init(
       const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode);
+  virtual Status Codegen(RuntimeState* state) = 0;
   virtual ~AggregatorConfig() {}
 
+  /// The index of this Aggregator within the AggregationNode, matching the index of
+  /// its corresponding TAggregator in the parent TAggregationNode. When returning output,
+  /// this Aggregator should only write tuples at 'agg_idx_' within the row.
+  const int agg_idx_;
+
   /// Tuple into which Update()/Merge()/Serialize() results are stored.
   TupleId intermediate_tuple_id_;
   TupleDescriptor* intermediate_tuple_desc_;
@@ -91,6 +98,27 @@ class AggregatorConfig {
 
   /// The list of all aggregate operations for this aggregator.
   std::vector<AggFn*> aggregate_functions_;
+
+ protected:
+  /// Codegen for updating the aggregate expression aggregate_functions_[agg_fn_idx];
+  /// returns the IR function in 'fn'. Returns non-OK status if codegen
+  /// is unsuccessful.
+  Status CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
+      SlotDescriptor* slot_desc, llvm::Function** fn) WARN_UNUSED_RESULT;
+
+  /// Codegen a call to a function implementing the UDA interface with input values
+  /// from 'input_vals'. 'dst_val' should contain the previous value of the aggregate
+  /// function, and 'updated_dst_val' is set to the new value after the Update or Merge
+  /// operation is applied. The instruction sequence for the UDA call is inserted at
+  /// the insert position of 'builder'.
+  Status CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder, AggFn* agg_fn,
+      llvm::Value* agg_fn_ctx_arg, const std::vector<CodegenAnyVal>& input_vals,
+      const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val) WARN_UNUSED_RESULT;
+
+  /// Codegen Aggregator::UpdateTuple(). Returns non-OK status if codegen is unsuccessful.
+  Status CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) WARN_UNUSED_RESULT;
+
+  virtual int GetNumGroupingExprs() = 0;
 };
 
 /// Base class for aggregating rows. Used in the AggregationNode and
@@ -100,9 +128,8 @@ class AggregatorConfig {
 /// be called and the results can be fetched with GetNext().
 class Aggregator {
  public:
-  /// 'agg_idx' is the index of 'taggregator' in the parent TAggregationNode.
   Aggregator(ExecNode* exec_node, ObjectPool* pool, const AggregatorConfig& config,
-      const std::string& name, int agg_idx);
+      const std::string& name);
   virtual ~Aggregator();
 
   /// Aggregators follow the same lifecycle as ExecNodes, except that after Open() and
@@ -229,7 +256,8 @@ class Aggregator {
   /// is_merge() == true.
   /// This function is replaced by codegen (which is why we don't use a vector argument
  /// for agg_fn_evals). Any var-len data is allocated from the FunctionContexts.
-  /// TODO: Fix the arguments order. Need to update CodegenUpdateTuple() too.
+  /// TODO: Fix the arguments order. Need to update AggregatorConfig::CodegenUpdateTuple()
+  /// too.
   void UpdateTuple(AggFnEvaluator** agg_fn_evals, Tuple* tuple, TupleRow* row,
       bool is_merge = false) noexcept;
 
@@ -250,24 +278,6 @@ class Aggregator {
   /// should not be called outside the main execution thread.
   /// TODO: IMPALA-2399: replace QueryMaintenance() - see JIRA for more details.
   Status QueryMaintenance(RuntimeState* state) WARN_UNUSED_RESULT;
-
-  /// Codegen for updating aggregate expressions agg_fns_[agg_fn_idx]
-  /// and returns the IR function in 'fn'. Returns non-OK status if codegen
-  /// is unsuccessful.
-  Status CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
-      SlotDescriptor* slot_desc, llvm::Function** fn) WARN_UNUSED_RESULT;
-
-  /// Codegen a call to a function implementing the UDA interface with input values
-  /// from 'input_vals'. 'dst_val' should contain the previous value of the aggregate
-  /// function, and 'updated_dst_val' is set to the new value after the Update or Merge
-  /// operation is applied. The instruction sequence for the UDA call is inserted at
-  /// the insert position of 'builder'.
-  Status CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder, AggFn* agg_fn,
-      llvm::Value* agg_fn_ctx_arg, const std::vector<CodegenAnyVal>& input_vals,
-      const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val) WARN_UNUSED_RESULT;
-
-  /// Codegen UpdateTuple(). Returns non-OK status if codegen is unsuccessful.
-  Status CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) WARN_UNUSED_RESULT;
 };
 } // namespace impala
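
The agg_idx_ contract documented above, that each aggregator writes only its own
tuple position within an output row, amounts to something like this (hypothetical
row/tuple types, for illustration only):

    #include <vector>

    struct Tuple {};
    struct TupleRow { std::vector<Tuple*> tuples; };

    // Each Aggregator owns exactly one position in the output row: agg_idx_.
    void EmitOutput(TupleRow* row, int agg_idx, Tuple* result) {
      row->tuples[agg_idx] = result;  // never touch slots owned by sibling aggregators
    }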
 
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 6374f80..aeeaf30 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -83,8 +83,8 @@ static const StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
 };
 
 GroupingAggregatorConfig::GroupingAggregatorConfig(
-    const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode)
-  : AggregatorConfig(taggregator, state, pnode),
+    const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx)
+  : AggregatorConfig(taggregator, state, pnode, agg_idx),
     intermediate_row_desc_(intermediate_tuple_desc_, false),
     is_streaming_preagg_(taggregator.use_streaming_preaggregation),
     resource_profile_(taggregator.resource_profile){};
@@ -113,6 +113,9 @@ Status GroupingAggregatorConfig::Init(
   for (int i = 0; i < aggregate_functions_.size(); ++i) {
     needs_serialize_ |= aggregate_functions_[i]->SupportsSerialize();
   }
+
+  hash_table_config_ = state->obj_pool()->Add(new HashTableConfig(
+      build_exprs_, grouping_exprs_, true, vector<bool>(build_exprs_.size(), true)));
   return Status::OK();
 }
 
@@ -120,10 +123,11 @@ static const int STREAMING_HT_MIN_REDUCTION_SIZE =
     sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
 
 GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-    const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality,
-    int agg_idx)
+    const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality)
   : Aggregator(
-        exec_node, pool, config, Substitute("GroupingAggregator $0", agg_idx), agg_idx),
+        exec_node, pool, config, Substitute("GroupingAggregator $0", config.agg_idx_)),
+    agg_config_(config),
+    hash_table_config_(*config.hash_table_config_),
     intermediate_row_desc_(config.intermediate_row_desc_),
     is_streaming_preagg_(config.is_streaming_preagg_),
     needs_serialize_(config.needs_serialize_),
@@ -133,6 +137,8 @@ GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
     resource_profile_(config.resource_profile_),
     is_in_subplan_(exec_node->IsInSubplan()),
     limit_(exec_node->limit()),
+    add_batch_impl_fn_(config.add_batch_impl_fn_),
+    add_batch_streaming_impl_fn_(config.add_batch_streaming_impl_fn_),
     estimated_input_cardinality_(estimated_input_cardinality),
     partition_pool_(new ObjectPool()) {
   DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
@@ -183,13 +189,18 @@ Status GroupingAggregator::Prepare(RuntimeState* state) {
   return Status::OK();
 }
 
-void GroupingAggregator::Codegen(RuntimeState* state) {
+Status GroupingAggregatorConfig::Codegen(RuntimeState* state) {
   LlvmCodeGen* codegen = state->codegen();
   DCHECK(codegen != nullptr);
   TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
-  Status codegen_status = is_streaming_preagg_ ?
-      CodegenAddBatchStreamingImpl(codegen, prefetch_mode) :
-      CodegenAddBatchImpl(codegen, prefetch_mode);
+  return is_streaming_preagg_ ? CodegenAddBatchStreamingImpl(codegen, prefetch_mode) :
+                                CodegenAddBatchImpl(codegen, prefetch_mode);
+}
+
+void GroupingAggregator::Codegen(RuntimeState* state) {
+  // TODO: This const_cast will be removed once the codegen call is moved before FIS creation
+  Status codegen_status =
+      const_cast<GroupingAggregatorConfig&>(agg_config_).Codegen(state);
   runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
 }
 
@@ -984,7 +995,7 @@ BufferPool::ClientHandle* GroupingAggregator::buffer_pool_client() {
   return reservation_manager_.buffer_pool_client();
 }
 
-Status GroupingAggregator::CodegenAddBatchImpl(
+Status GroupingAggregatorConfig::CodegenAddBatchImpl(
     LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) {
   llvm::Function* update_tuple_fn;
   RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_tuple_fn));
@@ -1004,15 +1015,18 @@ Status GroupingAggregator::CodegenAddBatchImpl(
   // The codegen'd AddBatchImpl function is only used in Open() with level_ = 0,
   // so don't use murmur hash
   llvm::Function* hash_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(codegen, /* use murmur */ false, &hash_fn));
+  RETURN_IF_ERROR(
+      HashTableCtx::CodegenHashRow(codegen, false, *hash_table_config_, &hash_fn));
 
   // Codegen HashTable::Equals<true>
   llvm::Function* build_equals_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, true, &build_equals_fn));
+  RETURN_IF_ERROR(HashTableCtx::CodegenEquals(
+      codegen, true, *hash_table_config_, &build_equals_fn));
 
   // Codegen for evaluating input rows
   llvm::Function* eval_grouping_expr_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenEvalRow(codegen, false, &eval_grouping_expr_fn));
+  RETURN_IF_ERROR(HashTableCtx::CodegenEvalRow(
+      codegen, false, *hash_table_config_, &eval_grouping_expr_fn));
 
   // Replace call sites
   replaced =
@@ -1027,8 +1041,9 @@ Status GroupingAggregator::CodegenAddBatchImpl(
 
   HashTableCtx::HashTableReplacedConstants replaced_constants;
   const bool stores_duplicates = false;
-  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(
-      codegen, stores_duplicates, 1, add_batch_impl_fn, &replaced_constants));
+  RETURN_IF_ERROR(
+      HashTableCtx::ReplaceHashTableConstants(codegen, *hash_table_config_,
+          stores_duplicates, 1, add_batch_impl_fn, &replaced_constants));
   DCHECK_GE(replaced_constants.stores_nulls, 1);
   DCHECK_GE(replaced_constants.finds_some_nulls, 1);
   DCHECK_GE(replaced_constants.stores_duplicates, 1);
@@ -1048,7 +1063,7 @@ Status GroupingAggregator::CodegenAddBatchImpl(
   return Status::OK();
 }
 
-Status GroupingAggregator::CodegenAddBatchStreamingImpl(
+Status GroupingAggregatorConfig::CodegenAddBatchStreamingImpl(
     LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) {
   DCHECK(is_streaming_preagg_);
 
@@ -1073,15 +1088,18 @@ Status GroupingAggregator::CodegenAddBatchStreamingImpl(
 
   // We only use the top-level hash function for streaming aggregations.
   llvm::Function* hash_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(codegen, false, &hash_fn));
+  RETURN_IF_ERROR(
+      HashTableCtx::CodegenHashRow(codegen, false, *hash_table_config_, &hash_fn));
 
   // Codegen HashTable::Equals
   llvm::Function* equals_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, true, &equals_fn));
+  RETURN_IF_ERROR(
+      HashTableCtx::CodegenEquals(codegen, true, *hash_table_config_, &equals_fn));
 
   // Codegen for evaluating input rows
   llvm::Function* eval_grouping_expr_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenEvalRow(codegen, false, &eval_grouping_expr_fn));
+  RETURN_IF_ERROR(HashTableCtx::CodegenEvalRow(
+      codegen, false, *hash_table_config_, &eval_grouping_expr_fn));
 
   // Replace call sites
   int replaced = codegen->ReplaceCallSites(
@@ -1100,8 +1118,9 @@ Status GroupingAggregator::CodegenAddBatchStreamingImpl(
 
   HashTableCtx::HashTableReplacedConstants replaced_constants;
   const bool stores_duplicates = false;
-  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(
-      codegen, stores_duplicates, 1, add_batch_streaming_impl_fn, &replaced_constants));
+  RETURN_IF_ERROR(
+      HashTableCtx::ReplaceHashTableConstants(codegen, *hash_table_config_,
+          stores_duplicates, 1, add_batch_streaming_impl_fn, &replaced_constants));
   DCHECK_GE(replaced_constants.stores_nulls, 1);
   DCHECK_GE(replaced_constants.finds_some_nulls, 1);
   DCHECK_GE(replaced_constants.stores_duplicates, 1);
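
Until the codegen call moves ahead of fragment-instance creation, the exec-side
Codegen() has to shed constness to run codegen on the shared config object,
roughly as follows (a sketch with stand-in types, mirroring the const_cast above):

    // Stand-in types; the shape mirrors GroupingAggregator::Codegen() above.
    struct Status { bool ok = true; };
    struct RuntimeState {};

    struct Config {
      Status Codegen(RuntimeState* state) {
        // Would populate the jitted function pointers here.
        return Status();
      }
    };

    struct Agg {
      const Config& config_;  // fragment instances normally see a const config
      void Codegen(RuntimeState* state) {
        // Temporary: goes away once codegen runs before FIS creation.
        Status s = const_cast<Config&>(config_).Codegen(state);
        (void)s;  // the real code records this status in the runtime profile
      }
    };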
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index 3671327..cc4c9c9 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -33,10 +33,12 @@
 namespace impala {
 
 class AggFnEvaluator;
+class GroupingAggregator;
 class PlanNode;
 class LlvmCodeGen;
 class RowBatch;
 class RuntimeState;
+struct ScalarExprsResultsRowLayout;
 class TAggregator;
 class Tuple;
 
@@ -117,10 +119,11 @@ class Tuple;
 class GroupingAggregatorConfig : public AggregatorConfig {
  public:
   GroupingAggregatorConfig(
-      const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode);
-  virtual Status Init(
+      const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx);
+  Status Init(
       const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode) override;
-  ~GroupingAggregatorConfig() {}
+  Status Codegen(RuntimeState* state) override;
+  ~GroupingAggregatorConfig() override {}
 
   /// Row with the intermediate tuple as its only tuple.
   /// Construct a new row desc for preparing the build exprs because neither the child's
@@ -147,13 +150,44 @@ class GroupingAggregatorConfig : public AggregatorConfig {
   /// We need to do more work for var-len expressions when allocating and spilling rows.
   /// All var-len grouping exprs have type string.
   std::vector<int> string_grouping_exprs_;
+
+  /// Used for codegening hash table specific methods and to create the corresponding
+  /// instance of HashTableCtx.
+  const HashTableConfig* hash_table_config_;
+
+  typedef Status (*AddBatchImplFn)(
+      GroupingAggregator*, RowBatch*, TPrefetchMode::type, HashTableCtx*);
+  /// Jitted AddBatchImpl function pointer. Null if codegen is disabled.
+  AddBatchImplFn add_batch_impl_fn_ = nullptr;
+
+  typedef Status (*AddBatchStreamingImplFn)(GroupingAggregator*, int, bool,
+      TPrefetchMode::type, RowBatch*, RowBatch*, HashTableCtx*, int[]);
+  /// Jitted AddBatchStreamingImpl function pointer. Null if codegen is disabled.
+  AddBatchStreamingImplFn add_batch_streaming_impl_fn_ = nullptr;
+
+ protected:
+  int GetNumGroupingExprs() override { return grouping_exprs_.size(); }
+
+ private:
+  /// Codegen the non-streaming add row batch loop. The loop has already been compiled to
+  /// IR and loaded into the codegen object. UpdateAggTuple has also been codegen'd to IR.
+  /// This function will modify the loop, substituting the statically compiled functions
+  /// with codegen'd ones. 'add_batch_impl_fn_' will be updated with the codegened
+  /// function.
+  /// Assumes AGGREGATED_ROWS = false.
+  Status CodegenAddBatchImpl(
+      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
+
+  /// Codegen the materialization loop for streaming preaggregations.
+  /// 'add_batch_streaming_impl_fn_' will be updated with the codegened function.
+  Status CodegenAddBatchStreamingImpl(
+      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
 };
 
 class GroupingAggregator : public Aggregator {
  public:
   GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-      const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality,
-      int agg_idx);
+      const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality);
 
   virtual Status Prepare(RuntimeState* state) override;
   virtual void Codegen(RuntimeState* state) override;
@@ -177,6 +211,15 @@ class GroupingAggregator : public Aggregator {
  private:
   struct Partition;
 
+  /// TODO: Remove reference once codegen is performed before FIS creation.
+  /// Reference to the config object, used only to call Codegen().
+  const GroupingAggregatorConfig& agg_config_;
+
+  /// Reference to the hash table config which is a part of the GroupingAggregatorConfig
+  /// that was used to create this object. It's used to create an instance of the
+  /// HashTableCtx in Prepare(). Not Owned.
+  const HashTableConfig& hash_table_config_;
+
   /// Number of initial partitions to create. Must be a power of 2.
   static const int PARTITION_FANOUT = 16;
 
@@ -267,12 +310,12 @@ class GroupingAggregator : public Aggregator {
   typedef Status (*AddBatchImplFn)(
       GroupingAggregator*, RowBatch*, TPrefetchMode::type, HashTableCtx*);
   /// Jitted AddBatchImpl function pointer. Null if codegen is disabled.
-  AddBatchImplFn add_batch_impl_fn_ = nullptr;
+  const AddBatchImplFn& add_batch_impl_fn_;
 
   typedef Status (*AddBatchStreamingImplFn)(GroupingAggregator*, int, bool,
       TPrefetchMode::type, RowBatch*, RowBatch*, HashTableCtx*, int[PARTITION_FANOUT]);
   /// Jitted AddBatchStreamingImpl function pointer.  Null if codegen is disabled.
-  AddBatchStreamingImplFn add_batch_streaming_impl_fn_ = nullptr;
+  const AddBatchStreamingImplFn& add_batch_streaming_impl_fn_;
 
   /// Total time spent resizing hash tables.
   RuntimeProfile::Counter* ht_resize_timer_ = nullptr;
@@ -632,20 +675,6 @@ class GroupingAggregator : public Aggregator {
   void CleanupHashTbl(
       const std::vector<AggFnEvaluator*>& agg_fn_evals, HashTable::Iterator it);
 
-  /// Codegen the non-streaming add row batch loop. The loop has already been compiled to
-  /// IR and loaded into the codegen object. UpdateAggTuple has also been codegen'd to IR.
-  /// This function will modify the loop subsituting the statically compiled functions
-  /// with codegen'd ones. 'add_batch_impl_fn_' will be updated with the codegened
-  // function.
-  /// Assumes AGGREGATED_ROWS = false.
-  Status CodegenAddBatchImpl(
-      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
-
-  /// Codegen the materialization loop for streaming preaggregations.
-  /// 'add_batch_streaming_impl_fn_' will be updated with the codegened function.
-  Status CodegenAddBatchStreamingImpl(
-      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
-
   /// Compute minimum buffer reservation for grouping aggregations.
   /// We need one buffer per partition, which is used either as the write buffer for the
   /// aggregated stream or the unaggregated stream. We need an additional buffer to read
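
Note that the jitted function pointers above became const references into the
config rather than copies. Because the binding is by reference, a pointer that
codegen publishes into the config after the aggregator is constructed is still
visible to that instance. A self-contained sketch of the mechanism (illustrative
types):

    #include <cassert>

    using AddBatchImplFn = int (*)(int);
    static int JittedAddBatch(int x) { return x + 1; }

    struct Config { AddBatchImplFn add_batch_impl_fn_ = nullptr; };

    struct Instance {
      explicit Instance(const Config& c) : add_batch_impl_fn_(c.add_batch_impl_fn_) {}
      const AddBatchImplFn& add_batch_impl_fn_;  // a reference, not a snapshot
    };

    int main() {
      Config config;
      Instance inst(config);                        // built before codegen finishes
      assert(inst.add_batch_impl_fn_ == nullptr);   // codegen hasn't run yet
      config.add_batch_impl_fn_ = &JittedAddBatch;  // codegen publishes the pointer
      assert(inst.add_batch_impl_fn_ != nullptr);   // instance sees it via the reference
      return inst.add_batch_impl_fn_(41) == 42 ? 0 : 1;
    }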
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index bec3c4d..2e1060c 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -84,12 +84,27 @@ static int64_t NULL_VALUE[] = {
 static_assert(sizeof(NULL_VALUE) >= ColumnType::MAX_CHAR_LENGTH,
     "NULL_VALUE must be at least as large as the largest possible slot");
 
+HashTableConfig::HashTableConfig(const std::vector<ScalarExpr*>& build_exprs,
+    const std::vector<ScalarExpr*>& probe_exprs, const bool stores_nulls,
+    const std::vector<bool>& finds_nulls)
+  : build_exprs(build_exprs),
+    probe_exprs(probe_exprs),
+    stores_nulls(stores_nulls),
+    finds_nulls(finds_nulls),
+    finds_some_nulls(std::accumulate(
+        finds_nulls.begin(), finds_nulls.end(), false, std::logical_or<bool>())),
+    build_exprs_results_row_layout(build_exprs) {
+  DCHECK_EQ(build_exprs.size(), finds_nulls.size());
+  DCHECK_EQ(build_exprs.size(), probe_exprs.size());
+}
+
 HashTableCtx::HashTableCtx(const std::vector<ScalarExpr*>& build_exprs,
     const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls,
     const std::vector<bool>& finds_nulls, int32_t initial_seed,
     int max_levels, MemPool* expr_perm_pool, MemPool* build_expr_results_pool,
     MemPool* probe_expr_results_pool)
     : build_exprs_(build_exprs),
+      build_exprs_results_row_layout_(build_exprs_),
       probe_exprs_(probe_exprs),
       stores_nulls_(stores_nulls),
       finds_nulls_(finds_nulls),
@@ -117,6 +132,37 @@ HashTableCtx::HashTableCtx(const std::vector<ScalarExpr*>& build_exprs,
   }
 }
 
+HashTableCtx::HashTableCtx(const HashTableConfig& config, int32_t initial_seed,
+    int max_levels, MemPool* expr_perm_pool, MemPool* build_expr_results_pool,
+    MemPool* probe_expr_results_pool)
+  : build_exprs_(config.build_exprs),
+    build_exprs_results_row_layout_(config.build_exprs_results_row_layout),
+    probe_exprs_(config.probe_exprs),
+    stores_nulls_(config.stores_nulls),
+    finds_nulls_(config.finds_nulls),
+    finds_some_nulls_(config.finds_some_nulls),
+    level_(0),
+    scratch_row_(NULL),
+    expr_perm_pool_(expr_perm_pool),
+    build_expr_results_pool_(build_expr_results_pool),
+    probe_expr_results_pool_(probe_expr_results_pool) {
+  DCHECK(!finds_some_nulls_ || stores_nulls_);
+  // Compute the layout and buffer size to store the evaluated expr results
+  DCHECK_EQ(build_exprs_.size(), probe_exprs_.size());
+  DCHECK_EQ(build_exprs_.size(), finds_nulls_.size());
+  DCHECK(!build_exprs_.empty());
+
+  // Populate the seeds to use for all the levels. TODO: revisit how we generate these.
+  DCHECK_GE(max_levels, 0);
+  DCHECK_LT(max_levels, sizeof(SEED_PRIMES) / sizeof(SEED_PRIMES[0]));
+  DCHECK_NE(initial_seed, 0);
+  seeds_.resize(max_levels + 1);
+  seeds_[0] = initial_seed;
+  for (int i = 1; i <= max_levels; ++i) {
+    seeds_[i] = seeds_[i - 1] * SEED_PRIMES[i];
+  }
+}
+
 Status HashTableCtx::Init(ObjectPool* pool, RuntimeState* state, int num_build_tuples) {
   int scratch_row_size = sizeof(Tuple*) * num_build_tuples;
   scratch_row_ = reinterpret_cast<TupleRow*>(malloc(scratch_row_size));
@@ -130,7 +176,8 @@ Status HashTableCtx::Init(ObjectPool* pool, RuntimeState* state, int num_build_t
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(probe_exprs_, state, pool, expr_perm_pool_,
       probe_expr_results_pool_, &probe_expr_evals_));
   DCHECK_EQ(probe_exprs_.size(), probe_expr_evals_.size());
-  return expr_values_cache_.Init(state, expr_perm_pool_->mem_tracker(), build_exprs_);
+  return expr_values_cache_.Init(
+      state, expr_perm_pool_->mem_tracker(), build_exprs_results_row_layout_);
 }
 
 Status HashTableCtx::Create(ObjectPool* pool, RuntimeState* state,
@@ -145,6 +192,15 @@ Status HashTableCtx::Create(ObjectPool* pool, RuntimeState* state,
   return (*ht_ctx)->Init(pool, state, num_build_tuples);
 }
 
+Status HashTableCtx::Create(ObjectPool* pool, RuntimeState* state,
+    const HashTableConfig& config, int32_t initial_seed, int max_levels,
+    int num_build_tuples, MemPool* expr_perm_pool, MemPool* build_expr_results_pool,
+    MemPool* probe_expr_results_pool, boost::scoped_ptr<HashTableCtx>* ht_ctx) {
+  ht_ctx->reset(new HashTableCtx(config, initial_seed, max_levels, expr_perm_pool,
+      build_expr_results_pool, probe_expr_results_pool));
+  return (*ht_ctx)->Init(pool, state, num_build_tuples);
+}
+
 Status HashTableCtx::Open(RuntimeState* state) {
   RETURN_IF_ERROR(ScalarExprEvaluator::Open(build_expr_evals_, state));
   RETURN_IF_ERROR(ScalarExprEvaluator::Open(probe_expr_evals_, state));
@@ -292,13 +348,14 @@ HashTableCtx::ExprValuesCache::ExprValuesCache()
     expr_values_hash_array_(NULL),
     null_bitmap_(0) {}
 
-Status HashTableCtx::ExprValuesCache::Init(RuntimeState* state,
-    MemTracker* tracker, const std::vector<ScalarExpr*>& build_exprs) {
+Status HashTableCtx::ExprValuesCache::Init(RuntimeState* state, MemTracker* tracker,
+    const ScalarExprsResultsRowLayout& exprs_results_row_layout) {
   // Initialize the number of expressions.
-  num_exprs_ = build_exprs.size();
+  num_exprs_ = exprs_results_row_layout.expr_values_offsets.size();
   // Compute the layout of evaluated values of a row.
-  expr_values_bytes_per_row_ = ScalarExpr::ComputeResultsLayout(build_exprs,
-      &expr_values_offsets_, &var_result_offset_);
+  expr_values_bytes_per_row_ = exprs_results_row_layout.expr_values_bytes_per_row;
+  expr_values_offsets_ = exprs_results_row_layout.expr_values_offsets;
+  var_result_offset_ = exprs_results_row_layout.var_results_begin_offset;
   if (expr_values_bytes_per_row_ == 0) {
     DCHECK_EQ(num_exprs_, 0);
     return Status::OK();
@@ -736,9 +793,12 @@ static void CodegenAssignNullValue(LlvmCodeGen* codegen, LlvmBuilder* builder,
 // Both the null and not null branch into the continue block.  The continue block
 // becomes the start of the next block for codegen (either the next expr or just the
 // end of the function).
-Status HashTableCtx::CodegenEvalRow(
-    LlvmCodeGen* codegen, bool build, llvm::Function** fn) {
-  const vector<ScalarExpr*>& exprs = build ? build_exprs_ : probe_exprs_;
+Status HashTableCtx::CodegenEvalRow(LlvmCodeGen* codegen, bool build_row,
+    const HashTableConfig& config, llvm::Function** fn) {
+  const std::vector<ScalarExpr*>& exprs =
+      build_row ? config.build_exprs : config.probe_exprs;
+  const ScalarExprsResultsRowLayout& result_row_layout =
+      config.build_exprs_results_row_layout;
   for (int i = 0; i < exprs.size(); ++i) {
     // Disable codegen for CHAR
     if (exprs[i]->type().type == TYPE_CHAR) {
@@ -749,8 +809,8 @@ Status HashTableCtx::CodegenEvalRow(
   // Get types to generate function prototype
   llvm::PointerType* this_ptr_type = codegen->GetStructPtrType<HashTableCtx>();
   llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
-  LlvmCodeGen::FnPrototype prototype(codegen, build ? "EvalBuildRow" : "EvalProbeRow",
-      codegen->bool_type());
+  LlvmCodeGen::FnPrototype prototype(
+      codegen, build_row ? "EvalBuildRow" : "EvalProbeRow", codegen->bool_type());
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("expr_values", codegen->ptr_type()));
@@ -769,20 +829,21 @@ Status HashTableCtx::CodegenEvalRow(
 
   // evaluator_vector = build_expr_evals_.data() / probe_expr_evals_.data()
   llvm::Value* eval_vector = codegen->CodegenCallFunction(&builder,
-      build ? IRFunction::HASH_TABLE_GET_BUILD_EXPR_EVALUATORS :
-              IRFunction::HASH_TABLE_GET_PROBE_EXPR_EVALUATORS,
+      build_row ? IRFunction::HASH_TABLE_GET_BUILD_EXPR_EVALUATORS :
+                  IRFunction::HASH_TABLE_GET_PROBE_EXPR_EVALUATORS,
       this_ptr, "eval_vector");
 
+  DCHECK_EQ(exprs.size(), result_row_layout.expr_values_offsets.size());
   for (int i = 0; i < exprs.size(); ++i) {
     // TODO: refactor this to somewhere else?  This is not hash table specific except for
     // the null handling bit and would be used for anyone that needs to materialize a
     // vector of exprs
     // Convert result buffer to llvm ptr type
-    int offset = expr_values_cache_.expr_values_offsets(i);
+    int offset = result_row_layout.expr_values_offsets[i];
     llvm::Value* loc = builder.CreateInBoundsGEP(
         NULL, expr_values, codegen->GetI32Constant(offset), "loc_addr");
-    llvm::Value* llvm_loc = builder.CreatePointerCast(loc,
-        codegen->GetSlotPtrType(exprs[i]->type()), "loc");
+    llvm::Value* llvm_loc =
+        builder.CreatePointerCast(loc, codegen->GetSlotPtrType(exprs[i]->type()), "loc");
 
     llvm::BasicBlock* null_block = llvm::BasicBlock::Create(context, "null", *fn);
     llvm::BasicBlock* not_null_block = llvm::BasicBlock::Create(context, "not_null", *fn);
@@ -816,7 +877,7 @@ Status HashTableCtx::CodegenEvalRow(
 
     // Null block
     builder.SetInsertPoint(null_block);
-    if (!stores_nulls_) {
+    if (!config.stores_nulls) {
       // hash table doesn't store nulls, no reason to keep evaluating exprs
       builder.CreateRet(codegen->true_value());
     } else {
@@ -834,7 +895,7 @@ Status HashTableCtx::CodegenEvalRow(
 
     // Continue block
     builder.SetInsertPoint(continue_block);
-    if (stores_nulls_) {
+    if (config.stores_nulls) {
       // Update has_null
       llvm::PHINode* is_null_phi =
           builder.CreatePHI(codegen->bool_type(), 2, "is_null_phi");
@@ -892,11 +953,14 @@ Status HashTableCtx::CodegenEvalRow(
 //   %hash_phi = phi i32 [ %string_hash, %not_null ], [ %str_null, %null ]
 //   ret i32 %hash_phi
 // }
-Status HashTableCtx::CodegenHashRow(
-    LlvmCodeGen* codegen, bool use_murmur, llvm::Function** fn) {
-  for (int i = 0; i < build_exprs_.size(); ++i) {
+Status HashTableCtx::CodegenHashRow(LlvmCodeGen* codegen, bool use_murmur,
+      const HashTableConfig& config, llvm::Function** fn) {
+  const std::vector<ScalarExpr*>& exprs = config.build_exprs;
+  const ScalarExprsResultsRowLayout& result_row_layout =
+      config.build_exprs_results_row_layout;
+  for (int i = 0; i < exprs.size(); ++i) {
     // Disable codegen for CHAR
-    if (build_exprs_[i]->type().type == TYPE_CHAR) {
+    if (exprs[i]->type().type == TYPE_CHAR) {
       return Status("HashTableCtx::CodegenHashRow(): CHAR NYI");
     }
   }
@@ -924,8 +988,8 @@ Status HashTableCtx::CodegenHashRow(
       &builder, IRFunction::HASH_TABLE_GET_HASH_SEED, this_arg, "seed");
 
   llvm::Value* hash_result = seed;
-  const int var_result_offset = expr_values_cache_.var_result_offset();
-  const int expr_values_bytes_per_row = expr_values_cache_.expr_values_bytes_per_row();
+  const int var_result_offset = result_row_layout.var_results_begin_offset;
+  const int expr_values_bytes_per_row = result_row_layout.expr_values_bytes_per_row;
   if (var_result_offset == -1) {
     // No variable length slots, just hash what is in 'expr_values_cache_'
     if (expr_values_bytes_per_row > 0) {
@@ -947,9 +1011,9 @@ Status HashTableCtx::CodegenHashRow(
     }
 
     // Hash string slots
-    for (int i = 0; i < build_exprs_.size(); ++i) {
-      if (build_exprs_[i]->type().type != TYPE_STRING &&
-          build_exprs_[i]->type().type != TYPE_VARCHAR) {
+    for (int i = 0; i < exprs.size(); ++i) {
+      if (exprs[i]->type().type != TYPE_STRING &&
+          exprs[i]->type().type != TYPE_VARCHAR) {
         continue;
       }
 
@@ -958,13 +1022,13 @@ Status HashTableCtx::CodegenHashRow(
       llvm::BasicBlock* continue_block = NULL;
       llvm::Value* str_null_result = NULL;
 
-      int offset = expr_values_cache_.expr_values_offsets(i);
+      int offset = result_row_layout.expr_values_offsets[i];
       llvm::Value* llvm_loc = builder.CreateInBoundsGEP(
           NULL, expr_values, codegen->GetI32Constant(offset), "loc_addr");
 
       // If the hash table stores nulls, we need to check if the stringval
       // evaluated to NULL
-      if (stores_nulls_) {
+      if (config.stores_nulls) {
         null_block = llvm::BasicBlock::Create(context, "null", *fn);
         not_null_block = llvm::BasicBlock::Create(context, "not_null", *fn);
         continue_block = llvm::BasicBlock::Create(context, "continue", *fn);
@@ -1005,7 +1069,7 @@ Status HashTableCtx::CodegenHashRow(
       llvm::Value* string_hash_result = builder.CreateCall(general_hash_fn,
           llvm::ArrayRef<llvm::Value*>({ptr, len, hash_result}), "string_hash");
 
-      if (stores_nulls_) {
+      if (config.stores_nulls) {
         builder.CreateBr(continue_block);
         builder.SetInsertPoint(continue_block);
         // Use phi node to reconcile that we could have come from the string-null
@@ -1024,7 +1088,7 @@ Status HashTableCtx::CodegenHashRow(
   builder.CreateRet(hash_result);
 
   // Avoid inlining into caller if there are many exprs.
-  if (build_exprs_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
+  if (exprs.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
     codegen->SetNoInline(*fn);
   }
   *fn = codegen->FinalizeFunction(*fn);
@@ -1101,11 +1165,12 @@ Status HashTableCtx::CodegenHashRow(
 //        %"struct.impala_udf::StringVal"* %8, %"struct.impala::StringValue"* %row_val8)
 //   br i1 %cmp_raw10, label %continue3, label %false_block
 // }
-Status HashTableCtx::CodegenEquals(
-    LlvmCodeGen* codegen, bool inclusive_equality, llvm::Function** fn) {
-  for (int i = 0; i < build_exprs_.size(); ++i) {
+Status HashTableCtx::CodegenEquals(LlvmCodeGen* codegen, bool inclusive_equality,
+    const HashTableConfig& config, llvm::Function** fn) {
+  const std::vector<ScalarExpr*>& exprs = config.build_exprs;
+  for (int i = 0; i < exprs.size(); ++i) {
     // Disable codegen for CHAR
-    if (build_exprs_[i]->type().type == TYPE_CHAR) {
+    if (exprs[i]->type().type == TYPE_CHAR) {
       return Status("HashTableCtx::CodegenEquals(): CHAR NYI");
     }
   }
@@ -1135,20 +1200,20 @@ Status HashTableCtx::CodegenEquals(
       IRFunction::HASH_TABLE_GET_BUILD_EXPR_EVALUATORS, this_ptr, "eval_vector");
 
   llvm::BasicBlock* false_block = llvm::BasicBlock::Create(context, "false_block", *fn);
-  for (int i = 0; i < build_exprs_.size(); ++i) {
+  for (int i = 0; i < exprs.size(); ++i) {
     llvm::BasicBlock* null_block = llvm::BasicBlock::Create(context, "null", *fn);
     llvm::BasicBlock* not_null_block = llvm::BasicBlock::Create(context, "not_null", *fn);
     llvm::BasicBlock* continue_block = llvm::BasicBlock::Create(context, "continue", *fn);
 
     // call GetValue on build_exprs[i]
     llvm::Function* expr_fn;
-    Status status = build_exprs_[i]->GetCodegendComputeFn(codegen, false, &expr_fn);
+    Status status = exprs[i]->GetCodegendComputeFn(codegen, false, &expr_fn);
     if (!status.ok()) {
       *fn = NULL;
       return Status(
           Substitute("Problem with HashTableCtx::CodegenEquals: $0", status.GetDetail()));
     }
-    if (build_exprs_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
+    if (exprs.size() > LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
       // Avoid bloating function by inlining too many exprs into it.
       codegen->SetNoInline(expr_fn);
     }
@@ -1156,8 +1221,8 @@ Status HashTableCtx::CodegenEquals(
     // Load ScalarExprEvaluator*: eval = eval_vector[i];
     llvm::Value* eval_arg = codegen->CodegenArrayAt(&builder, eval_vector, i, "eval");
     // Evaluate the expression.
-    CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
-        build_exprs_[i]->type(), expr_fn, {eval_arg, row}, "result");
+    CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(
+        codegen, &builder, exprs[i]->type(), expr_fn, {eval_arg, row}, "result");
     llvm::Value* is_null = result.GetIsNull();
 
     // Determine if row is null (i.e. expr_values_null[i] == true). In
@@ -1166,7 +1231,7 @@ Status HashTableCtx::CodegenEquals(
 
     // We consider null values equal if we are comparing build rows or if the join
     // predicate is <=>
-    if (inclusive_equality || finds_nulls_[i]) {
+    if (inclusive_equality || config.finds_nulls[i]) {
       llvm::Value* llvm_null_byte_loc = builder.CreateInBoundsGEP(
           NULL, expr_values_null, codegen->GetI32Constant(i), "null_byte_loc");
       llvm::Value* null_byte = builder.CreateLoad(llvm_null_byte_loc);
@@ -1175,11 +1240,11 @@ Status HashTableCtx::CodegenEquals(
     if (inclusive_equality) result.ConvertToCanonicalForm();
 
     // Get llvm value for row_val from 'expr_values'
-    int offset = expr_values_cache_.expr_values_offsets(i);
+    int offset = config.build_exprs_results_row_layout.expr_values_offsets[i];
     llvm::Value* loc = builder.CreateInBoundsGEP(
         NULL, expr_values, codegen->GetI32Constant(offset), "loc");
     llvm::Value* row_val = builder.CreatePointerCast(
-        loc, codegen->GetSlotPtrType(build_exprs_[i]->type()), "row_val");
+        loc, codegen->GetSlotPtrType(exprs[i]->type()), "row_val");
 
     // Branch for GetValue() returning NULL
     builder.CreateCondBr(is_null, null_block, not_null_block);
@@ -1190,7 +1255,7 @@ Status HashTableCtx::CodegenEquals(
 
     // Not-null block
     builder.SetInsertPoint(not_null_block);
-    if (stores_nulls_) {
+    if (config.stores_nulls) {
       llvm::BasicBlock* cmp_block = llvm::BasicBlock::Create(context, "cmp", *fn);
       // First need to compare that row expr[i] is not null
       builder.CreateCondBr(row_is_null, false_block, cmp_block);
@@ -1208,7 +1273,7 @@ Status HashTableCtx::CodegenEquals(
   builder.CreateRet(codegen->false_value());
 
   // Avoid inlining into caller if it is large.
-  if (build_exprs_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
+  if (exprs.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
     codegen->SetNoInline(*fn);
   }
   *fn = codegen->FinalizeFunction(*fn);
@@ -1220,12 +1285,12 @@ Status HashTableCtx::CodegenEquals(
 }
 
 Status HashTableCtx::ReplaceHashTableConstants(LlvmCodeGen* codegen,
-    bool stores_duplicates, int num_build_tuples, llvm::Function* fn,
-    HashTableReplacedConstants* replacement_counts) {
+    const HashTableConfig& config, bool stores_duplicates, int num_build_tuples,
+    llvm::Function* fn, HashTableReplacedConstants* replacement_counts) {
   replacement_counts->stores_nulls = codegen->ReplaceCallSitesWithBoolConst(
-      fn, stores_nulls(), "stores_nulls");
+      fn, config.stores_nulls, "stores_nulls");
   replacement_counts->finds_some_nulls = codegen->ReplaceCallSitesWithBoolConst(
-      fn, finds_some_nulls(), "finds_some_nulls");
+      fn, config.finds_some_nulls, "finds_some_nulls");
   replacement_counts->stores_tuples = codegen->ReplaceCallSitesWithBoolConst(
       fn, num_build_tuples == 1, "stores_tuples");
   replacement_counts->stores_duplicates = codegen->ReplaceCallSitesWithBoolConst(
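
The recurring edit in this file is that the codegen helpers no longer need a live
HashTableCtx, only a HashTableConfig, so they become static. In abbreviated,
stand-in form:

    // Abbreviated stand-in types; only the signature change is the point here.
    struct LlvmCodeGen {};
    struct HashTableConfig {};
    struct Status { bool ok = true; };
    namespace llvm { class Function; }

    struct HashTableCtx {
      // Before: an instance method, so callers first had to construct a HashTableCtx.
      Status CodegenHashRow(LlvmCodeGen* codegen, bool use_murmur, llvm::Function** fn);

      // After: static, callable from the plan-node config before any instance exists.
      static Status CodegenHashRow(LlvmCodeGen* codegen, bool use_murmur,
                                   const HashTableConfig& config, llvm::Function** fn);
    };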
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index 03dd850..2d33a65 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -27,6 +27,7 @@
 #include "codegen/impala-ir.h"
 #include "common/compiler-util.h"
 #include "common/logging.h"
+#include "exprs/scalar-expr.h"
 #include "runtime/buffered-tuple-stream.h"
 #include "runtime/buffered-tuple-stream.inline.h"
 #include "runtime/bufferpool/buffer-pool.h"
@@ -52,6 +53,7 @@ class Tuple;
 class TupleRow;
 class HashTable;
 struct HashTableStatsProfile;
+struct ScalarExprsResultsRowLayout;
 
 /// Linear or quadratic probing hash table implementation tailored to the usage pattern
 /// for partitioned hash aggregation and hash joins. The hash table stores TupleRows and
@@ -113,6 +115,35 @@ struct HashTableStatsProfile;
 /// TODO: Batched interface for inserts and finds.
 /// TODO: as an optimization, compute variable-length data size for the agg node.
 
+/// Collection of variables required to create instances of HashTableCtx and to codegen
+/// hash table methods.
+struct HashTableConfig {
+  HashTableConfig() = delete;
+  HashTableConfig(const std::vector<ScalarExpr*>& build_exprs,
+      const std::vector<ScalarExpr*>& probe_exprs, const bool stores_nulls,
+      const std::vector<bool>& finds_nulls);
+
+  /// The exprs used to evaluate rows for insertion into the hash table.
+  /// Also used when matching hash table entries against probe rows. Not Owned.
+  const std::vector<ScalarExpr*>& build_exprs;
+
+  /// The exprs used to evaluate rows for look-up in the hash table. Not Owned.
+  const std::vector<ScalarExpr*>& probe_exprs;
+
+  /// If false, TupleRows with nulls are ignored during Insert
+  const bool stores_nulls;
+
+  /// If finds_nulls[i] is false, FindProbeRow() returns BUCKET_NOT_FOUND for TupleRows
+  /// with nulls in position i even if stores_nulls is true.
+  const std::vector<bool> finds_nulls;
+
+  /// finds_some_nulls is just the logical OR of finds_nulls.
+  const bool finds_some_nulls;
+
+  /// The memory-efficient layout for storing the results of evaluating build expressions.
+  const ScalarExprsResultsRowLayout build_exprs_results_row_layout;
+};
+
 /// Control block for a hash table. This class contains the logic as well as the variables
 /// needed by a thread to operate on a hash table.
 class HashTableCtx {
@@ -129,6 +160,11 @@ class HashTableCtx {
       int num_build_tuples, MemPool* expr_perm_pool, MemPool* build_expr_results_pool,
       MemPool* probe_expr_results_pool, boost::scoped_ptr<HashTableCtx>* ht_ctx);
 
+  static Status Create(ObjectPool* pool, RuntimeState* state,
+      const HashTableConfig& config, int32_t initial_seed, int max_levels,
+      int num_build_tuples, MemPool* expr_perm_pool, MemPool* build_expr_results_pool,
+      MemPool* probe_expr_results_pool, boost::scoped_ptr<HashTableCtx>* ht_ctx);
+
   /// Initialize the build and probe expression evaluators.
   Status Open(RuntimeState* state);
 
@@ -179,20 +215,22 @@ class HashTableCtx {
   /// Codegen for evaluating a tuple row. Codegen'd function matches the signature
   /// for EvalBuildRow and EvalTupleRow.
   /// If build_row is true, the codegen uses the build_exprs, otherwise the probe_exprs.
-  Status CodegenEvalRow(LlvmCodeGen* codegen, bool build_row, llvm::Function** fn);
+  static Status CodegenEvalRow(LlvmCodeGen* codegen, bool build_row,
+      const HashTableConfig& config, llvm::Function** fn);
 
   /// Codegen for evaluating a TupleRow and comparing equality. Function signature
   /// matches HashTable::Equals(). 'inclusive_equality' is true if the generated
   /// equality function should treat all NULLs as equal and all NaNs as equal.
   /// See the template parameter to HashTable::Equals().
-  Status CodegenEquals(LlvmCodeGen* codegen, bool inclusive_equality,
-      llvm::Function** fn);
+  static Status CodegenEquals(LlvmCodeGen* codegen, bool inclusive_equality,
+      const HashTableConfig& config, llvm::Function** fn);
 
   /// Codegen for hashing expr values. Function prototype matches HashRow identically.
   /// Unlike HashRow(), the returned function only uses a single hash function, rather
   /// than switching based on level_. If 'use_murmur' is true, murmur hash is used,
   /// otherwise CRC is used if the hardware supports it (see hash-util.h).
-  Status CodegenHashRow(LlvmCodeGen* codegen, bool use_murmur, llvm::Function** fn);
+  static Status CodegenHashRow(LlvmCodeGen* codegen, bool use_murmur,
+      const HashTableConfig& config, llvm::Function** fn);
 
   /// Struct that returns the number of constants replaced by ReplaceConstants().
   struct HashTableReplacedConstants {
@@ -206,9 +244,9 @@ class HashTableCtx {
   /// Replace hash table parameters with constants in 'fn'. Updates 'replacement_counts'
   /// with the number of replacements made. 'num_build_tuples' and 'stores_duplicates'
   /// correspond to HashTable parameters with the same name.
-  Status ReplaceHashTableConstants(LlvmCodeGen* codegen, bool stores_duplicates,
-      int num_build_tuples, llvm::Function* fn,
-      HashTableReplacedConstants* replacement_counts);
+  static Status ReplaceHashTableConstants(LlvmCodeGen* codegen,
+      const HashTableConfig& config, bool stores_duplicates, int num_build_tuples,
+      llvm::Function* fn, HashTableReplacedConstants* replacement_counts);
 
   static const char* LLVM_CLASS_NAME;
 
@@ -251,7 +289,7 @@ class HashTableCtx {
     /// if memory allocation leads to the memory limits of the exec node to be exceeded.
     /// 'tracker' is the memory tracker of the exec node which owns this HashTableCtx.
     Status Init(RuntimeState* state, MemTracker* tracker,
-        const std::vector<ScalarExpr*>& build_exprs);
+        const ScalarExprsResultsRowLayout& exprs_results_row_layout);
 
     /// Frees up various resources and updates memory tracker with proper accounting.
     /// 'tracker' should be the same memory tracker which was passed in for Init().
@@ -404,8 +442,8 @@ class HashTableCtx {
   ///  - build_exprs are the exprs that should be used to evaluate rows during Insert().
   ///  - probe_exprs are used during FindProbeRow()
   ///  - stores_nulls: if false, TupleRows with nulls are ignored during Insert
-  ///  - finds_nulls: if finds_nulls[i] is false, FindProbeRow() returns End() for
-  ///        TupleRows with nulls in position i even if stores_nulls is true.
+  ///  - finds_nulls: if finds_nulls[i] is false, FindProbeRow() returns BUCKET_NOT_FOUND
+  ///        for TupleRows with nulls in position i even if stores_nulls is true.
   ///  - initial_seed: initial seed value to use when computing hashes for rows with
   ///        level 0. Other levels have their seeds derived from this seed.
  ///  - max_levels: the max levels we will hash with.
@@ -426,10 +464,16 @@ class HashTableCtx {
   ///       with '<=>' and others with '=', stores_nulls could distinguish between columns
   ///       in which nulls are stored and columns in which they are not, which could save
   ///       space by not storing some rows we know will never match.
+  /// TODO: remove this constructor once all client classes switch to using
+  ///       HashTableConfig to create instances of this class.
   HashTableCtx(const std::vector<ScalarExpr*>& build_exprs,
       const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls,
-      const std::vector<bool>& finds_nulls, int32_t initial_seed,
-      int max_levels, MemPool* expr_perm_pool, MemPool* build_expr_results_pool,
+      const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels,
+      MemPool* expr_perm_pool, MemPool* build_expr_results_pool,
+      MemPool* probe_expr_results_pool);
+
+  HashTableCtx(const HashTableConfig& config, int32_t initial_seed, int max_levels,
+      MemPool* expr_perm_pool, MemPool* build_expr_results_pool,
       MemPool* probe_expr_results_pool);
 
   /// Allocate various buffers for storing expression evaluation results, hash values,
@@ -505,6 +549,7 @@ class HashTableCtx {
   /// The exprs used to evaluate rows for inserting rows into hash table.
   /// Also used when matching hash table entries against probe rows.
   const std::vector<ScalarExpr*>& build_exprs_;
+  const ScalarExprsResultsRowLayout build_exprs_results_row_layout_;
   std::vector<ScalarExprEvaluator*> build_expr_evals_;
 
   /// The exprs used to evaluate rows for look-up in the hash table.
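
Tying the struct's fields together: finds_some_nulls is derived rather than
passed in, exactly as in the HashTableConfig constructor in hash-table.cc above.
A reduced, compilable sketch of that semantics:

    #include <functional>
    #include <numeric>
    #include <utility>
    #include <vector>

    // Reduced stand-in for HashTableConfig's null-handling fields.
    struct FakeHashTableConfig {
      const bool stores_nulls;              // false: rows with NULL keys aren't inserted
      const std::vector<bool> finds_nulls;  // per-expr: may a NULL probe key match?
      const bool finds_some_nulls;          // logical OR of finds_nulls

      FakeHashTableConfig(bool stores, std::vector<bool> finds)
        : stores_nulls(stores),
          finds_nulls(std::move(finds)),
          finds_some_nulls(std::accumulate(finds_nulls.begin(), finds_nulls.end(),
                                           false, std::logical_or<bool>())) {}
    };

    // An aggregation groups NULLs like any other value, so every entry is true,
    // matching GroupingAggregatorConfig::Init() above:
    //   FakeHashTableConfig cfg(true, std::vector<bool>(num_build_exprs, true));
    // A join keyed with '=' rather than '<=>' would pass false at that position, and
    // a probe row with NULL there gets BUCKET_NOT_FOUND even though builds stored it.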
diff --git a/be/src/exec/non-grouping-aggregator.cc b/be/src/exec/non-grouping-aggregator.cc
index e2e8c05..a2c4b0a 100644
--- a/be/src/exec/non-grouping-aggregator.cc
+++ b/be/src/exec/non-grouping-aggregator.cc
@@ -35,10 +35,23 @@
 
 namespace impala {
 
+NonGroupingAggregatorConfig::NonGroupingAggregatorConfig(
+    const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx)
+  : AggregatorConfig(taggregator, state, pnode, agg_idx) {}
+
+Status NonGroupingAggregatorConfig::Codegen(RuntimeState* state) {
+  LlvmCodeGen* codegen = state->codegen();
+  DCHECK(codegen != nullptr);
+  TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+  return CodegenAddBatchImpl(codegen, prefetch_mode);
+}
+
 NonGroupingAggregator::NonGroupingAggregator(
-    ExecNode* exec_node, ObjectPool* pool, const AggregatorConfig& config, int agg_idx)
-  : Aggregator(exec_node, pool, config, Substitute("NonGroupingAggregator $0", agg_idx),
-        agg_idx) {}
+    ExecNode* exec_node, ObjectPool* pool, const NonGroupingAggregatorConfig& config)
+  : Aggregator(
+        exec_node, pool, config, Substitute("NonGroupingAggregator $0", config.agg_idx_)),
+    agg_config(config),
+    add_batch_impl_fn_(config.add_batch_impl_fn_) {}
 
 Status NonGroupingAggregator::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(Aggregator::Prepare(state));
@@ -47,10 +60,9 @@ Status NonGroupingAggregator::Prepare(RuntimeState* state) {
 }
 
 void NonGroupingAggregator::Codegen(RuntimeState* state) {
-  LlvmCodeGen* codegen = state->codegen();
-  DCHECK(codegen != nullptr);
-  TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
-  Status codegen_status = CodegenAddBatchImpl(codegen, prefetch_mode);
+  // TODO: This const_cast will be removed once the codegen call is moved before FIS creation
+  Status codegen_status =
+      const_cast<NonGroupingAggregatorConfig&>(agg_config).Codegen(state);
   runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
 }
 
@@ -155,7 +167,7 @@ void NonGroupingAggregator::DebugString(int indentation_level, stringstream* out
   *out << ")";
 }
 
-Status NonGroupingAggregator::CodegenAddBatchImpl(
+Status NonGroupingAggregatorConfig::CodegenAddBatchImpl(
     LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) {
   llvm::Function* update_tuple_fn;
   RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_tuple_fn));
diff --git a/be/src/exec/non-grouping-aggregator.h b/be/src/exec/non-grouping-aggregator.h
index bad0263..3ffda63 100644
--- a/be/src/exec/non-grouping-aggregator.h
+++ b/be/src/exec/non-grouping-aggregator.h
@@ -31,19 +31,44 @@ class AggregationPlanNode;
 class DescriptorTbl;
 class ExecNode;
 class LlvmCodeGen;
+class NonGroupingAggregator;
 class ObjectPool;
 class RowBatch;
 class RuntimeState;
 class TAggregator;
 class Tuple;
 
+class NonGroupingAggregatorConfig : public AggregatorConfig {
+ public:
+  NonGroupingAggregatorConfig(const TAggregator& taggregator, RuntimeState* state,
+      PlanNode* pnode, int agg_idx);
+  Status Codegen(RuntimeState* state) override;
+  ~NonGroupingAggregatorConfig() override {}
+
+  typedef Status (*AddBatchImplFn)(NonGroupingAggregator*, RowBatch*);
+  /// Jitted AddBatchImpl function pointer. Null if codegen is disabled.
+  AddBatchImplFn add_batch_impl_fn_ = nullptr;
+
+ protected:
+  int GetNumGroupingExprs() override { return 0; }
+
+ private:
+  /// Codegen the non-streaming add row batch loop in NonGroupingAggregator::AddBatch()
+  /// (assuming AGGREGATED_ROWS = false). The loop has already been compiled to IR and
+  /// loaded into the codegen object. UpdateAggTuple has also been codegen'd to IR. This
+  /// function will modify the loop, substituting the statically compiled functions with
+  /// codegen'd ones. 'add_batch_impl_fn_' will be updated with the codegen'd function.
+  Status CodegenAddBatchImpl(
+      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
+};
+
 /// Aggregator for doing non-grouping aggregations. Input is passed to the aggregator
 /// through AddBatch(), which generates the single output row. This Aggregator does
 /// not support streaming preaggregation.
 class NonGroupingAggregator : public Aggregator {
  public:
   NonGroupingAggregator(
-      ExecNode* exec_node, ObjectPool* pool, const AggregatorConfig& config, int agg_idx);
+      ExecNode* exec_node, ObjectPool* pool, const NonGroupingAggregatorConfig& config);
 
   virtual Status Prepare(RuntimeState* state) override;
   virtual void Codegen(RuntimeState* state) override;
@@ -73,6 +98,10 @@ class NonGroupingAggregator : public Aggregator {
   virtual void DebugString(int indentation_level, std::stringstream* out) const override;
 
  private:
+  /// TODO: Remove this reference once codegen is performed before FIS creation.
+  /// Reference to the config object, used only to call Codegen().
+  const NonGroupingAggregatorConfig& agg_config;
+
   /// MemPool used to allocate memory for 'singleton_output_tuple_'. The ownership of the
   /// pool's memory is transferred to the output batch on eos. The pool should not be
   /// Reset() to allow amortizing memory allocation over a series of
@@ -81,7 +110,7 @@ class NonGroupingAggregator : public Aggregator {
 
   typedef Status (*AddBatchImplFn)(NonGroupingAggregator*, RowBatch*);
   /// Jitted AddBatchImpl function pointer. Null if codegen is disabled.
-  AddBatchImplFn add_batch_impl_fn_ = nullptr;
+  const AddBatchImplFn& add_batch_impl_fn_;
 
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
@@ -105,15 +134,6 @@ class NonGroupingAggregator : public Aggregator {
 
   /// Output 'singleton_output_tuple_' and transfer memory to 'row_batch'.
   void GetSingletonOutput(RowBatch* row_batch);
-
-  /// Codegen the non-streaming add row batch loop. The loop has already been compiled to
-  /// IR and loaded into the codegen object. UpdateAggTuple has also been codegen'd to IR.
-  /// This function will modify the loop subsituting the statically compiled functions
-  /// with codegen'd ones. 'add_batch_impl_fn_' will be updated with the codegened
-  /// function.
-  /// Assumes AGGREGATED_ROWS = false.
-  Status CodegenAddBatchImpl(
-      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
 };
 } // namespace impala
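
The const reference to 'add_batch_impl_fn_' above is deliberate: the
aggregator can be constructed before Codegen() runs and still observe the
jitted pointer once the config is patched. A self-contained toy of the
pattern (illustrative only, not Impala code):

    struct Executor;
    typedef int (*AddBatchImplFn)(Executor*, int);

    // Shared, plan-level object: owns the "jitted" function pointer.
    struct Config {
      AddBatchImplFn add_batch_impl_fn = nullptr;  // set later by codegen
    };

    // Per-instance executor: binds a const reference at construction time.
    struct Executor {
      explicit Executor(const Config& config)
        : add_batch_impl_fn_(config.add_batch_impl_fn) {}
      int AddBatch(int rows) {
        // Fall back to the interpreted path when codegen is disabled.
        if (add_batch_impl_fn_ == nullptr) return rows;
        return add_batch_impl_fn_(this, rows);
      }
      const AddBatchImplFn& add_batch_impl_fn_;
    };

    int JittedAddBatch(Executor*, int rows) { return rows * 2; }

    // Executor built first, codegen "finishes" afterwards; the update is
    // still visible through the reference:
    //   Config config;
    //   Executor exec(config);
    //   config.add_batch_impl_fn = JittedAddBatch;
    //   exec.AddBatch(4);  // dispatches to JittedAddBatch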
 
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index c6752d6..f9ce844 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -980,15 +980,21 @@ void PhjBuilder::Codegen(LlvmCodeGen* codegen) {
   Status insert_codegen_status;
   Status codegen_status;
 
+  // Config describing the hash table, needed to codegen its methods.
+  HashTableConfig hash_table_config(
+      build_exprs_, build_exprs_, HashTableStoresNulls(), is_not_distinct_from_);
   // Codegen for hashing rows with the builder's hash table context.
   llvm::Function* hash_fn;
-  codegen_status = ht_ctx_->CodegenHashRow(codegen, false, &hash_fn);
+  codegen_status =
+      HashTableCtx::CodegenHashRow(codegen, false, hash_table_config, &hash_fn);
   llvm::Function* murmur_hash_fn;
-  codegen_status.MergeStatus(ht_ctx_->CodegenHashRow(codegen, true, &murmur_hash_fn));
+  codegen_status.MergeStatus(
+      HashTableCtx::CodegenHashRow(codegen, true, hash_table_config, &murmur_hash_fn));
 
   // Codegen for evaluating build rows
   llvm::Function* eval_build_row_fn;
-  codegen_status.MergeStatus(ht_ctx_->CodegenEvalRow(codegen, true, &eval_build_row_fn));
+  codegen_status.MergeStatus(
+      HashTableCtx::CodegenEvalRow(codegen, true, hash_table_config, &eval_build_row_fn));
 
   llvm::Function* insert_filters_fn;
   codegen_status.MergeStatus(
@@ -996,17 +1002,17 @@ void PhjBuilder::Codegen(LlvmCodeGen* codegen) {
 
   if (codegen_status.ok()) {
     TPrefetchMode::type prefetch_mode = runtime_state_->query_options().prefetch_mode;
-    build_codegen_status = CodegenProcessBuildBatch(
-        codegen, hash_fn, murmur_hash_fn, eval_build_row_fn, insert_filters_fn);
-    insert_codegen_status = CodegenInsertBatch(codegen, hash_fn, murmur_hash_fn,
-        eval_build_row_fn, prefetch_mode);
+    build_codegen_status = CodegenProcessBuildBatch(codegen, hash_table_config, hash_fn,
+        murmur_hash_fn, eval_build_row_fn, insert_filters_fn);
+    insert_codegen_status = CodegenInsertBatch(codegen, hash_table_config, hash_fn,
+        murmur_hash_fn, eval_build_row_fn, prefetch_mode);
   } else {
     build_codegen_status = codegen_status;
     insert_codegen_status = codegen_status;
   }
   profile()->AddCodegenMsg(build_codegen_status.ok(), build_codegen_status, "Build Side");
-  profile()->AddCodegenMsg(insert_codegen_status.ok(), insert_codegen_status,
-      "Hash Table Construction");
+  profile()->AddCodegenMsg(
+      insert_codegen_status.ok(), insert_codegen_status, "Hash Table Construction");
 }
 
 string PhjBuilder::DebugString() const {
@@ -1027,7 +1033,8 @@ string PhjBuilder::DebugString() const {
   return ss.str();
 }
 
-Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn,
+Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen,
+    HashTableConfig& hash_table_config, llvm::Function* hash_fn,
     llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn,
     llvm::Function* insert_filters_fn) {
   llvm::Function* process_build_batch_fn =
@@ -1043,12 +1050,12 @@ Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function
       process_build_batch_fn, insert_filters_fn, "InsertRuntimeFilters");
   DCHECK_REPLACE_COUNT(replaced, 1);
 
-  // Replace some hash table parameters with constants.
   HashTableCtx::HashTableReplacedConstants replaced_constants;
   const bool stores_duplicates = true;
   const int num_build_tuples = row_desc_->tuple_descriptors().size();
-  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, stores_duplicates,
-      num_build_tuples, process_build_batch_fn, &replaced_constants));
+  // Replace some hash table parameters with constants.
+  RETURN_IF_ERROR(HashTableCtx::ReplaceHashTableConstants(codegen, hash_table_config,
+      stores_duplicates, num_build_tuples, process_build_batch_fn, &replaced_constants));
   DCHECK_GE(replaced_constants.stores_nulls, 1);
   DCHECK_EQ(replaced_constants.finds_some_nulls, 0);
   DCHECK_EQ(replaced_constants.stores_duplicates, 0);
@@ -1108,13 +1115,16 @@ Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function
   return Status::OK();
 }
 
-Status PhjBuilder::CodegenInsertBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn,
+Status PhjBuilder::CodegenInsertBatch(LlvmCodeGen* codegen,
+    HashTableConfig& hash_table_config, llvm::Function* hash_fn,
     llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn,
     TPrefetchMode::type prefetch_mode) {
   llvm::Function* insert_batch_fn =
       codegen->GetFunction(IRFunction::PHJ_INSERT_BATCH, true);
+  // Codegen HashTable::Equals for matching build rows.
   llvm::Function* build_equals_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, true, &build_equals_fn));
+  RETURN_IF_ERROR(
+      HashTableCtx::CodegenEquals(codegen, true, hash_table_config, &build_equals_fn));
 
   // Replace the parameter 'prefetch_mode' with constant.
   llvm::Value* prefetch_mode_arg = codegen->GetArgument(insert_batch_fn, 1);
@@ -1134,8 +1144,8 @@ Status PhjBuilder::CodegenInsertBatch(LlvmCodeGen* codegen, llvm::Function* hash
   HashTableCtx::HashTableReplacedConstants replaced_constants;
   const bool stores_duplicates = true;
   const int num_build_tuples = row_desc_->tuple_descriptors().size();
-  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, stores_duplicates,
-      num_build_tuples, insert_batch_fn, &replaced_constants));
+  RETURN_IF_ERROR(HashTableCtx::ReplaceHashTableConstants(codegen, hash_table_config,
+      stores_duplicates, num_build_tuples, insert_batch_fn, &replaced_constants));
   DCHECK_GE(replaced_constants.stores_nulls, 1);
   DCHECK_EQ(replaced_constants.finds_some_nulls, 0);
   DCHECK_GE(replaced_constants.stores_duplicates, 1);
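
Condensed, the build-side codegen flow after this change reads as follows
(every call below appears verbatim in the hunks above; error handling and
function bodies are abbreviated):

    HashTableConfig cfg(build_exprs_, build_exprs_, HashTableStoresNulls(),
        is_not_distinct_from_);
    llvm::Function* hash_fn;
    llvm::Function* murmur_hash_fn;
    llvm::Function* eval_build_row_fn;
    Status s = HashTableCtx::CodegenHashRow(codegen, false, cfg, &hash_fn);
    s.MergeStatus(
        HashTableCtx::CodegenHashRow(codegen, true, cfg, &murmur_hash_fn));
    s.MergeStatus(
        HashTableCtx::CodegenEvalRow(codegen, true, cfg, &eval_build_row_fn));
    if (s.ok()) {
      // The same cfg is then threaded through CodegenProcessBuildBatch() and
      // CodegenInsertBatch(), which hand it to
      // HashTableCtx::ReplaceHashTableConstants(codegen, cfg, ...).
    }
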
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index fc8d075..9e3f27b 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -550,15 +550,16 @@ class PhjBuilder : public DataSink {
 
   /// Codegen processing build batches. Identical signature to ProcessBuildBatch().
   /// Returns non-OK status if codegen was not possible.
-  Status CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn,
+  Status CodegenProcessBuildBatch(LlvmCodeGen* codegen,
+      HashTableConfig& hash_table_config, llvm::Function* hash_fn,
       llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn,
       llvm::Function* insert_filters_fn) WARN_UNUSED_RESULT;
 
   /// Codegen inserting batches into a partition's hash table. Identical signature to
   /// Partition::InsertBatch(). Returns non-OK if codegen was not possible.
-  Status CodegenInsertBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn,
-      llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn,
-      TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
+  Status CodegenInsertBatch(LlvmCodeGen* codegen, HashTableConfig& hash_table_config,
+      llvm::Function* hash_fn, llvm::Function* murmur_hash_fn,
+      llvm::Function* eval_row_fn, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
 
   /// Codegen inserting rows into runtime filters. Identical signature to
   /// InsertRuntimeFilters(). Returns non-OK if codegen was not possible.
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 32b4cd5..4a4bed6 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -1359,8 +1359,14 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
   // Codegen for hashing rows
   llvm::Function* hash_fn;
   llvm::Function* murmur_hash_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(codegen, false, &hash_fn));
-  RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(codegen, true, &murmur_hash_fn));
+  // Config describing the hash table, needed to codegen its methods.
+  HashTableConfig hash_table_config(build_exprs_, probe_exprs_,
+      builder_->HashTableStoresNulls(), builder_->is_not_distinct_from());
+
+  RETURN_IF_ERROR(
+      HashTableCtx::CodegenHashRow(codegen, false, hash_table_config, &hash_fn));
+  RETURN_IF_ERROR(
+      HashTableCtx::CodegenHashRow(codegen, true, hash_table_config, &murmur_hash_fn));
 
   // Get cross compiled function
   IRFunction::Type ir_fn = IRFunction::FN_END;
@@ -1412,11 +1418,13 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
 
   // Codegen HashTable::Equals
   llvm::Function* probe_equals_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, false, &probe_equals_fn));
+  RETURN_IF_ERROR(
+      HashTableCtx::CodegenEquals(codegen, false, hash_table_config, &probe_equals_fn));
 
   // Codegen for evaluating probe rows
   llvm::Function* eval_row_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenEvalRow(codegen, false, &eval_row_fn));
+  RETURN_IF_ERROR(
+      HashTableCtx::CodegenEvalRow(codegen, false, hash_table_config, &eval_row_fn));
 
   // Codegen CreateOutputRow
   llvm::Function* create_output_row_fn;
@@ -1478,8 +1486,8 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
   HashTableCtx::HashTableReplacedConstants replaced_constants;
   const bool stores_duplicates = true;
   const int num_build_tuples = child(1)->row_desc()->tuple_descriptors().size();
-  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, stores_duplicates,
-      num_build_tuples, process_probe_batch_fn, &replaced_constants));
+  RETURN_IF_ERROR(HashTableCtx::ReplaceHashTableConstants(codegen, hash_table_config,
+      stores_duplicates, num_build_tuples, process_probe_batch_fn, &replaced_constants));
   DCHECK_GE(replaced_constants.stores_nulls, 1);
   DCHECK_GE(replaced_constants.finds_some_nulls, 1);
   DCHECK_GE(replaced_constants.stores_duplicates, 1);
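
The replaced-constant counters checked above exist because
ReplaceHashTableConstants() bakes flags such as stores_nulls into the jitted
IR as constants, letting LLVM fold away the dead branches. A rough non-JIT
analogy (toy code, not Impala's):

    #include <vector>

    // 'StoresNulls' plays the role of the IR constant patched in by
    // ReplaceHashTableConstants(): the dead branch is pruned at compile time
    // here, at JIT time in Impala.
    template <bool StoresNulls>
    int CountStorableRows(const std::vector<bool>& row_is_null) {
      int n = 0;
      for (bool is_null : row_is_null) {
        if (!StoresNulls && is_null) continue;  // folds away when true
        ++n;
      }
      return n;
    }
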
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 69dfb50..b8bef30 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -8618,21 +8618,19 @@ TEST_P(ExprTest, ConditionalFunctionIsNotFalse) {
   TestError("isnotfalse(-9999999999999999999999999999999999999.9)");
 }
 
-// Validates that Expr::ComputeResultsLayout() for 'exprs' is correct.
+// Validates that the layout computed by ScalarExprsResultsRowLayout(exprs) is correct.
 //   - expected_byte_size: total byte size to store all results for exprs
 //   - expected_var_begin: byte offset where variable length types begin
 //   - expected_offsets: mapping of byte sizes to a set valid offsets
 //     exprs that have the same byte size can end up in a number of locations
 void ValidateLayout(const vector<ScalarExpr*>& exprs, int expected_byte_size,
     int expected_var_begin, const map<int, set<int>>& expected_offsets) {
-  vector<int> offsets;
   set<int> offsets_found;
 
-  int var_begin;
-  int byte_size = ScalarExpr::ComputeResultsLayout(exprs, &offsets, &var_begin);
+  ScalarExprsResultsRowLayout row_layout(exprs);
 
-  EXPECT_EQ(expected_byte_size, byte_size);
-  EXPECT_EQ(expected_var_begin, var_begin);
+  EXPECT_EQ(expected_byte_size, row_layout.expr_values_bytes_per_row);
+  EXPECT_EQ(expected_var_begin, row_layout.var_results_begin_offset);
 
   // Walk the computed offsets and make sure the resulting sets match expected_offsets
   for (int i = 0; i < exprs.size(); ++i) {
@@ -8641,7 +8639,7 @@ void ValidateLayout(const vector<ScalarExpr*>& exprs, int expected_byte_size,
     EXPECT_TRUE(iter != expected_offsets.end());
 
     const set<int>& possible_offsets = iter->second;
-    int computed_offset = offsets[i];
+    int computed_offset = row_layout.expr_values_offsets[i];
     // The computed offset has to be one of the possible.  Exprs types with the
     // same size are not ordered wrt each other.
     EXPECT_TRUE(possible_offsets.find(computed_offset) != possible_offsets.end());
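
For reference, a caller of ValidateLayout() pairs each byte size with the set
of offsets a slot of that size may legally land on (the values below are
illustrative, not taken from the actual test suite):

    // 'exprs' is assumed to hold two 4-byte exprs, built elsewhere.
    map<int, set<int>> expected_offsets;
    expected_offsets[4] = {0, 4};  // same-size exprs may swap positions
    ValidateLayout(exprs, /*expected_byte_size=*/8,
        /*expected_var_begin=*/-1, expected_offsets);
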
diff --git a/be/src/exprs/scalar-expr.cc b/be/src/exprs/scalar-expr.cc
index 54b4d76..b634f9e 100644
--- a/be/src/exprs/scalar-expr.cc
+++ b/be/src/exprs/scalar-expr.cc
@@ -245,14 +245,14 @@ struct MemLayoutData {
   }
 };
 
-int ScalarExpr::ComputeResultsLayout(const vector<ScalarExpr*>& exprs,
-    vector<int>* offsets, int* var_result_begin) {
+ScalarExprsResultsRowLayout::ScalarExprsResultsRowLayout(
+    const vector<ScalarExpr*>& exprs) {
   if (exprs.size() == 0) {
-    *var_result_begin = -1;
-    return 0;
+    var_results_begin_offset = -1;
+    expr_values_bytes_per_row = 0;
+    return;
   }
 
-
   vector<MemLayoutData> data;
   data.resize(exprs.size());
 
@@ -263,26 +263,23 @@ int ScalarExpr::ComputeResultsLayout(const vector<ScalarExpr*>& exprs,
     data[i].byte_size = exprs[i]->type().GetSlotSize();
     DCHECK_GT(data[i].byte_size, 0);
     data[i].variable_length = exprs[i]->type().IsVarLenStringType();
-
   }
 
   sort(data.begin(), data.end());
 
-  int byte_offset = 0;
-  offsets->resize(exprs.size());
-  *var_result_begin = -1;
+  expr_values_bytes_per_row = 0;
+  expr_values_offsets.resize(exprs.size());
+  var_results_begin_offset = -1;
 
   for (int i = 0; i < data.size(); ++i) {
-
-    (*offsets)[data[i].expr_idx] = byte_offset;
-    if (data[i].variable_length && *var_result_begin == -1) {
-      *var_result_begin = byte_offset;
+    expr_values_offsets[data[i].expr_idx] = expr_values_bytes_per_row;
+    if (data[i].variable_length && var_results_begin_offset == -1) {
+      var_results_begin_offset = expr_values_bytes_per_row;
     }
-    DCHECK(!(i == 0 && byte_offset > 0)) << "first value should be at start of layout";
-    byte_offset += data[i].byte_size;
+    DCHECK(!(i == 0 && expr_values_bytes_per_row > 0))
+        << "first value should be at start of layout";
+    expr_values_bytes_per_row += data[i].byte_size;
   }
-
-  return byte_offset;
 }
 
 Status ScalarExpr::Init(
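
A self-contained sketch of the layout computation above (toy code, not
Impala's; since MemLayoutData::operator< is elided from this hunk, it assumes
fixed-length slots sort before variable-length ones, larger sizes first):

    #include <algorithm>
    #include <vector>

    struct SlotInfo {
      int expr_idx;
      int byte_size;
      bool variable_length;
      bool operator<(const SlotInfo& rhs) const {
        if (variable_length != rhs.variable_length) return !variable_length;
        return byte_size > rhs.byte_size;  // assumption: bigger slots first
      }
    };

    struct RowLayout {
      int bytes_per_row = 0;
      int var_begin_offset = -1;  // -1 => no varlen slots
      std::vector<int> offsets;   // indexed by expr_idx
    };

    RowLayout ComputeLayout(std::vector<SlotInfo> slots) {
      RowLayout layout;
      layout.offsets.resize(slots.size());
      std::sort(slots.begin(), slots.end());
      for (const SlotInfo& s : slots) {
        layout.offsets[s.expr_idx] = layout.bytes_per_row;
        if (s.variable_length && layout.var_begin_offset == -1) {
          layout.var_begin_offset = layout.bytes_per_row;
        }
        layout.bytes_per_row += s.byte_size;
      }
      return layout;
    }

    // E.g. {8B fixed, 4B fixed, 12B varlen} => offsets 0, 8, 12;
    // 24 bytes per row; the varlen region begins at byte 12.
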
diff --git a/be/src/exprs/scalar-expr.h b/be/src/exprs/scalar-expr.h
index 0da79d0..fceb83d 100644
--- a/be/src/exprs/scalar-expr.h
+++ b/be/src/exprs/scalar-expr.h
@@ -70,6 +70,27 @@ class TExprNode;
 class Tuple;
 class TupleRow;
 
+/// Describes a memory-efficient layout for storing the results of evaluating a list
+/// of scalar expressions; the constructor computes the layout for 'exprs'.
+/// The results are assumed to be void* slot types (vs AnyVal types).
+/// Varlen data is not included (e.g. there will be space for a StringValue, but not the
+/// data referenced by it). Variable-length types are guaranteed to be placed at the
+/// end of the layout.
+struct ScalarExprsResultsRowLayout {
+  ScalarExprsResultsRowLayout() = delete;
+  ScalarExprsResultsRowLayout(const vector<ScalarExpr*>& exprs);
+
+  /// The number of bytes necessary to store all the results.
+  int expr_values_bytes_per_row;
+
+  /// Maps from expression index to the byte offset into the row of expression values.
+  std::vector<int> expr_values_offsets;
+
+  /// Byte offset at which the variable-length results for a row begin. -1 if there
+  /// are no variable-length slots.
+  int var_results_begin_offset;
+};
+
 /// --- ScalarExpr overview
 ///
 /// ScalarExpr is an expression which returns a value for each input tuple row.
@@ -182,20 +203,6 @@ class ScalarExpr : public Expr {
   static std::string DebugString(const std::vector<ScalarExpr*>& exprs);
   std::string DebugString(const std::string& expr_name) const;
 
-  /// Computes a memory efficient layout for storing the results of evaluating 'exprs'.
-  /// The results are assumed to be void* slot types (vs AnyVal types). Varlen data is
-  /// not included (e.g. there will be space for a StringValue, but not the data
-  /// referenced by it).
-  ///
-  /// Returns the number of bytes necessary to store all the results and offsets
-  /// where the result for each expr should be stored.
-  ///
-  /// Variable length types are guaranteed to be at the end and 'var_result_begin'
-  /// will be set the beginning byte offset where variable length results begin.
-  /// 'var_result_begin' will be set to -1 if there are no variable len types.
-  static int ComputeResultsLayout(const vector<ScalarExpr*>& exprs, vector<int>* offsets,
-      int* var_result_begin);
-
   /// Releases cache entries to libCache for all nodes in the ScalarExpr tree.
   virtual void Close();