You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/25 20:15:49 UTC

[02/33] incubator-impala git commit: IMPALA-4080, IMPALA-3638: Introduce ExecNode::Codegen()

IMPALA-4080, IMPALA-3638: Introduce ExecNode::Codegen()

This patch is mostly a mechanical move of codegen-related logic
from each exec node's Prepare() to its Codegen() function.
After this change, code generation will no longer happen in
Prepare(). Instead, it will happen after Prepare() completes in
PlanFragmentExecutor. This is an intermediate step towards the
final goal of sharing compiled code among fragment instances in
multi-threading.

As part of the cleanup, this change also removes the logic for
lazy codegen object creation. In other words, if codegen is enabled,
the codegen object will always be created. This simplifies some
of the logic in ScalarFnCall::Prepare() and various Codegen()
functions by reducing the error checking needed. This change also
removes the logic added for tackling IMPALA-1755, as it is no
longer needed after the cleanup.

The cleanup also rectifies a poorly documented situation.
Previously, even if a user explicitly set DISABLE_CODEGEN to true,
we could still codegen a UDF if it was written in LLVM IR or if it
had more than 8 arguments. This patch enforces the query option
by failing the query in both cases. To run such a query, the user
must enable codegen. This change also extends the number of
arguments supported in the interpretation path of ScalarFn to 20.

Change-Id: I207566bc9f4c6a159271ecdbc4bbdba3d78c6651
Reviewed-on: http://gerrit.cloudera.org:8080/4651
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b15d992a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b15d992a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b15d992a

Branch: refs/heads/hadoop-next
Commit: b15d992abe09bc841f6e2112d47099eb15f8454f
Parents: ee2a06d
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Oct 5 20:05:24 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Oct 19 08:18:37 2016 +0000

----------------------------------------------------------------------
 be/src/exec/aggregation-node.cc                 |  61 +++--
 be/src/exec/aggregation-node.h                  |  11 +-
 be/src/exec/exchange-node.cc                    |   5 +-
 be/src/exec/exchange-node.h                     |   4 +
 be/src/exec/exec-node.cc                        |  21 +-
 be/src/exec/exec-node.h                         |  13 +-
 be/src/exec/hash-join-node.cc                   |  51 ++--
 be/src/exec/hash-join-node.h                    |   6 +-
 be/src/exec/hash-table.cc                       |  22 +-
 be/src/exec/hash-table.h                        |   8 +-
 be/src/exec/hdfs-avro-scanner.cc                |  16 +-
 be/src/exec/hdfs-avro-scanner.h                 |   2 +-
 be/src/exec/hdfs-parquet-scanner.cc             |  10 +-
 be/src/exec/hdfs-scan-node-base.cc              |  28 +--
 be/src/exec/hdfs-scan-node-base.h               |   1 +
 be/src/exec/hdfs-scanner.cc                     |   2 +-
 be/src/exec/hdfs-sequence-scanner.cc            |   8 +-
 be/src/exec/hdfs-text-scanner.cc                |   8 +-
 be/src/exec/old-hash-table.cc                   |  30 +--
 be/src/exec/old-hash-table.h                    |   6 +-
 be/src/exec/partitioned-aggregation-node.cc     |  76 +++---
 be/src/exec/partitioned-aggregation-node.h      |  13 +-
 be/src/exec/partitioned-hash-join-builder.cc    |  83 +++----
 be/src/exec/partitioned-hash-join-builder.h     |  19 +-
 be/src/exec/partitioned-hash-join-node.cc       |  56 +++--
 be/src/exec/partitioned-hash-join-node.h        |   4 +-
 be/src/exec/sort-node.cc                        |  23 +-
 be/src/exec/sort-node.h                         |   4 +
 be/src/exec/topn-node.cc                        |  94 +++----
 be/src/exec/topn-node.h                         |   4 +-
 be/src/exprs/case-expr.cc                       |   6 +-
 be/src/exprs/case-expr.h                        |   2 +-
 be/src/exprs/compound-predicates.cc             |  37 ++-
 be/src/exprs/compound-predicates.h              |  10 +-
 be/src/exprs/conditional-functions.cc           |   4 +-
 be/src/exprs/conditional-functions.h            |   8 +-
 be/src/exprs/expr.cc                            |  14 +-
 be/src/exprs/expr.h                             |   8 +-
 be/src/exprs/hive-udf-call.cc                   |   4 +-
 be/src/exprs/hive-udf-call.h                    |   2 +-
 be/src/exprs/is-not-empty-predicate.cc          |   4 +-
 be/src/exprs/is-not-empty-predicate.h           |   2 +-
 be/src/exprs/literal.cc                         |   4 +-
 be/src/exprs/literal.h                          |   2 +-
 be/src/exprs/null-literal.cc                    |   4 +-
 be/src/exprs/null-literal.h                     |   2 +-
 be/src/exprs/scalar-fn-call.cc                  | 246 +++++++------------
 be/src/exprs/scalar-fn-call.h                   |   4 +-
 be/src/exprs/slot-ref.cc                        |   5 +-
 be/src/exprs/slot-ref.h                         |   2 +-
 be/src/exprs/tuple-is-null-predicate.cc         |   6 +-
 be/src/exprs/tuple-is-null-predicate.h          |   2 +-
 be/src/runtime/plan-fragment-executor.cc        |  16 +-
 be/src/runtime/runtime-state.cc                 |   8 -
 be/src/runtime/runtime-state.h                  |  27 +-
 be/src/runtime/sorted-run-merger.h              |   2 +-
 be/src/runtime/sorter.cc                        |   2 +-
 be/src/runtime/sorter.h                         |   2 +-
 be/src/runtime/tuple.cc                         |   6 +-
 be/src/runtime/tuple.h                          |   2 +-
 be/src/service/fe-support.cc                    |  36 ++-
 be/src/service/query-options.cc                 |   1 +
 be/src/testutil/test-udfs.cc                    |  29 +++
 be/src/util/tuple-row-compare.cc                |  13 +-
 be/src/util/tuple-row-compare.h                 |   2 +-
 common/thrift/PlanNodes.thrift                  |   7 +-
 .../org/apache/impala/planner/HdfsScanNode.java |  40 ---
 .../queries/QueryTest/udf-errors.test           |  48 ++++
 .../functional-query/queries/QueryTest/udf.test |  14 ++
 tests/query_test/test_udfs.py                   |  18 ++
 70 files changed, 647 insertions(+), 693 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
index 909d42b..67928ca 100644
--- a/be/src/exec/aggregation-node.cc
+++ b/be/src/exec/aggregation-node.cc
@@ -165,25 +165,29 @@ Status AggregationNode::Prepare(RuntimeState* state) {
     hash_tbl_->Insert(singleton_intermediate_tuple_);
     output_iterator_ = hash_tbl_->Begin();
   }
+  if (!state->codegen_enabled()) {
+    runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
+  }
+  return Status::OK();
+}
 
+void AggregationNode::Codegen(RuntimeState* state) {
+  DCHECK(state->codegen_enabled());
   bool codegen_enabled = false;
-  if (state->codegen_enabled()) {
-    LlvmCodeGen* codegen;
-    RETURN_IF_ERROR(state->GetCodegen(&codegen));
-    Function* update_tuple_fn = CodegenUpdateTuple(state);
-    if (update_tuple_fn != NULL) {
-      codegen_process_row_batch_fn_ =
-          CodegenProcessRowBatch(state, update_tuple_fn);
-      if (codegen_process_row_batch_fn_ != NULL) {
-        // Update to using codegen'd process row batch.
-        codegen->AddFunctionToJit(codegen_process_row_batch_fn_,
-            reinterpret_cast<void**>(&process_row_batch_fn_));
-        codegen_enabled = true;
-      }
+  LlvmCodeGen* codegen = state->codegen();
+  DCHECK(codegen != NULL);
+  Function* update_tuple_fn = CodegenUpdateTuple(codegen);
+  if (update_tuple_fn != NULL) {
+    codegen_process_row_batch_fn_ = CodegenProcessRowBatch(codegen, update_tuple_fn);
+    if (codegen_process_row_batch_fn_ != NULL) {
+      // Update to using codegen'd process row batch.
+      codegen->AddFunctionToJit(codegen_process_row_batch_fn_,
+          reinterpret_cast<void**>(&process_row_batch_fn_));
+      codegen_enabled = true;
     }
   }
   runtime_profile()->AddCodegenMsg(codegen_enabled);
-  return Status::OK();
+  ExecNode::Codegen(state);
 }
 
 Status AggregationNode::Open(RuntimeState* state) {
@@ -525,11 +529,8 @@ IRFunction::Type GetHllUpdateFunction2(const ColumnType& type) {
 // ret:                                              ; preds = %src_not_null, %entry
 //   ret void
 // }
-llvm::Function* AggregationNode::CodegenUpdateSlot(
-    RuntimeState* state, AggFnEvaluator* evaluator, SlotDescriptor* slot_desc) {
-  LlvmCodeGen* codegen;
-  if (!state->GetCodegen(&codegen).ok()) return NULL;
-
+llvm::Function* AggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen,
+    AggFnEvaluator* evaluator, SlotDescriptor* slot_desc) {
   // TODO: Fix this DCHECK and Init() once CodegenUpdateSlot() can handle AggFnEvaluator
   // with multiple input expressions (e.g. group_concat).
   DCHECK_EQ(evaluator->input_expr_ctxs().size(), 1);
@@ -538,7 +539,7 @@ llvm::Function* AggregationNode::CodegenUpdateSlot(
   // TODO: implement timestamp
   if (input_expr->type().type == TYPE_TIMESTAMP) return NULL;
   Function* agg_expr_fn;
-  Status status = input_expr->GetCodegendComputeFn(state, &agg_expr_fn);
+  Status status = input_expr->GetCodegendComputeFn(codegen, &agg_expr_fn);
   if (!status.ok()) {
     VLOG_QUERY << "Could not codegen UpdateSlot(): " << status.GetDetail();
     return NULL;
@@ -715,9 +716,7 @@ llvm::Function* AggregationNode::CodegenUpdateSlot(
 //                           %"class.impala::TupleRow"* %tuple_row)
 //   ret void
 // }
-Function* AggregationNode::CodegenUpdateTuple(RuntimeState* state) {
-  LlvmCodeGen* codegen;
-  if (!state->GetCodegen(&codegen).ok()) return NULL;
+Function* AggregationNode::CodegenUpdateTuple(LlvmCodeGen* codegen) {
   SCOPED_TIMER(codegen->codegen_timer());
 
   int j = probe_expr_ctxs_.size();
@@ -805,7 +804,7 @@ Function* AggregationNode::CodegenUpdateTuple(RuntimeState* state) {
       Value* count_inc = builder.CreateAdd(slot_loaded, const_one, "count_star_inc");
       builder.CreateStore(count_inc, slot_ptr);
     } else {
-      Function* update_slot_fn = CodegenUpdateSlot(state, evaluator, slot_desc);
+      Function* update_slot_fn = CodegenUpdateSlot(codegen, evaluator, slot_desc);
       if (update_slot_fn == NULL) return NULL;
       // Call GetAggFnCtx() to get the function context.
       Value* get_fn_ctx_args[] = { this_arg, codegen->GetIntConstant(TYPE_INT, i) };
@@ -824,10 +823,8 @@ Function* AggregationNode::CodegenUpdateTuple(RuntimeState* state) {
   return codegen->FinalizeFunction(fn);
 }
 
-Function* AggregationNode::CodegenProcessRowBatch(
-    RuntimeState* state, Function* update_tuple_fn) {
-  LlvmCodeGen* codegen;
-  if (!state->GetCodegen(&codegen).ok()) return NULL;
+Function* AggregationNode::CodegenProcessRowBatch(LlvmCodeGen* codegen,
+    Function* update_tuple_fn) {
   SCOPED_TIMER(codegen->codegen_timer());
   DCHECK(update_tuple_fn != NULL);
 
@@ -847,19 +844,19 @@ Function* AggregationNode::CodegenProcessRowBatch(
     // Aggregation w/o grouping does not use a hash table.
 
     // Codegen for hash
-    Function* hash_fn = hash_tbl_->CodegenHashCurrentRow(state);
+    Function* hash_fn = hash_tbl_->CodegenHashCurrentRow(codegen);
     if (hash_fn == NULL) return NULL;
 
     // Codegen HashTable::Equals
-    Function* equals_fn = hash_tbl_->CodegenEquals(state);
+    Function* equals_fn = hash_tbl_->CodegenEquals(codegen);
     if (equals_fn == NULL) return NULL;
 
     // Codegen for evaluating build rows
-    Function* eval_build_row_fn = hash_tbl_->CodegenEvalTupleRow(state, true);
+    Function* eval_build_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, true);
     if (eval_build_row_fn == NULL) return NULL;
 
     // Codegen for evaluating probe rows
-    Function* eval_probe_row_fn = hash_tbl_->CodegenEvalTupleRow(state, false);
+    Function* eval_probe_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, false);
     if (eval_probe_row_fn == NULL) return NULL;
 
     // Replace call sites

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.h b/be/src/exec/aggregation-node.h
index 5d87d82..6f4867b 100644
--- a/be/src/exec/aggregation-node.h
+++ b/be/src/exec/aggregation-node.h
@@ -55,6 +55,7 @@ class AggregationNode : public ExecNode {
 
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
+  virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
   virtual Status Reset(RuntimeState* state);
@@ -159,16 +160,16 @@ class AggregationNode : public ExecNode {
   /// IR and loaded into the codegen object.  UpdateAggTuple has also been
   /// codegen'd to IR.  This function will modify the loop subsituting the
   /// UpdateAggTuple function call with the (inlined) codegen'd 'update_tuple_fn'.
-  llvm::Function* CodegenProcessRowBatch(
-      RuntimeState* state, llvm::Function* update_tuple_fn);
+  llvm::Function* CodegenProcessRowBatch(LlvmCodeGen* codegen,
+      llvm::Function* update_tuple_fn);
 
   /// Codegen for updating aggregate_exprs at slot_idx. Returns NULL if unsuccessful.
   /// slot_idx is the idx into aggregate_exprs_ (does not include grouping exprs).
-  llvm::Function* CodegenUpdateSlot(
-      RuntimeState* state, AggFnEvaluator* evaluator, SlotDescriptor* slot_desc);
+  llvm::Function* CodegenUpdateSlot(LlvmCodeGen* codegen,
+      AggFnEvaluator* evaluator, SlotDescriptor* slot_desc);
 
   /// Codegen UpdateTuple(). Returns NULL if codegen is unsuccessful.
-  llvm::Function* CodegenUpdateTuple(RuntimeState* state);
+  llvm::Function* CodegenUpdateTuple(LlvmCodeGen* codegen);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 833949b..fac2ff5 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -83,6 +83,8 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
     RETURN_IF_ERROR(sort_exec_exprs_.Prepare(
         state, row_descriptor_, row_descriptor_, expr_mem_tracker()));
     AddExprCtxsToFree(sort_exec_exprs_);
+    less_than_.reset(
+        new TupleRowComparator(sort_exec_exprs_, is_asc_order_, nulls_first_));
   }
   return Status::OK();
 }
@@ -92,10 +94,9 @@ Status ExchangeNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Open(state));
   if (is_merging_) {
     RETURN_IF_ERROR(sort_exec_exprs_.Open(state));
-    TupleRowComparator less_than(sort_exec_exprs_, is_asc_order_, nulls_first_);
     // CreateMerger() will populate its merging heap with batches from the stream_recvr_,
     // so it is not necessary to call FillInputRowBatch().
-    RETURN_IF_ERROR(stream_recvr_->CreateMerger(less_than));
+    RETURN_IF_ERROR(stream_recvr_->CreateMerger(*less_than_.get()));
   } else {
     RETURN_IF_ERROR(FillInputRowBatch(state));
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/exchange-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.h b/be/src/exec/exchange-node.h
index 16e9137..f6face4 100644
--- a/be/src/exec/exchange-node.h
+++ b/be/src/exec/exchange-node.h
@@ -27,6 +27,7 @@ namespace impala {
 
 class RowBatch;
 class DataStreamRecvr;
+class TupleRowComparator;
 
 /// Receiver node for data streams. The data stream receiver is created in Prepare()
 /// and closed in Close().
@@ -94,6 +95,9 @@ class ExchangeNode : public ExecNode {
   /// underlying stream_recvr_, and input_batch_ is not used/valid.
   bool is_merging_;
 
+  /// The TupleRowComparator based on 'sort_exec_exprs_' for merging exchange.
+  boost::scoped_ptr<TupleRowComparator> less_than_;
+
   /// Sort expressions and parameters passed to the merging receiver..
   SortExecExprs sort_exec_exprs_;
   std::vector<bool> is_asc_order_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index df491dd..510a7d9 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -158,13 +158,20 @@ Status ExecNode::Prepare(RuntimeState* state) {
 
   RETURN_IF_ERROR(Expr::Prepare(conjunct_ctxs_, state, row_desc(), expr_mem_tracker()));
   AddExprCtxsToFree(conjunct_ctxs_);
-
   for (int i = 0; i < children_.size(); ++i) {
     RETURN_IF_ERROR(children_[i]->Prepare(state));
   }
   return Status::OK();
 }
 
+void ExecNode::Codegen(RuntimeState* state) {
+  DCHECK(state->codegen_enabled());
+  DCHECK(state->codegen() != NULL);
+  for (int i = 0; i < children_.size(); ++i) {
+    children_[i]->Codegen(state);
+  }
+}
+
 Status ExecNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::OPEN, state));
   return Expr::Open(conjunct_ctxs_, state);
@@ -272,12 +279,6 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
             || state->query_options().num_scanner_threads == 1);
         *node = pool->Add(new HdfsScanNode(pool, tnode, descs));
       }
-      // If true, this node requests codegen over interpretation for conjuncts
-      // evaluation whenever possible. Turn codegen on for expr evaluation for
-      // the entire fragment.
-      if (tnode.hdfs_scan_node.codegen_conjuncts) state->SetCodegenExpr();
-      (*node)->runtime_profile()->AddCodegenMsg(
-          state->ShouldCodegenExpr(), "", "Expr Evaluation");
       break;
     case TPlanNodeType::HBASE_SCAN_NODE:
       *node = pool->Add(new HBaseScanNode(pool, tnode, descs));
@@ -492,15 +493,13 @@ void ExecNode::AddExprCtxsToFree(const SortExecExprs& sort_exec_exprs) {
 // false:                                            ; preds = %continue, %entry
 //   ret i1 false
 // }
-Status ExecNode::CodegenEvalConjuncts(RuntimeState* state,
+Status ExecNode::CodegenEvalConjuncts(LlvmCodeGen* codegen,
     const vector<ExprContext*>& conjunct_ctxs, Function** fn, const char* name) {
   Function* conjunct_fns[conjunct_ctxs.size()];
   for (int i = 0; i < conjunct_ctxs.size(); ++i) {
     RETURN_IF_ERROR(
-        conjunct_ctxs[i]->root()->GetCodegendComputeFn(state, &conjunct_fns[i]));
+        conjunct_ctxs[i]->root()->GetCodegendComputeFn(codegen, &conjunct_fns[i]));
   }
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(state->GetCodegen(&codegen));
 
   // Construct function signature to match
   // bool EvalConjuncts(Expr** exprs, int num_exprs, TupleRow* row)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 9283a8b..4f3f3fc 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -63,12 +63,17 @@ class ExecNode {
   /// Sets up internal structures, etc., without doing any actual work.
   /// Must be called prior to Open(). Will only be called once in this
   /// node's lifetime.
-  /// All code generation (adding functions to the LlvmCodeGen object) must happen
-  /// in Prepare().  Retrieving the jit compiled function pointer must happen in
-  /// Open().
   /// If overridden in subclass, must first call superclass's Prepare().
   virtual Status Prepare(RuntimeState* state);
 
+  /// Recursively calls Codegen() on all children.
+  /// Expected to be overriden in subclass to generate LLVM IR functions and register
+  /// them with the LlvmCodeGen object. The function pointers of the compiled IR functions
+  /// will be set up in PlanFragmentExecutor::Open(). If overridden in subclass, must also
+  /// call superclass's Codegen() before or after the code generation for this exec node.
+  /// Will only be called once in the node's lifetime.
+  virtual void Codegen(RuntimeState* state);
+
   /// Performs any preparatory work prior to calling GetNext().
   /// Caller must not be holding any io buffers. This will cause deadlock.
   /// If overridden in subclass, must first call superclass's Open().
@@ -146,7 +151,7 @@ class ExecNode {
   /// Codegen EvalConjuncts(). Returns a non-OK status if the function couldn't be
   /// codegen'd. The codegen'd version uses inlined, codegen'd GetBooleanVal() functions.
   static Status CodegenEvalConjuncts(
-      RuntimeState* state, const std::vector<ExprContext*>& conjunct_ctxs,
+      LlvmCodeGen* codegen, const std::vector<ExprContext*>& conjunct_ctxs,
       llvm::Function** fn, const char* name = "EvalConjuncts");
 
   /// Returns a string representation in DFS order of the plan rooted at this.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc
index 9106159..f2ac928 100644
--- a/be/src/exec/hash-join-node.cc
+++ b/be/src/exec/hash-join-node.cc
@@ -143,19 +143,24 @@ Status HashJoinNode::Prepare(RuntimeState* state) {
           child(1)->row_desc().tuple_descriptors().size(), stores_nulls,
           is_not_distinct_from_, state->fragment_hash_seed(), mem_tracker(), filters_));
   build_pool_.reset(new MemPool(mem_tracker()));
+  if (!state->codegen_enabled()) {
+    runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
+  }
+  return Status::OK();
+}
 
+void HashJoinNode::Codegen(RuntimeState* state) {
+  DCHECK(state->codegen_enabled());
+  LlvmCodeGen* codegen = state->codegen();
+  DCHECK(codegen != NULL);
   bool build_codegen_enabled = false;
   bool probe_codegen_enabled = false;
-  if (state->codegen_enabled()) {
-    LlvmCodeGen* codegen;
-    RETURN_IF_ERROR(state->GetCodegen(&codegen));
-
-    // Codegen for hashing rows
-    Function* hash_fn = hash_tbl_->CodegenHashCurrentRow(state);
-    if (hash_fn == NULL) return Status::OK();
 
+  // Codegen for hashing rows
+  Function* hash_fn = hash_tbl_->CodegenHashCurrentRow(codegen);
+  if (hash_fn != NULL) {
     // Codegen for build path
-    codegen_process_build_batch_fn_ = CodegenProcessBuildBatch(state, hash_fn);
+    codegen_process_build_batch_fn_ = CodegenProcessBuildBatch(codegen, hash_fn);
     if (codegen_process_build_batch_fn_ != NULL) {
       codegen->AddFunctionToJit(codegen_process_build_batch_fn_,
           reinterpret_cast<void**>(&process_build_batch_fn_));
@@ -164,7 +169,8 @@ Status HashJoinNode::Prepare(RuntimeState* state) {
 
     // Codegen for probe path (only for left joins)
     if (!match_all_build_) {
-      Function* codegen_process_probe_batch_fn = CodegenProcessProbeBatch(state, hash_fn);
+      Function* codegen_process_probe_batch_fn =
+          CodegenProcessProbeBatch(codegen, hash_fn);
       if (codegen_process_probe_batch_fn != NULL) {
         codegen->AddFunctionToJit(codegen_process_probe_batch_fn,
             reinterpret_cast<void**>(&process_probe_batch_fn_));
@@ -174,7 +180,7 @@ Status HashJoinNode::Prepare(RuntimeState* state) {
   }
   runtime_profile()->AddCodegenMsg(build_codegen_enabled, "", "Build Side");
   runtime_profile()->AddCodegenMsg(probe_codegen_enabled, "", "Probe Side");
-  return Status::OK();
+  ExecNode::Codegen(state);
 }
 
 Status HashJoinNode::Reset(RuntimeState* state) {
@@ -591,18 +597,15 @@ Function* HashJoinNode::CodegenCreateOutputRow(LlvmCodeGen* codegen) {
   return codegen->FinalizeFunction(fn);
 }
 
-Function* HashJoinNode::CodegenProcessBuildBatch(RuntimeState* state,
+Function* HashJoinNode::CodegenProcessBuildBatch(LlvmCodeGen* codegen,
     Function* hash_fn) {
-  LlvmCodeGen* codegen;
-  if (!state->GetCodegen(&codegen).ok()) return NULL;
-
   // Get cross compiled function
   Function* process_build_batch_fn =
       codegen->GetFunction(IRFunction::HASH_JOIN_PROCESS_BUILD_BATCH, true);
   DCHECK(process_build_batch_fn != NULL);
 
   // Codegen for evaluating build rows
-  Function* eval_row_fn = hash_tbl_->CodegenEvalTupleRow(state, true);
+  Function* eval_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, true);
   if (eval_row_fn == NULL) return NULL;
 
   int replaced = codegen->ReplaceCallSites(process_build_batch_fn, eval_row_fn,
@@ -615,36 +618,34 @@ Function* HashJoinNode::CodegenProcessBuildBatch(RuntimeState* state,
   return codegen->FinalizeFunction(process_build_batch_fn);
 }
 
-Function* HashJoinNode::CodegenProcessProbeBatch(RuntimeState* state, Function* hash_fn) {
-  LlvmCodeGen* codegen;
-  if (!state->GetCodegen(&codegen).ok()) return NULL;
-
+Function* HashJoinNode::CodegenProcessProbeBatch(LlvmCodeGen* codegen,
+    Function* hash_fn) {
   // Get cross compiled function
   Function* process_probe_batch_fn =
       codegen->GetFunction(IRFunction::HASH_JOIN_PROCESS_PROBE_BATCH, true);
   DCHECK(process_probe_batch_fn != NULL);
 
-  // Codegen HashTable::Equals
-  Function* equals_fn = hash_tbl_->CodegenEquals(state);
+  // Codegen HashTable::Equals()
+  Function* equals_fn = hash_tbl_->CodegenEquals(codegen);
   if (equals_fn == NULL) return NULL;
 
   // Codegen for evaluating build rows
-  Function* eval_row_fn = hash_tbl_->CodegenEvalTupleRow(state, false);
+  Function* eval_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, false);
   if (eval_row_fn == NULL) return NULL;
 
-  // Codegen CreateOutputRow
+  // Codegen CreateOutputRow()
   Function* create_output_row_fn = CodegenCreateOutputRow(codegen);
   if (create_output_row_fn == NULL) return NULL;
 
   // Codegen evaluating other join conjuncts
   Function* eval_other_conjuncts_fn;
-  Status status = ExecNode::CodegenEvalConjuncts(state, other_join_conjunct_ctxs_,
+  Status status = ExecNode::CodegenEvalConjuncts(codegen, other_join_conjunct_ctxs_,
       &eval_other_conjuncts_fn, "EvalOtherConjuncts");
   if (!status.ok()) return NULL;
 
   // Codegen evaluating conjuncts
   Function* eval_conjuncts_fn;
-  status = ExecNode::CodegenEvalConjuncts(state, conjunct_ctxs_, &eval_conjuncts_fn);
+  status = ExecNode::CodegenEvalConjuncts(codegen, conjunct_ctxs_, &eval_conjuncts_fn);
   if (!status.ok()) return NULL;
 
   // Replace all call sites with codegen version

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.h b/be/src/exec/hash-join-node.h
index 2e5ca6e..e65dc16 100644
--- a/be/src/exec/hash-join-node.h
+++ b/be/src/exec/hash-join-node.h
@@ -54,6 +54,7 @@ class HashJoinNode : public BlockingJoinNode {
 
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
+  virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
   virtual Status Reset(RuntimeState* state);
@@ -63,7 +64,6 @@ class HashJoinNode : public BlockingJoinNode {
 
  protected:
   virtual void AddToDebugString(int indentation_level, std::stringstream* out) const;
-
   virtual Status ProcessBuildInput(RuntimeState* state);
 
  private:
@@ -144,13 +144,13 @@ class HashJoinNode : public BlockingJoinNode {
   /// hash_fn is the codegen'd function for computing hashes over tuple rows in the
   /// hash table.
   /// Returns NULL if codegen was not possible.
-  llvm::Function* CodegenProcessBuildBatch(RuntimeState* state, llvm::Function* hash_fn);
+  llvm::Function* CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn);
 
   /// Codegen processing probe batches.  Identical signature to ProcessProbeBatch.
   /// hash_fn is the codegen'd function for computing hashes over tuple rows in the
   /// hash table.
   /// Returns NULL if codegen was not possible.
-  llvm::Function* CodegenProcessProbeBatch(RuntimeState* state, llvm::Function* hash_fn);
+  llvm::Function* CodegenProcessProbeBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index 6626b33..c39e9e9 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -684,7 +684,7 @@ static void CodegenAssignNullValue(LlvmCodeGen* codegen,
 // Both the null and not null branch into the continue block.  The continue block
 // becomes the start of the next block for codegen (either the next expr or just the
 // end of the function).
-Status HashTableCtx::CodegenEvalRow(RuntimeState* state, bool build, Function** fn) {
+Status HashTableCtx::CodegenEvalRow(LlvmCodeGen* codegen, bool build, Function** fn) {
   // TODO: CodegenAssignNullValue() can't handle TYPE_TIMESTAMP or TYPE_DECIMAL yet
   const vector<ExprContext*>& ctxs = build ? build_expr_ctxs_ : probe_expr_ctxs_;
   for (int i = 0; i < ctxs.size(); ++i) {
@@ -695,9 +695,6 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, bool build, Function**
     }
   }
 
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(state->GetCodegen(&codegen));
-
   // Get types to generate function prototype
   Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
   DCHECK(this_type != NULL);
@@ -746,7 +743,7 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, bool build, Function**
 
     // Call expr
     Function* expr_fn;
-    Status status = ctxs[i]->root()->GetCodegendComputeFn(state, &expr_fn);
+    Status status = ctxs[i]->root()->GetCodegendComputeFn(codegen, &expr_fn);
     if (!status.ok()) {
       (*fn)->eraseFromParent(); // deletes function
       *fn = NULL;
@@ -837,7 +834,7 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, bool build, Function**
 //   %hash_phi = phi i32 [ %string_hash, %not_null ], [ %str_null, %null ]
 //   ret i32 %hash_phi
 // }
-Status HashTableCtx::CodegenHashRow(RuntimeState* state, bool use_murmur, Function** fn) {
+Status HashTableCtx::CodegenHashRow(LlvmCodeGen* codegen, bool use_murmur, Function** fn) {
   for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
     // Disable codegen for CHAR
     if (build_expr_ctxs_[i]->root()->type().type == TYPE_CHAR) {
@@ -845,9 +842,6 @@ Status HashTableCtx::CodegenHashRow(RuntimeState* state, bool use_murmur, Functi
     }
   }
 
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(state->GetCodegen(&codegen));
-
   // Get types to generate function prototype
   Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
   DCHECK(this_type != NULL);
@@ -1044,7 +1038,7 @@ Status HashTableCtx::CodegenHashRow(RuntimeState* state, bool use_murmur, Functi
 //        %"struct.impala_udf::StringVal"* %8, %"struct.impala::StringValue"* %row_val8)
 //   br i1 %cmp_raw10, label %continue3, label %false_block
 // }
-Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality,
+Status HashTableCtx::CodegenEquals(LlvmCodeGen* codegen, bool force_null_equality,
     Function** fn) {
   for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
     // Disable codegen for CHAR
@@ -1053,8 +1047,6 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality
     }
   }
 
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(state->GetCodegen(&codegen));
   // Get types to generate function prototype
   Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
   DCHECK(this_type != NULL);
@@ -1091,7 +1083,7 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality
 
     // call GetValue on build_exprs[i]
     Function* expr_fn;
-    Status status = build_expr_ctxs_[i]->root()->GetCodegendComputeFn(state, &expr_fn);
+    Status status = build_expr_ctxs_[i]->root()->GetCodegendComputeFn(codegen, &expr_fn);
     if (!status.ok()) {
       (*fn)->eraseFromParent(); // deletes function
       *fn = NULL;
@@ -1164,11 +1156,9 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality
   return Status::OK();
 }
 
-Status HashTableCtx::ReplaceHashTableConstants(RuntimeState* state,
+Status HashTableCtx::ReplaceHashTableConstants(LlvmCodeGen* codegen,
     bool stores_duplicates, int num_build_tuples, Function* fn,
     HashTableReplacedConstants* replacement_counts) {
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(state->GetCodegen(&codegen));
 
   replacement_counts->stores_nulls = codegen->ReplaceCallSitesWithBoolConst(
       fn, stores_nulls(), "stores_nulls");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index 4edd130..404b294 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -177,20 +177,20 @@ class HashTableCtx {
   /// Codegen for evaluating a tuple row. Codegen'd function matches the signature
   /// for EvalBuildRow and EvalTupleRow.
   /// If build_row is true, the codegen uses the build_exprs, otherwise the probe_exprs.
-  Status CodegenEvalRow(RuntimeState* state, bool build_row, llvm::Function** fn);
+  Status CodegenEvalRow(LlvmCodeGen* codegen, bool build_row, llvm::Function** fn);
 
   /// Codegen for evaluating a TupleRow and comparing equality. Function signature
   /// matches HashTable::Equals(). 'force_null_equality' is true if the generated
   /// equality function should treat all NULLs as equal. See the template parameter
   /// to HashTable::Equals().
-  Status CodegenEquals(RuntimeState* state, bool force_null_equality,
+  Status CodegenEquals(LlvmCodeGen* codegen, bool force_null_equality,
       llvm::Function** fn);
 
   /// Codegen for hashing expr values. Function prototype matches HashRow identically.
   /// Unlike HashRow(), the returned function only uses a single hash function, rather
   /// than switching based on level_. If 'use_murmur' is true, murmur hash is used,
   /// otherwise CRC is used if the hardware supports it (see hash-util.h).
-  Status CodegenHashRow(RuntimeState* state, bool use_murmur, llvm::Function** fn);
+  Status CodegenHashRow(LlvmCodeGen* codegen, bool use_murmur, llvm::Function** fn);
 
   /// Struct that returns the number of constants replaced by ReplaceConstants().
   struct HashTableReplacedConstants {
@@ -204,7 +204,7 @@ class HashTableCtx {
   /// Replace hash table parameters with constants in 'fn'. Updates 'replacement_counts'
   /// with the number of replacements made. 'num_build_tuples' and 'stores_duplicates'
   /// correspond to HashTable parameters with the same name.
-  Status ReplaceHashTableConstants(RuntimeState* state, bool stores_duplicates,
+  Status ReplaceHashTableConstants(LlvmCodeGen* codegen, bool stores_duplicates,
       int num_build_tuples, llvm::Function* fn,
       HashTableReplacedConstants* replacement_counts);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index f511b12..88d6d3a 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -80,15 +80,13 @@ Status HdfsAvroScanner::Open(ScannerContext* context) {
 Status HdfsAvroScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ExprContext*>& conjunct_ctxs, Function** decode_avro_data_fn) {
   *decode_avro_data_fn = NULL;
-  if (!node->runtime_state()->codegen_enabled()) {
-    return Status("Disabled by query option.");
-  }
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(node->runtime_state()->GetCodegen(&codegen));
+  DCHECK(node->runtime_state()->codegen_enabled());
+  LlvmCodeGen* codegen = node->runtime_state()->codegen();
+  DCHECK(codegen != NULL);
   Function* materialize_tuple_fn;
   RETURN_IF_ERROR(CodegenMaterializeTuple(node, codegen, &materialize_tuple_fn));
   DCHECK(materialize_tuple_fn != NULL);
-  RETURN_IF_ERROR(CodegenDecodeAvroData(node->runtime_state(), materialize_tuple_fn,
+  RETURN_IF_ERROR(CodegenDecodeAvroData(codegen, materialize_tuple_fn,
       conjunct_ctxs, decode_avro_data_fn));
   DCHECK(*decode_avro_data_fn != NULL);
   return Status::OK();
@@ -1016,11 +1014,9 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
   return Status::OK();
 }
 
-Status HdfsAvroScanner::CodegenDecodeAvroData(RuntimeState* state,
+Status HdfsAvroScanner::CodegenDecodeAvroData(LlvmCodeGen* codegen,
     Function* materialize_tuple_fn, const vector<ExprContext*>& conjunct_ctxs,
     Function** decode_avro_data_fn) {
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(state->GetCodegen(&codegen));
   SCOPED_TIMER(codegen->codegen_timer());
   DCHECK(materialize_tuple_fn != NULL);
 
@@ -1030,7 +1026,7 @@ Status HdfsAvroScanner::CodegenDecodeAvroData(RuntimeState* state,
   DCHECK_EQ(replaced, 1);
 
   Function* eval_conjuncts_fn;
-  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(state, conjunct_ctxs,
+  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjunct_ctxs,
       &eval_conjuncts_fn));
 
   replaced = codegen->ReplaceCallSites(fn, eval_conjuncts_fn, "EvalConjuncts");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/hdfs-avro-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index 274cf4d..595a733 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -197,7 +197,7 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   /// Produces a version of DecodeAvroData that uses codegen'd instead of interpreted
   /// functions. Stores the resulting function in 'decode_avro_data_fn' if codegen was
   /// successful or returns an error.
-  static Status CodegenDecodeAvroData(RuntimeState* state,
+  static Status CodegenDecodeAvroData(LlvmCodeGen* codegen,
       llvm::Function* materialize_tuple_fn,
       const std::vector<ExprContext*>& conjunct_ctxs,
       llvm::Function** decode_avro_data_fn);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 86dbbf8..e91a7ec 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -620,13 +620,9 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
 
 Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ExprContext*>& conjunct_ctxs, Function** process_scratch_batch_fn) {
+  DCHECK(node->runtime_state()->codegen_enabled());
   *process_scratch_batch_fn = NULL;
-  if (!node->runtime_state()->codegen_enabled()) {
-    return Status("Disabled by query option.");
-  }
-
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(node->runtime_state()->GetCodegen(&codegen));
+  LlvmCodeGen* codegen = node->runtime_state()->codegen();
   DCHECK(codegen != NULL);
   SCOPED_TIMER(codegen->codegen_timer());
 
@@ -634,7 +630,7 @@ Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
   DCHECK(fn != NULL);
 
   Function* eval_conjuncts_fn;
-  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(node->runtime_state(), conjunct_ctxs,
+  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjunct_ctxs,
       &eval_conjuncts_fn));
   DCHECK(eval_conjuncts_fn != NULL);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 4acf3f5..cf6708c 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -146,7 +146,6 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
   // Add row batch conjuncts
   DCHECK(conjuncts_map_[tuple_id_].empty());
   conjuncts_map_[tuple_id_] = conjunct_ctxs_;
-
   return Status::OK();
 }
 
@@ -293,10 +292,15 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
   UpdateHdfsSplitStats(*scan_range_params_, &per_volume_stats);
   PrintHdfsSplitStats(per_volume_stats, &str);
   runtime_profile()->AddInfoString(HDFS_SPLIT_STATS_DESC, str.str());
+  if (!state->codegen_enabled()) {
+    runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
+  }
+  return Status::OK();
+}
 
+void HdfsScanNodeBase::Codegen(RuntimeState* state) {
   // Create codegen'd functions
-  for (int format = THdfsFileFormat::TEXT;
-       format <= THdfsFileFormat::PARQUET; ++format) {
+  for (int format = THdfsFileFormat::TEXT; format <= THdfsFileFormat::PARQUET; ++format) {
     vector<HdfsFileDesc*>& file_descs =
         per_type_files_[static_cast<THdfsFileFormat::type>(format)];
 
@@ -332,20 +336,16 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
         status = Status("Not implemented for this format.");
     }
     DCHECK(fn != NULL || !status.ok());
-
     const char* format_name = _THdfsFileFormat_VALUES_TO_NAMES.find(format)->second;
-    if (!status.ok()) {
-      runtime_profile()->AddCodegenMsg(false, status, format_name);
-    } else {
-      runtime_profile()->AddCodegenMsg(true, status, format_name);
-      LlvmCodeGen* codegen;
-      RETURN_IF_ERROR(runtime_state_->GetCodegen(&codegen));
-      codegen->AddFunctionToJit(
-          fn, &codegend_fn_map_[static_cast<THdfsFileFormat::type>(format)]);
+    if (status.ok()) {
+      LlvmCodeGen* codegen = state->codegen();
+      DCHECK(codegen != NULL);
+      codegen->AddFunctionToJit(fn,
+          &codegend_fn_map_[static_cast<THdfsFileFormat::type>(format)]);
     }
+    runtime_profile()->AddCodegenMsg(status.ok(), status, format_name);
   }
-
-  return Status::OK();
+  ExecNode::Codegen(state);
 }
 
 Status HdfsScanNodeBase::Open(RuntimeState* state) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 7ea4b9d..3531c9e 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -119,6 +119,7 @@ class HdfsScanNodeBase : public ScanNode {
 
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
+  virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status Reset(RuntimeState* state);
   virtual void Close(RuntimeState* state);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 81542ec..0b6e8c5 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -530,7 +530,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
       parse_block = BasicBlock::Create(context, "parse", fn, eval_fail_block);
       Function* conjunct_fn;
       Status status =
-          conjunct_ctxs[conjunct_idx]->root()->GetCodegendComputeFn(state, &conjunct_fn);
+          conjunct_ctxs[conjunct_idx]->root()->GetCodegendComputeFn(codegen, &conjunct_fn);
       if (!status.ok()) {
         stringstream ss;
         ss << "Failed to codegen conjunct: " << status.GetDetail();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 03a07ce..fd552be 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -55,11 +55,9 @@ HdfsSequenceScanner::~HdfsSequenceScanner() {
 Status HdfsSequenceScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) {
   *write_aligned_tuples_fn = NULL;
-  if (!node->runtime_state()->codegen_enabled()) {
-    return Status("Disabled by query option.");
-  }
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(node->runtime_state()->GetCodegen(&codegen));
+  DCHECK(node->runtime_state()->codegen_enabled());
+  LlvmCodeGen* codegen = node->runtime_state()->codegen();
+  DCHECK(codegen != NULL);
   Function* write_complete_tuple_fn;
   RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs,
       &write_complete_tuple_fn));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index f3f0a7c..cc63408 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -696,11 +696,9 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
 Status HdfsTextScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) {
   *write_aligned_tuples_fn = NULL;
-  if (!node->runtime_state()->codegen_enabled()) {
-    return Status("Disabled by query option.");
-  }
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(node->runtime_state()->GetCodegen(&codegen));
+  DCHECK(node->runtime_state()->codegen_enabled());
+  LlvmCodeGen* codegen = node->runtime_state()->codegen();
+  DCHECK(codegen != NULL);
   Function* write_complete_tuple_fn;
   RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs,
       &write_complete_tuple_fn));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/old-hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table.cc b/be/src/exec/old-hash-table.cc
index 0e38924..db89782 100644
--- a/be/src/exec/old-hash-table.cc
+++ b/be/src/exec/old-hash-table.cc
@@ -255,7 +255,7 @@ static void CodegenAssignNullValue(LlvmCodeGen* codegen,
 // Both the null and not null branch into the continue block.  The continue block
 // becomes the start of the next block for codegen (either the next expr or just the
 // end of the function).
-Function* OldHashTable::CodegenEvalTupleRow(RuntimeState* state, bool build) {
+Function* OldHashTable::CodegenEvalTupleRow(LlvmCodeGen* codegen, bool build) {
   // TODO: CodegenAssignNullValue() can't handle TYPE_TIMESTAMP or TYPE_DECIMAL yet
   const vector<ExprContext*>& ctxs = build ? build_expr_ctxs_ : probe_expr_ctxs_;
   for (int i = 0; i < ctxs.size(); ++i) {
@@ -263,9 +263,6 @@ Function* OldHashTable::CodegenEvalTupleRow(RuntimeState* state, bool build) {
     if (type == TYPE_TIMESTAMP || type == TYPE_DECIMAL || type == TYPE_CHAR) return NULL;
   }
 
-  LlvmCodeGen* codegen;
-  if (!state->GetCodegen(&codegen).ok()) return NULL;
-
   // Get types to generate function prototype
   Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
   DCHECK(tuple_row_type != NULL);
@@ -307,12 +304,10 @@ Function* OldHashTable::CodegenEvalTupleRow(RuntimeState* state, bool build) {
 
       // Call expr
       Function* expr_fn;
-      Status status = ctxs[i]->root()->GetCodegendComputeFn(state, &expr_fn);
+      Status status = ctxs[i]->root()->GetCodegendComputeFn(codegen, &expr_fn);
       if (!status.ok()) {
-        stringstream ss;
-        ss << "Problem with codegen: " << status.GetDetail();
-        state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
         fn->eraseFromParent(); // deletes function
+        VLOG_QUERY << "Failed to codegen EvalTupleRow(): " << status.GetDetail();
         return NULL;
       }
 
@@ -352,11 +347,9 @@ Function* OldHashTable::CodegenEvalTupleRow(RuntimeState* state, bool build) {
     }
   }
   builder.CreateRet(has_null);
-
   return codegen->FinalizeFunction(fn);
 }
 
-
 uint32_t OldHashTable::HashVariableLenRow() {
   uint32_t hash = initial_seed_;
   // Hash the non-var length portions (if there are any)
@@ -410,15 +403,12 @@ uint32_t OldHashTable::HashVariableLenRow() {
 //   ret i32 %7
 // }
 // TODO: can this be cross-compiled?
-Function* OldHashTable::CodegenHashCurrentRow(RuntimeState* state) {
+Function* OldHashTable::CodegenHashCurrentRow(LlvmCodeGen* codegen) {
   for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
     // Disable codegen for CHAR
     if (build_expr_ctxs_[i]->root()->type().type == TYPE_CHAR) return NULL;
   }
 
-  LlvmCodeGen* codegen;
-  if (!state->GetCodegen(&codegen).ok()) return NULL;
-
   // Get types to generate function prototype
   Type* this_type = codegen->GetType(OldHashTable::LLVM_CLASS_NAME);
   DCHECK(this_type != NULL);
@@ -592,14 +582,12 @@ bool OldHashTable::Equals(TupleRow* build_row) {
 // continue3:                                        ; preds = %not_null2, %null1
 //   ret i1 true
 // }
-Function* OldHashTable::CodegenEquals(RuntimeState* state) {
+Function* OldHashTable::CodegenEquals(LlvmCodeGen* codegen) {
   for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
     // Disable codegen for CHAR
     if (build_expr_ctxs_[i]->root()->type().type == TYPE_CHAR) return NULL;
   }
 
-  LlvmCodeGen* codegen;
-  if (!state->GetCodegen(&codegen).ok()) return NULL;
   // Get types to generate function prototype
   Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
   DCHECK(tuple_row_type != NULL);
@@ -629,12 +617,11 @@ Function* OldHashTable::CodegenEquals(RuntimeState* state) {
 
       // call GetValue on build_exprs[i]
       Function* expr_fn;
-      Status status = build_expr_ctxs_[i]->root()->GetCodegendComputeFn(state, &expr_fn);
+      Status status =
+          build_expr_ctxs_[i]->root()->GetCodegendComputeFn(codegen, &expr_fn);
       if (!status.ok()) {
-        stringstream ss;
-        ss << "Problem with codegen: " << status.GetDetail();
-        state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
         fn->eraseFromParent(); // deletes function
+        VLOG_QUERY << "Failed to codegen Equals(): " << status.GetDetail();
         return NULL;
       }
 
@@ -690,7 +677,6 @@ Function* OldHashTable::CodegenEquals(RuntimeState* state) {
   } else {
     builder.CreateRet(codegen->true_value());
   }
-
   return codegen->FinalizeFunction(fn);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/old-hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table.h b/be/src/exec/old-hash-table.h
index 5ef24ab..3a9b5b2 100644
--- a/be/src/exec/old-hash-table.h
+++ b/be/src/exec/old-hash-table.h
@@ -233,15 +233,15 @@ class OldHashTable {
   /// Codegen for evaluating a tuple row.  Codegen'd function matches the signature
   /// for EvalBuildRow and EvalTupleRow.
   /// if build_row is true, the codegen uses the build_exprs, otherwise the probe_exprs
-  llvm::Function* CodegenEvalTupleRow(RuntimeState* state, bool build_row);
+  llvm::Function* CodegenEvalTupleRow(LlvmCodeGen* codegen, bool build_row);
 
   /// Codegen for hashing the expr values in 'expr_values_buffer_'.  Function
   /// prototype matches HashCurrentRow identically.
-  llvm::Function* CodegenHashCurrentRow(RuntimeState* state);
+  llvm::Function* CodegenHashCurrentRow(LlvmCodeGen* codegen);
 
   /// Codegen for evaluating a TupleRow and comparing equality against
   /// 'expr_values_buffer_'.  Function signature matches OldHashTable::Equals()
-  llvm::Function* CodegenEquals(RuntimeState* state);
+  llvm::Function* CodegenEquals(LlvmCodeGen* codegen);
 
   static const char* LLVM_CLASS_NAME;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 629e407..6862bb3 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -172,14 +172,6 @@ Status PartitionedAggregationNode::Init(const TPlanNode& tnode, RuntimeState* st
 Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
 
-  // Create the codegen object before preparing conjunct_ctxs_ and children_, so that any
-  // ScalarFnCalls will use codegen.
-  // TODO: this is brittle and hard to reason about, revisit
-  if (state->codegen_enabled()) {
-    LlvmCodeGen* codegen;
-    RETURN_IF_ERROR(state->GetCodegen(&codegen));
-  }
-
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   state_ = state;
 
@@ -292,18 +284,24 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
     }
     DCHECK(serialize_stream_->has_write_block());
   }
-
-  bool codegen_enabled = false;
-  Status codegen_status;
-  if (state->codegen_enabled()) {
-    codegen_status =
-        is_streaming_preagg_ ? CodegenProcessBatchStreaming() : CodegenProcessBatch();
-    codegen_enabled = codegen_status.ok();
+  if (!state->codegen_enabled()) {
+    runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
   }
-  runtime_profile()->AddCodegenMsg(codegen_enabled, codegen_status);
   return Status::OK();
 }
 
+void PartitionedAggregationNode::Codegen(RuntimeState* state) {
+  DCHECK(state->codegen_enabled());
+  LlvmCodeGen* codegen = state->codegen();
+  DCHECK(codegen != NULL);
+  TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode;
+  Status codegen_status =
+     is_streaming_preagg_ ? CodegenProcessBatchStreaming(codegen, prefetch_mode) :
+          CodegenProcessBatch(codegen, prefetch_mode);
+  runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
+  ExecNode::Codegen(state);
+}
+
 Status PartitionedAggregationNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));
@@ -1531,11 +1529,8 @@ Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
 // ret:                                              ; preds = %src_not_null, %entry
 //   ret void
 // }
-Status PartitionedAggregationNode::CodegenUpdateSlot(
+Status PartitionedAggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen,
     AggFnEvaluator* evaluator, SlotDescriptor* slot_desc, Function** fn) {
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(state_->GetCodegen(&codegen));
-
   // TODO: Fix this DCHECK and Init() once CodegenUpdateSlot() can handle AggFnEvaluator
   // with multiple input expressions (e.g. group_concat).
   DCHECK_EQ(evaluator->input_expr_ctxs().size(), 1);
@@ -1550,7 +1545,7 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(
   }
 
   Function* agg_expr_fn;
-  RETURN_IF_ERROR(agg_expr->GetCodegendComputeFn(state_, &agg_expr_fn));
+  RETURN_IF_ERROR(agg_expr->GetCodegendComputeFn(codegen, &agg_expr_fn));
 
   PointerType* fn_ctx_type =
       codegen->GetPtrType(FunctionContextImpl::LLVM_FUNCTIONCONTEXT_NAME);
@@ -1746,9 +1741,8 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(
 //                          %"class.impala::TupleRow"* %row)
 //   ret void
 // }
-Status PartitionedAggregationNode::CodegenUpdateTuple(Function** fn) {
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(state_->GetCodegen(&codegen));
+Status PartitionedAggregationNode::CodegenUpdateTuple(LlvmCodeGen* codegen,
+    Function** fn) {
   SCOPED_TIMER(codegen->codegen_timer());
 
   int j = grouping_expr_ctxs_.size();
@@ -1838,7 +1832,7 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(Function** fn) {
       builder.CreateStore(count_inc, slot_ptr);
     } else {
       Function* update_slot_fn;
-      RETURN_IF_ERROR(CodegenUpdateSlot(evaluator, slot_desc, &update_slot_fn));
+      RETURN_IF_ERROR(CodegenUpdateSlot(codegen, evaluator, slot_desc, &update_slot_fn));
       Value* agg_fn_ctx_ptr = builder.CreateConstGEP1_32(agg_fn_ctxs_arg, i);
       Value* agg_fn_ctx = builder.CreateLoad(agg_fn_ctx_ptr, "agg_fn_ctx");
       // Call GetExprCtx() to get the expression context.
@@ -1860,13 +1854,12 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(Function** fn) {
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::CodegenProcessBatch() {
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(state_->GetCodegen(&codegen));
+Status PartitionedAggregationNode::CodegenProcessBatch(LlvmCodeGen* codegen,
+    TPrefetchMode::type prefetch_mode) {
   SCOPED_TIMER(codegen->codegen_timer());
 
   Function* update_tuple_fn;
-  RETURN_IF_ERROR(CodegenUpdateTuple(&update_tuple_fn));
+  RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_tuple_fn));
 
   // Get the cross compiled update row batch function
   IRFunction::Type ir_fn = (!grouping_expr_ctxs_.empty() ?
@@ -1880,7 +1873,6 @@ Status PartitionedAggregationNode::CodegenProcessBatch() {
     // Codegen for grouping using hash table
 
     // Replace prefetch_mode with constant so branches can be optimised out.
-    TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode;
     Value* prefetch_mode_arg = codegen->GetArgument(process_batch_fn, 3);
     prefetch_mode_arg->replaceAllUsesWith(
         ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
@@ -1888,15 +1880,15 @@ Status PartitionedAggregationNode::CodegenProcessBatch() {
     // The codegen'd ProcessBatch function is only used in Open() with level_ = 0,
     // so don't use murmur hash
     Function* hash_fn;
-    RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(state_, /* use murmur */ false, &hash_fn));
+    RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(codegen, /* use murmur */ false, &hash_fn));
 
     // Codegen HashTable::Equals<true>
     Function* build_equals_fn;
-    RETURN_IF_ERROR(ht_ctx_->CodegenEquals(state_, true, &build_equals_fn));
+    RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, true, &build_equals_fn));
 
     // Codegen for evaluating input rows
     Function* eval_grouping_expr_fn;
-    RETURN_IF_ERROR(ht_ctx_->CodegenEvalRow(state_, false, &eval_grouping_expr_fn));
+    RETURN_IF_ERROR(ht_ctx_->CodegenEvalRow(codegen, false, &eval_grouping_expr_fn));
 
     // Replace call sites
     replaced = codegen->ReplaceCallSites(process_batch_fn, eval_grouping_expr_fn,
@@ -1911,7 +1903,7 @@ Status PartitionedAggregationNode::CodegenProcessBatch() {
 
     HashTableCtx::HashTableReplacedConstants replaced_constants;
     const bool stores_duplicates = false;
-    RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(state_, stores_duplicates, 1,
+    RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, stores_duplicates, 1,
         process_batch_fn, &replaced_constants));
     DCHECK_GE(replaced_constants.stores_nulls, 1);
     DCHECK_GE(replaced_constants.finds_some_nulls, 1);
@@ -1935,10 +1927,9 @@ Status PartitionedAggregationNode::CodegenProcessBatch() {
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::CodegenProcessBatchStreaming() {
+Status PartitionedAggregationNode::CodegenProcessBatchStreaming(
+    LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) {
   DCHECK(is_streaming_preagg_);
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(state_->GetCodegen(&codegen));
   SCOPED_TIMER(codegen->codegen_timer());
 
   IRFunction::Type ir_fn = IRFunction::PART_AGG_NODE_PROCESS_BATCH_STREAMING;
@@ -1951,25 +1942,24 @@ Status PartitionedAggregationNode::CodegenProcessBatchStreaming() {
       ConstantInt::get(Type::getInt1Ty(codegen->context()), needs_serialize_));
 
   // Replace prefetch_mode with constant so branches can be optimised out.
-  TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode;
   Value* prefetch_mode_arg = codegen->GetArgument(process_batch_streaming_fn, 3);
   prefetch_mode_arg->replaceAllUsesWith(
       ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
 
   Function* update_tuple_fn;
-  RETURN_IF_ERROR(CodegenUpdateTuple(&update_tuple_fn));
+  RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_tuple_fn));
 
   // We only use the top-level hash function for streaming aggregations.
   Function* hash_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(state_, false, &hash_fn));
+  RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(codegen, false, &hash_fn));
 
   // Codegen HashTable::Equals
   Function* equals_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenEquals(state_, true, &equals_fn));
+  RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, true, &equals_fn));
 
   // Codegen for evaluating input rows
   Function* eval_grouping_expr_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenEvalRow(state_, false, &eval_grouping_expr_fn));
+  RETURN_IF_ERROR(ht_ctx_->CodegenEvalRow(codegen, false, &eval_grouping_expr_fn));
 
   // Replace call sites
   int replaced = codegen->ReplaceCallSites(process_batch_streaming_fn, update_tuple_fn,
@@ -1988,7 +1978,7 @@ Status PartitionedAggregationNode::CodegenProcessBatchStreaming() {
 
   HashTableCtx::HashTableReplacedConstants replaced_constants;
   const bool stores_duplicates = false;
-  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(state_, stores_duplicates, 1,
+  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, stores_duplicates, 1,
       process_batch_streaming_fn, &replaced_constants));
   DCHECK_GE(replaced_constants.stores_nulls, 1);
   DCHECK_GE(replaced_constants.finds_some_nulls, 1);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index 0c0f3e8..9e14e66 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -126,6 +126,7 @@ class PartitionedAggregationNode : public ExecNode {
 
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
+  virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
   virtual Status Reset(RuntimeState* state);
@@ -136,7 +137,6 @@ class PartitionedAggregationNode : public ExecNode {
  protected:
   /// Frees local allocations from aggregate_evaluators_ and agg_fn_ctxs
   virtual Status QueryMaintenance(RuntimeState* state);
-
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
 
  private:
@@ -637,11 +637,11 @@ class PartitionedAggregationNode : public ExecNode {
 
   /// Codegen UpdateSlot(). Returns non-OK status if codegen is unsuccessful.
   /// Assumes is_merge = false;
-  Status CodegenUpdateSlot(AggFnEvaluator* evaluator, SlotDescriptor* slot_desc,
-      llvm::Function** fn);
+  Status CodegenUpdateSlot(LlvmCodeGen* codegen, AggFnEvaluator* evaluator,
+      SlotDescriptor* slot_desc, llvm::Function** fn);
 
   /// Codegen UpdateTuple(). Returns non-OK status if codegen is unsuccessful.
-  Status CodegenUpdateTuple(llvm::Function** fn);
+  Status CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn);
 
   /// Codegen the non-streaming process row batch loop. The loop has already been
   /// compiled to IR and loaded into the codegen object. UpdateAggTuple has also been
@@ -650,11 +650,12 @@ class PartitionedAggregationNode : public ExecNode {
   /// 'process_batch_no_grouping_fn_' will be updated with the codegened function
   /// depending on whether this is a grouping or non-grouping aggregation.
   /// Assumes AGGREGATED_ROWS = false.
-  Status CodegenProcessBatch();
+  Status CodegenProcessBatch(LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode);
 
   /// Codegen the materialization loop for streaming preaggregations.
   /// 'process_batch_streaming_fn_' will be updated with the codegened function.
-  Status CodegenProcessBatchStreaming();
+  Status CodegenProcessBatchStreaming(
+      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode);
 
   /// We need two buffers per partition, one for the aggregated stream and one
   /// for the unaggregated stream. We need an additional buffer to read the stream

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index bf5b42a..0f15876 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -138,8 +138,9 @@ Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
   partition_build_rows_timer_ = ADD_TIMER(profile(), "BuildRowsPartitionTime");
   build_hash_table_timer_ = ADD_TIMER(profile(), "HashTablesBuildTime");
   repartition_timer_ = ADD_TIMER(profile(), "RepartitionTime");
-
-  Codegen(state);
+  if (!state->codegen_enabled()) {
+    profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
+  }
   return Status::OK();
 }
 
@@ -704,39 +705,34 @@ not_built:
   return Status::OK();
 }
 
-void PhjBuilder::Codegen(RuntimeState* state) {
-  bool build_codegen_enabled = false;
-  bool insert_codegen_enabled = false;
-  Status build_codegen_status, insert_codegen_status;
-  if (state->codegen_enabled()) {
-    Status codegen_status;
-    // Codegen for hashing rows with the builder's hash table context.
-    Function* hash_fn;
-    codegen_status = ht_ctx_->CodegenHashRow(runtime_state_, false, &hash_fn);
-    Function* murmur_hash_fn;
-    codegen_status.MergeStatus(
-        ht_ctx_->CodegenHashRow(runtime_state_, true, &murmur_hash_fn));
-
-    // Codegen for evaluating build rows
-    Function* eval_build_row_fn;
-    codegen_status.MergeStatus(
-        ht_ctx_->CodegenEvalRow(runtime_state_, true, &eval_build_row_fn));
-
-    if (codegen_status.ok()) {
-      build_codegen_status =
-          CodegenProcessBuildBatch(hash_fn, murmur_hash_fn, eval_build_row_fn);
-      insert_codegen_status =
-          CodegenInsertBatch(hash_fn, murmur_hash_fn, eval_build_row_fn);
-    } else {
-      build_codegen_status = codegen_status;
-      insert_codegen_status = codegen_status;
-    }
-    build_codegen_enabled = build_codegen_status.ok();
-    insert_codegen_enabled = insert_codegen_status.ok();
+void PhjBuilder::Codegen(LlvmCodeGen* codegen) {
+  Status build_codegen_status;
+  Status insert_codegen_status;
+  Status codegen_status;
+
+  // Codegen for hashing rows with the builder's hash table context.
+  Function* hash_fn;
+  codegen_status = ht_ctx_->CodegenHashRow(codegen, false, &hash_fn);
+  Function* murmur_hash_fn;
+  codegen_status.MergeStatus(ht_ctx_->CodegenHashRow(codegen, true, &murmur_hash_fn));
+
+  // Codegen for evaluating build rows
+  Function* eval_build_row_fn;
+  codegen_status.MergeStatus(ht_ctx_->CodegenEvalRow(codegen, true, &eval_build_row_fn));
+
+  if (codegen_status.ok()) {
+    TPrefetchMode::type prefetch_mode = runtime_state_->query_options().prefetch_mode;
+    build_codegen_status =
+        CodegenProcessBuildBatch(codegen, hash_fn, murmur_hash_fn, eval_build_row_fn);
+    insert_codegen_status = CodegenInsertBatch(codegen, hash_fn, murmur_hash_fn,
+        eval_build_row_fn, prefetch_mode);
+  } else {
+    build_codegen_status = codegen_status;
+    insert_codegen_status = codegen_status;
   }
-  profile()->AddCodegenMsg(build_codegen_enabled, build_codegen_status, "Build Side");
-  profile()->AddCodegenMsg(
-      insert_codegen_enabled, insert_codegen_status, "Hash Table Construction");
+  profile()->AddCodegenMsg(build_codegen_status.ok(), build_codegen_status, "Build Side");
+  profile()->AddCodegenMsg(insert_codegen_status.ok(), insert_codegen_status,
+      "Hash Table Construction");
 }
 
 string PhjBuilder::DebugString() const {
@@ -763,11 +759,8 @@ string PhjBuilder::DebugString() const {
   return ss.str();
 }
 
-Status PhjBuilder::CodegenProcessBuildBatch(
+Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen,
     Function* hash_fn, Function* murmur_hash_fn, Function* eval_row_fn) {
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(runtime_state_->GetCodegen(&codegen));
-
   Function* process_build_batch_fn =
       codegen->GetFunction(IRFunction::PHJ_PROCESS_BUILD_BATCH, true);
   DCHECK(process_build_batch_fn != NULL);
@@ -781,7 +774,7 @@ Status PhjBuilder::CodegenProcessBuildBatch(
   HashTableCtx::HashTableReplacedConstants replaced_constants;
   const bool stores_duplicates = true;
   const int num_build_tuples = row_desc_.tuple_descriptors().size();
-  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(runtime_state_, stores_duplicates,
+  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, stores_duplicates,
       num_build_tuples, process_build_batch_fn, &replaced_constants));
   DCHECK_GE(replaced_constants.stores_nulls, 1);
   DCHECK_EQ(replaced_constants.finds_some_nulls, 0);
@@ -838,18 +831,14 @@ Status PhjBuilder::CodegenProcessBuildBatch(
   return Status::OK();
 }
 
-Status PhjBuilder::CodegenInsertBatch(
-    Function* hash_fn, Function* murmur_hash_fn, Function* eval_row_fn) {
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(runtime_state_->GetCodegen(&codegen));
-
+Status PhjBuilder::CodegenInsertBatch(LlvmCodeGen* codegen, Function* hash_fn,
+    Function* murmur_hash_fn, Function* eval_row_fn, TPrefetchMode::type prefetch_mode) {
   Function* insert_batch_fn = codegen->GetFunction(IRFunction::PHJ_INSERT_BATCH, true);
   Function* build_equals_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenEquals(runtime_state_, true, &build_equals_fn));
+  RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, true, &build_equals_fn));
 
   // Replace the parameter 'prefetch_mode' with constant.
   Value* prefetch_mode_arg = codegen->GetArgument(insert_batch_fn, 1);
-  TPrefetchMode::type prefetch_mode = runtime_state_->query_options().prefetch_mode;
   DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
   DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
   prefetch_mode_arg->replaceAllUsesWith(
@@ -867,7 +856,7 @@ Status PhjBuilder::CodegenInsertBatch(
   HashTableCtx::HashTableReplacedConstants replaced_constants;
   const bool stores_duplicates = true;
   const int num_build_tuples = row_desc_.tuple_descriptors().size();
-  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(runtime_state_, stores_duplicates,
+  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, stores_duplicates,
       num_build_tuples, insert_batch_fn, &replaced_constants));
   DCHECK_GE(replaced_constants.stores_nulls, 1);
   DCHECK_EQ(replaced_constants.finds_some_nulls, 0);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 7f81e5a..650452c 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -84,6 +84,11 @@ class PhjBuilder : public DataSink {
   virtual Status FlushFinal(RuntimeState* state) override;
   virtual void Close(RuntimeState* state) override;
 
+  /// Does all codegen for the builder (if codegen is enabled).
+  /// Updates the builder's runtime profile with info about whether any errors
+  /// occurred during codegen.
+  void Codegen(LlvmCodeGen* codegen);
+
   /////////////////////////////////////////
   // The following functions are used only by PartitionedHashJoinNode.
   /////////////////////////////////////////
@@ -312,20 +317,16 @@ class PhjBuilder : public DataSink {
   /// unacceptably high false-positive rate.
   void PublishRuntimeFilters(int64_t num_build_rows);
 
-  /// Does all codegen for the builder (if codegen is enabled).
-  /// Updates the the builder's runtime profile with info about whether the codegen was
-  /// enabled and whether any errors occured during codegen.
-  void Codegen(RuntimeState* state);
-
   /// Codegen processing build batches. Identical signature to ProcessBuildBatch().
   /// Returns non-OK status if codegen was not possible.
-  Status CodegenProcessBuildBatch(llvm::Function* hash_fn, llvm::Function* murmur_hash_fn,
-      llvm::Function* eval_row_fn);
+  Status CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn,
+      llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn);
 
   /// Codegen inserting batches into a partition's hash table. Identical signature to
   /// Partition::InsertBatch(). Returns non-OK if codegen was not possible.
-  Status CodegenInsertBatch(llvm::Function* hash_fn, llvm::Function* murmur_hash_fn,
-      llvm::Function* eval_row_fn);
+  Status CodegenInsertBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn,
+      llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn,
+      TPrefetchMode::type prefetch_mode);
 
   RuntimeState* const runtime_state_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 46b91ba..024fdc3 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -98,14 +98,6 @@ Status PartitionedHashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state
 Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
 
-  // Create the codegen object before preparing conjunct_ctxs_ and children_, so that any
-  // ScalarFnCalls will use codegen.
-  // TODO: this is brittle and hard to reason about, revisit
-  if (state->codegen_enabled()) {
-    LlvmCodeGen* codegen;
-    RETURN_IF_ERROR(state->GetCodegen(&codegen));
-  }
-
   RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
   runtime_state_ = state;
 
@@ -143,18 +135,30 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
 
   num_probe_rows_partitioned_ =
       ADD_COUNTER(runtime_profile(), "ProbeRowsPartitioned", TUnit::UNIT);
-
-  bool probe_codegen_enabled = false;
-  Status probe_codegen_status;
-  if (state->codegen_enabled()) {
-    probe_codegen_status = CodegenProcessProbeBatch(state);
-    probe_codegen_enabled = probe_codegen_status.ok();
+  if (!state->codegen_enabled()) {
+    runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
   }
-  runtime_profile()->AddCodegenMsg(
-      probe_codegen_enabled, probe_codegen_status, "Probe Side");
   return Status::OK();
 }
 
+void PartitionedHashJoinNode::Codegen(RuntimeState* state) {
+  DCHECK(state->codegen_enabled());
+  LlvmCodeGen* codegen = state->codegen();
+  DCHECK(codegen != NULL);
+
+  // Codegen the build side. The builder reports its own status in its profile.
+  builder_->Codegen(codegen);
+
+  // Codegen the probe side and report the outcome in this node's profile.
+  TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+  Status probe_codegen_status = CodegenProcessProbeBatch(codegen, prefetch_mode);
+  runtime_profile()->AddCodegenMsg(probe_codegen_status.ok(), probe_codegen_status,
+      "Probe Side");
+
+  // Codegen the child nodes.
+  ExecNode::Codegen(state);
+}
+
 Status PartitionedHashJoinNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(BlockingJoinNode::Open(state));
@@ -1190,14 +1194,13 @@ Status PartitionedHashJoinNode::CodegenCreateOutputRow(LlvmCodeGen* codegen,
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::CodegenProcessProbeBatch(RuntimeState* state) {
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(state->GetCodegen(&codegen));
+Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
+    LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) {
   // Codegen for hashing rows
   Function* hash_fn;
   Function* murmur_hash_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(state, false, &hash_fn));
-  RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(state, true, &murmur_hash_fn));
+  RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(codegen, false, &hash_fn));
+  RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(codegen, true, &murmur_hash_fn));
 
   // Get cross compiled function
   IRFunction::Type ir_fn = IRFunction::FN_END;
@@ -1243,7 +1246,6 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(RuntimeState* state) {
 
   // Replace the parameter 'prefetch_mode' with constant.
   Value* prefetch_mode_arg = codegen->GetArgument(process_probe_batch_fn, 1);
-  TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
   DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
   DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
   prefetch_mode_arg->replaceAllUsesWith(
@@ -1251,11 +1253,11 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(RuntimeState* state) {
 
   // Codegen HashTable::Equals
   Function* probe_equals_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenEquals(state, false, &probe_equals_fn));
+  RETURN_IF_ERROR(ht_ctx_->CodegenEquals(codegen, false, &probe_equals_fn));
 
   // Codegen for evaluating probe rows
   Function* eval_row_fn;
-  RETURN_IF_ERROR(ht_ctx_->CodegenEvalRow(state, false, &eval_row_fn));
+  RETURN_IF_ERROR(ht_ctx_->CodegenEvalRow(codegen, false, &eval_row_fn));
 
   // Codegen CreateOutputRow
   Function* create_output_row_fn;
@@ -1263,12 +1265,12 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(RuntimeState* state) {
 
   // Codegen evaluating other join conjuncts
   Function* eval_other_conjuncts_fn;
-  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(state, other_join_conjunct_ctxs_,
+  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, other_join_conjunct_ctxs_,
       &eval_other_conjuncts_fn, "EvalOtherConjuncts"));
 
   // Codegen evaluating conjuncts
   Function* eval_conjuncts_fn;
-  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(state, conjunct_ctxs_,
+  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjunct_ctxs_,
       &eval_conjuncts_fn));
 
   // Replace all call sites with codegen version
@@ -1317,7 +1319,7 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(RuntimeState* state) {
   HashTableCtx::HashTableReplacedConstants replaced_constants;
   const bool stores_duplicates = true;
   const int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
-  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(state, stores_duplicates,
+  RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, stores_duplicates,
       num_build_tuples, process_probe_batch_fn, &replaced_constants));
   DCHECK_GE(replaced_constants.stores_nulls, 1);
   DCHECK_GE(replaced_constants.finds_some_nulls, 1);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 5b9264c..504dc7b 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -110,6 +110,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
+  virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
   virtual Status Reset(RuntimeState* state);
@@ -355,7 +356,8 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
   /// Codegen processing probe batches.  Identical signature to ProcessProbeBatch.
   /// Returns non-OK if codegen was not possible.
-  Status CodegenProcessProbeBatch(RuntimeState* state);
+  Status CodegenProcessProbeBatch(
+      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode);
 
   /// Returns the current state of the partition as a string.
   std::string PrintState() const;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 9271721..140e662 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -50,22 +50,23 @@ Status SortNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(sort_exec_exprs_.Prepare(
       state, child(0)->row_desc(), row_descriptor_, expr_mem_tracker()));
   AddExprCtxsToFree(sort_exec_exprs_);
-  TupleRowComparator less_than(sort_exec_exprs_, is_asc_order_, nulls_first_);
-
-  bool codegen_enabled = false;
-  Status codegen_status;
-  if (state->codegen_enabled()) {
-    codegen_status = less_than.Codegen(state);
-    codegen_enabled = codegen_status.ok();
-  }
-  runtime_profile()->AddCodegenMsg(codegen_enabled, codegen_status);
-
-  sorter_.reset(new Sorter(less_than, sort_exec_exprs_.sort_tuple_slot_expr_ctxs(),
+  less_than_.reset(new TupleRowComparator(sort_exec_exprs_, is_asc_order_, nulls_first_));
+  sorter_.reset(new Sorter(*less_than_.get(), sort_exec_exprs_.sort_tuple_slot_expr_ctxs(),
       &row_descriptor_, mem_tracker(), runtime_profile(), state));
   RETURN_IF_ERROR(sorter_->Init());
+  if (!state->codegen_enabled()) {
+    runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
+  }
   return Status::OK();
 }
 
+void SortNode::Codegen(RuntimeState* state) {
+  DCHECK(state->codegen_enabled());
+  Status codegen_status = less_than_->Codegen(state);
+  runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
+  ExecNode::Codegen(state);
+}
+
 Status SortNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index a90c99e..75513ba 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -41,6 +41,7 @@ class SortNode : public ExecNode {
 
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
+  virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
   virtual Status Reset(RuntimeState* state);
@@ -56,6 +57,9 @@ class SortNode : public ExecNode {
   /// Number of rows to skip.
   int64_t offset_;
 
+  /// The tuple row comparator derived based on 'sort_exec_exprs_'.
+  boost::scoped_ptr<TupleRowComparator> less_than_;
+
   /// Expressions and parameters used for tuple materialization and tuple comparison.
   SortExecExprs sort_exec_exprs_;
   std::vector<bool> is_asc_order_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b15d992a/be/src/exec/topn-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index 6dd1e34..e72249f 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -63,39 +63,6 @@ Status TopNNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   return Status::OK();
 }
 
-Status TopNNode::Codegen(RuntimeState* state) {
-  DCHECK(materialized_tuple_desc_ != NULL);
-  LlvmCodeGen* codegen;
-  RETURN_IF_ERROR(state->GetCodegen(&codegen));
-  Function* insert_batch_fn =
-      codegen->GetFunction(IRFunction::TOPN_NODE_INSERT_BATCH, true);
-
-  // Generate two MaterializeExprs() functions, one using tuple_pool_ and one with no
-  // pool.
-  Function* materialize_exprs_tuple_pool_fn;
-  RETURN_IF_ERROR(Tuple::CodegenMaterializeExprs(state, false, *materialized_tuple_desc_,
-      sort_exec_exprs_.sort_tuple_slot_expr_ctxs(), tuple_pool_.get(),
-      &materialize_exprs_tuple_pool_fn));
-
-  Function* materialize_exprs_no_pool_fn;
-  RETURN_IF_ERROR(Tuple::CodegenMaterializeExprs(state, false, *materialized_tuple_desc_,
-      sort_exec_exprs_.sort_tuple_slot_expr_ctxs(), NULL, &materialize_exprs_no_pool_fn));
-
-  int replaced = codegen->ReplaceCallSites(insert_batch_fn,
-      materialize_exprs_tuple_pool_fn, Tuple::MATERIALIZE_EXPRS_SYMBOL);
-  DCHECK_EQ(replaced, 1) << LlvmCodeGen::Print(insert_batch_fn);
-
-  replaced = codegen->ReplaceCallSites(insert_batch_fn, materialize_exprs_no_pool_fn,
-      Tuple::MATERIALIZE_EXPRS_NULL_POOL_SYMBOL);
-  DCHECK_EQ(replaced, 1) << LlvmCodeGen::Print(insert_batch_fn);
-
-  insert_batch_fn = codegen->FinalizeFunction(insert_batch_fn);
-  DCHECK(insert_batch_fn != NULL);
-  codegen->AddFunctionToJit(insert_batch_fn,
-      reinterpret_cast<void**>(&codegend_insert_batch_fn_));
-  return Status::OK();
-}
-
 Status TopNNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
@@ -106,23 +73,66 @@ Status TopNNode::Prepare(RuntimeState* state) {
   AddExprCtxsToFree(sort_exec_exprs_);
   tuple_row_less_than_.reset(
       new TupleRowComparator(sort_exec_exprs_, is_asc_order_, nulls_first_));
-  bool codegen_enabled = false;
-  Status codegen_status;
-  if (state->codegen_enabled()) {
-    // TODO: inline tuple_row_less_than_->Compare()
-    codegen_status = tuple_row_less_than_->Codegen(state);
-    codegen_status.MergeStatus(Codegen(state));
-    codegen_enabled = codegen_status.ok();
-  }
-  runtime_profile()->AddCodegenMsg(codegen_enabled, codegen_status);
   priority_queue_.reset(
       new priority_queue<Tuple*, vector<Tuple*>, ComparatorWrapper<TupleRowComparator>>(
           *tuple_row_less_than_));
   materialized_tuple_desc_ = row_descriptor_.tuple_descriptors()[0];
   insert_batch_timer_ = ADD_TIMER(runtime_profile(), "InsertBatchTime");
+  // Only the codegen-disabled case is recorded here. When codegen is enabled,
+  // TopNNode::Codegen() records the outcome in the runtime profile.
+  if (!state->codegen_enabled()) {
+    runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
+  }
   return Status::OK();
 }
 
+void TopNNode::Codegen(RuntimeState* state) {
+  DCHECK(state->codegen_enabled());
+  LlvmCodeGen* codegen = state->codegen();
+  DCHECK(codegen != NULL);
+
+  // TODO: inline tuple_row_less_than_->Compare()
+  Status codegen_status = tuple_row_less_than_->Codegen(state);
+  if (codegen_status.ok()) {
+    Function* insert_batch_fn =
+        codegen->GetFunction(IRFunction::TOPN_NODE_INSERT_BATCH, true);
+    DCHECK(insert_batch_fn != NULL);
+
+    // Generate two MaterializeExprs() functions, one using tuple_pool_ and
+    // one with no pool.
+    DCHECK(materialized_tuple_desc_ != NULL);
+    Function* materialize_exprs_tuple_pool_fn;
+    Function* materialize_exprs_no_pool_fn;
+
+    codegen_status = Tuple::CodegenMaterializeExprs(codegen, false,
+        *materialized_tuple_desc_, sort_exec_exprs_.sort_tuple_slot_expr_ctxs(),
+        tuple_pool_.get(), &materialize_exprs_tuple_pool_fn);
+
+    if (codegen_status.ok()) {
+      codegen_status = Tuple::CodegenMaterializeExprs(codegen, false,
+          *materialized_tuple_desc_, sort_exec_exprs_.sort_tuple_slot_expr_ctxs(),
+          NULL, &materialize_exprs_no_pool_fn);
+
+      if (codegen_status.ok()) {
+        int replaced = codegen->ReplaceCallSites(insert_batch_fn,
+            materialize_exprs_tuple_pool_fn, Tuple::MATERIALIZE_EXPRS_SYMBOL);
+        DCHECK_EQ(replaced, 1) << LlvmCodeGen::Print(insert_batch_fn);
+
+        replaced = codegen->ReplaceCallSites(insert_batch_fn,
+            materialize_exprs_no_pool_fn, Tuple::MATERIALIZE_EXPRS_NULL_POOL_SYMBOL);
+        DCHECK_EQ(replaced, 1) << LlvmCodeGen::Print(insert_batch_fn);
+
+        insert_batch_fn = codegen->FinalizeFunction(insert_batch_fn);
+        DCHECK(insert_batch_fn != NULL);
+        codegen->AddFunctionToJit(insert_batch_fn,
+            reinterpret_cast<void**>(&codegend_insert_batch_fn_));
+      }
+    }
+  }
+  runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
+  ExecNode::Codegen(state);
+}
+
 Status TopNNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));