You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/04/01 23:16:06 UTC

[impala] branch master updated: IMPALA-4080 [part 7]: Codegen once per fragment

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new f2837e9  IMPALA-4080 [part 7]: Codegen once per fragment
f2837e9 is described below

commit f2837e97b25be37221ab475b0fd4f66320a4f66b
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Wed Mar 4 10:41:01 2020 -0800

    IMPALA-4080 [part 7]: Codegen once per fragment
    
    This is the final patch:
    - Removes all temporary changes that called codegen on plan nodes and
      data sink through the exec node.
    - Introduces the FragmentState class, which contains the plan tree,
      data sink config and codegen state for each fragment.
    - Every FragmentState object has its own runtime profile, which maps
      each codegen object's profile to its fragment id.
    - Plan nodes are generated before fragment instance threads are spawned.
    - The first fragment instance thread to call codegen on its parent
      fragment does the actual codegen work; the remaining instance
      threads wait until codegen is complete.
    - This closely mimics the timing of the state machines for both the
      query state and the fragment instance states.
    
    Testing:
    - Ran exhaustive tests successfully.
    
    An example of how the codegen in the profile looks:
    
    Per Node Profiles:
      localhost:22002:
         - AdmissionSlots: 1 (1)
         - BloomFilterBytes: 16.00 MB (16777216)
         - ScratchBytesRead: 0
         - ScratchBytesWritten: 0
         - ScratchFileUsedBytes: 0
         - ScratchReads: 0 (0)
         - ScratchWrites: 0 (0)
         - TotalEncryptionTime: 0.000ns
         - TotalReadBlockTime: 0.000ns
        Buffer pool:
           - AllocTime: 153.663us
           - CumulativeAllocationBytes: 16.00 MB (16777216)
           - CumulativeAllocations: 8 (8)
           - PeakReservation: 16.00 MB (16777216)
           - PeakUnpinnedBytes: 0
           - PeakUsedReservation: 16.00 MB (16777216)
           - ReadIoBytes: 0
           - ReadIoOps: 0 (0)
           - ReadIoWaitTime: 0.000ns
           - ReservationLimit: 16.00 MB (16777216)
           - SystemAllocTime: 133.841us
           - WriteIoBytes: 0
           - WriteIoOps: 0 (0)
           - WriteIoWaitTime: 0.000ns
        Fragment F02:
          CodeGen:(Total: 695.207ms, non-child: 0.000ns, % non-child: 0.00%)
             - CodegenInvoluntaryContextSwitches: 102 (102)
             - CodegenTotalWallClockTime: 695.180ms
               - CodegenSysTime: 0.000ns
               - CodegenUserTime: 688.998ms
             - CodegenVoluntaryContextSwitches: 22 (22)
             - CompileTime: 39.657ms
             - IrGenerationTime: 27.443ms
             - LoadTime: 0.000ns
             - ModuleBitcodeSize: 2.53 MB (2647876)
             - NumFunctions: 161 (161)
             - NumInstructions: 7.13K (7128)
             - OptimizationTime: 596.075ms
             - PeakMemoryUsage: 3.48 MB (3649536)
             - PrepareTime: 30.782ms
        Fragment F00:
          CodeGen:(Total: 145.314ms, non-child: 0.000ns, % non-child: 0.00%)
             - CodegenInvoluntaryContextSwitches: 11 (11)
             - CodegenTotalWallClockTime: 145.304ms
               - CodegenSysTime: 3.944ms
               - CodegenUserTime: 137.738ms
             - CodegenVoluntaryContextSwitches: 4 (4)
             - CompileTime: 6.697ms
             - IrGenerationTime: 3.173ms
             - LoadTime: 0.000ns
             - ModuleBitcodeSize: 2.53 MB (2647876)
             - NumFunctions: 41 (41)
             - NumInstructions: 1.02K (1024)
             - OptimizationTime: 119.623ms
             - PeakMemoryUsage: 512.00 KB (524288)
             - PrepareTime: 14.405ms
    
    Change-Id: I3aef8bc621f96caafe9a1c378617a2987e4ad452
    Reviewed-on: http://gerrit.cloudera.org:8080/15408
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/benchmarks/expr-benchmark.cc          |  20 ++-
 be/src/benchmarks/hash-benchmark.cc          |   8 +-
 be/src/codegen/llvm-codegen-test.cc          |  23 ++--
 be/src/codegen/llvm-codegen.cc               |  13 +-
 be/src/codegen/llvm-codegen.h                |  13 +-
 be/src/exec/aggregation-node-base.cc         |  13 +-
 be/src/exec/aggregation-node-base.h          |   4 +-
 be/src/exec/aggregator.cc                    |   7 +-
 be/src/exec/aggregator.h                     |  16 ++-
 be/src/exec/analytic-eval-node.cc            |   4 +-
 be/src/exec/analytic-eval-node.h             |   2 +-
 be/src/exec/blocking-join-node.cc            |   4 +-
 be/src/exec/blocking-join-node.h             |   2 +-
 be/src/exec/data-sink.cc                     |  23 +++-
 be/src/exec/data-sink.h                      |  28 ++--
 be/src/exec/exchange-node.cc                 |  21 +--
 be/src/exec/exchange-node.h                  |   5 +-
 be/src/exec/exec-node.cc                     |  55 ++++----
 be/src/exec/exec-node.h                      |  49 ++++---
 be/src/exec/grouping-aggregator.cc           |  21 ++-
 be/src/exec/grouping-aggregator.h            |  12 +-
 be/src/exec/hdfs-avro-scanner.cc             |   5 +-
 be/src/exec/hdfs-avro-scanner.h              |   4 +-
 be/src/exec/hdfs-columnar-scanner.cc         |   3 +-
 be/src/exec/hdfs-columnar-scanner.h          |   2 +-
 be/src/exec/hdfs-scan-node-base.cc           |  26 ++--
 be/src/exec/hdfs-scan-node-base.h            |   5 +-
 be/src/exec/hdfs-scanner.cc                  |  18 +--
 be/src/exec/hdfs-scanner.h                   |   4 +-
 be/src/exec/hdfs-sequence-scanner.cc         |   3 +-
 be/src/exec/hdfs-sequence-scanner.h          |   2 +-
 be/src/exec/hdfs-table-sink.cc               |   2 +-
 be/src/exec/hdfs-table-sink.h                |   2 +-
 be/src/exec/hdfs-text-scanner.cc             |   3 +-
 be/src/exec/hdfs-text-scanner.h              |   2 +-
 be/src/exec/join-builder.cc                  |   2 +-
 be/src/exec/join-builder.h                   |   2 +-
 be/src/exec/nested-loop-join-builder.cc      |   3 +-
 be/src/exec/nested-loop-join-builder.h       |   2 +-
 be/src/exec/nested-loop-join-node.cc         |   2 +-
 be/src/exec/nested-loop-join-node.h          |   2 +-
 be/src/exec/non-grouping-aggregator.cc       |  16 +--
 be/src/exec/non-grouping-aggregator.h        |  10 +-
 be/src/exec/partial-sort-node.cc             |  20 +--
 be/src/exec/partial-sort-node.h              |   5 +-
 be/src/exec/partitioned-hash-join-builder.cc |  41 +++---
 be/src/exec/partitioned-hash-join-builder.h  |  22 ++-
 be/src/exec/partitioned-hash-join-node.cc    |  45 +++---
 be/src/exec/partitioned-hash-join-node.h     |   7 +-
 be/src/exec/scan-node.cc                     |   4 +-
 be/src/exec/scan-node.h                      |   2 +-
 be/src/exec/select-node.cc                   |  16 +--
 be/src/exec/select-node.h                    |   5 +-
 be/src/exec/sort-node.cc                     |  18 +--
 be/src/exec/sort-node.h                      |   5 +-
 be/src/exec/subplan-node.cc                  |   4 +-
 be/src/exec/subplan-node.h                   |   4 +-
 be/src/exec/topn-node.cc                     |  17 +--
 be/src/exec/topn-node.h                      |   5 +-
 be/src/exec/union-node.cc                    |  16 +--
 be/src/exec/union-node.h                     |   5 +-
 be/src/exec/unnest-node.cc                   |   5 +-
 be/src/exec/unnest-node.h                    |   4 +-
 be/src/exprs/agg-fn.cc                       |   5 +-
 be/src/exprs/agg-fn.h                        |   7 +-
 be/src/exprs/expr-codegen-test.cc            |  16 ++-
 be/src/exprs/hive-udf-call.cc                |   2 +-
 be/src/exprs/hive-udf-call.h                 |   2 +-
 be/src/exprs/is-not-empty-predicate.cc       |   2 +-
 be/src/exprs/is-not-empty-predicate.h        |   2 +-
 be/src/exprs/kudu-partition-expr.cc          |   4 +-
 be/src/exprs/kudu-partition-expr.h           |   2 +-
 be/src/exprs/scalar-expr.cc                  |  13 +-
 be/src/exprs/scalar-expr.h                   |  15 +-
 be/src/exprs/scalar-fn-call.cc               |   3 +-
 be/src/exprs/scalar-fn-call.h                |   2 +-
 be/src/exprs/slot-ref.cc                     |   3 +-
 be/src/exprs/slot-ref.h                      |   2 +-
 be/src/exprs/tuple-is-null-predicate.cc      |   2 +-
 be/src/exprs/tuple-is-null-predicate.h       |   2 +-
 be/src/exprs/valid-tuple-id.cc               |   2 +-
 be/src/exprs/valid-tuple-id.h                |   2 +-
 be/src/runtime/CMakeLists.txt                |   1 +
 be/src/runtime/data-stream-test.cc           |  19 ++-
 be/src/runtime/fragment-instance-state.cc    |  51 ++-----
 be/src/runtime/fragment-instance-state.h     |  10 +-
 be/src/runtime/fragment-state.cc             | 165 ++++++++++++++++++++++
 be/src/runtime/fragment-state.h              | 196 +++++++++++++++++++++++++++
 be/src/runtime/krpc-data-stream-sender.cc    |  25 ++--
 be/src/runtime/krpc-data-stream-sender.h     |  14 +-
 be/src/runtime/query-state.cc                | 120 ++++++++++------
 be/src/runtime/query-state.h                 |  16 ++-
 be/src/runtime/runtime-state.cc              |  21 ---
 be/src/runtime/runtime-state.h               |  71 ----------
 be/src/runtime/test-env.cc                   |   5 +-
 be/src/service/fe-support.cc                 |  19 ++-
 be/src/udf/udf-internal.h                    |   6 +-
 be/src/udf/udf.cc                            |   9 +-
 be/src/util/tuple-row-compare.cc             |   3 +-
 be/src/util/tuple-row-compare.h              |   3 +-
 100 files changed, 930 insertions(+), 625 deletions(-)

diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index 79ad8ae..028f137 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -50,8 +50,10 @@
 #include "common/init.h"
 #include "common/object-pool.h"
 #include "common/status.h"
+#include "runtime/fragment-state.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
 #include "service/fe-support.h"
 #include "service/frontend.h"
 #include "service/impala-server.h"
@@ -70,6 +72,10 @@ class Planner {
     ABORT_IF_ERROR(exec_env_.InitForFeTests());
   }
 
+  ~Planner() {
+    if (fragment_state_ != nullptr) fragment_state_->ReleaseResources();
+  }
+
   Status GeneratePlan(const string& stmt, TExecRequest* result) {
     TQueryCtx query_ctx;
     query_ctx.client_request.stmt = stmt;
@@ -78,18 +84,23 @@ class Planner {
     TNetworkAddress dummy;
     ImpalaServer::PrepareQueryContext(dummy, dummy, &query_ctx);
     runtime_state_.reset(new RuntimeState(query_ctx, &exec_env_));
+    TPlanFragmentCtx* fragment_ctx =
+        runtime_state_->obj_pool()->Add(new TPlanFragmentCtx());
+    fragment_state_ = runtime_state_->obj_pool()->Add(
+        new FragmentState(runtime_state_->query_state(), *fragment_ctx));
 
     return frontend_.GetExecRequest(query_ctx, result);
   }
 
-  RuntimeState* GetRuntimeState() {
-    return runtime_state_.get();
-  }
+  RuntimeState* GetRuntimeState() { return runtime_state_.get(); }
+
+  FragmentState* GetFragmentState() { return fragment_state_; }
 
  private:
   Frontend frontend_;
   ExecEnv exec_env_;
   scoped_ptr<RuntimeState> runtime_state_;
+  FragmentState* fragment_state_;
 
   TQueryOptions query_options_;
   TSessionState session_state_;
@@ -114,7 +125,8 @@ static Status PrepareSelectList(
   DCHECK_EQ(texprs.size(), 1);
   RuntimeState* state = planner->GetRuntimeState();
   ScalarExpr* expr;
-  RETURN_IF_ERROR(ScalarExpr::Create(texprs[0], RowDescriptor(), state, &expr));
+  RETURN_IF_ERROR(
+      ScalarExpr::Create(texprs[0], RowDescriptor(), planner->GetFragmentState(), &expr));
   RETURN_IF_ERROR(
       ScalarExprEvaluator::Create(*expr, state, &pool, &mem_pool, &mem_pool, eval));
   return Status::OK();
diff --git a/be/src/benchmarks/hash-benchmark.cc b/be/src/benchmarks/hash-benchmark.cc
index 3f605c9..d8e7d5e 100644
--- a/be/src/benchmarks/hash-benchmark.cc
+++ b/be/src/benchmarks/hash-benchmark.cc
@@ -23,7 +23,9 @@
 #include "codegen/llvm-codegen.h"
 #include "common/init.h"
 #include "experiments/data-provider.h"
+#include "runtime/fragment-state.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
 #include "runtime/test-env.h"
 #include "service/fe-support.h"
 #include "util/benchmark.h"
@@ -479,6 +481,10 @@ int main(int argc, char **argv) {
     return -1;
   }
   status = test_env.CreateQueryState(0, nullptr, &state);
+  QueryState* qs = state->query_state();
+  TPlanFragmentCtx* fragment_ctx = qs->obj_pool()->Add(new TPlanFragmentCtx());
+  FragmentState* fragment_state =
+      qs->obj_pool()->Add(new FragmentState(qs, *fragment_ctx));
   if (!status.ok()) {
     cout << "Could not create RuntimeState";
     return -1;
@@ -492,7 +498,7 @@ int main(int argc, char **argv) {
   DataProvider mixed_provider(&mem_pool, mixed_profile);
 
   scoped_ptr<LlvmCodeGen> codegen;
-  status = LlvmCodeGen::CreateImpalaCodegen(state, NULL, "test", &codegen);
+  status = LlvmCodeGen::CreateImpalaCodegen(fragment_state, NULL, "test", &codegen);
   if (!status.ok()) {
     cout << "Could not start codegen.";
     return -1;
diff --git a/be/src/codegen/llvm-codegen-test.cc b/be/src/codegen/llvm-codegen-test.cc
index f77d8df..36b9bad 100644
--- a/be/src/codegen/llvm-codegen-test.cc
+++ b/be/src/codegen/llvm-codegen-test.cc
@@ -23,6 +23,8 @@
 #include "codegen/llvm-codegen.h"
 #include "common/init.h"
 #include "common/object-pool.h"
+#include "runtime/fragment-state.h"
+#include "runtime/query-state.h"
 #include "runtime/string-value.h"
 #include "runtime/test-env.h"
 #include "service/fe-support.h"
@@ -42,16 +44,21 @@ namespace impala {
 class LlvmCodeGenTest : public testing:: Test {
  protected:
   scoped_ptr<TestEnv> test_env_;
-  RuntimeState* runtime_state_;
+  FragmentState* fragment_state_;
 
   virtual void SetUp() {
     test_env_.reset(new TestEnv());
     ASSERT_OK(test_env_->Init());
+    RuntimeState* runtime_state_;
     ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &runtime_state_));
+    QueryState* qs = runtime_state_->query_state();
+    TPlanFragmentCtx* fragment_ctx = qs->obj_pool()->Add(new TPlanFragmentCtx());
+    fragment_state_ = qs->obj_pool()->Add(new FragmentState(qs, *fragment_ctx));
   }
 
   virtual void TearDown() {
-    runtime_state_ = NULL;
+    fragment_state_->ReleaseResources();
+    fragment_state_ = nullptr;
     test_env_.reset();
   }
 
@@ -78,8 +85,8 @@ class LlvmCodeGenTest : public testing:: Test {
 
   // Wrapper to call private test-only methods on LlvmCodeGen object
   Status CreateFromFile(const string& filename, scoped_ptr<LlvmCodeGen>* codegen) {
-    RETURN_IF_ERROR(LlvmCodeGen::CreateFromFile(runtime_state_,
-        runtime_state_->obj_pool(), NULL, filename, "test", codegen));
+    RETURN_IF_ERROR(LlvmCodeGen::CreateFromFile(fragment_state_,
+        fragment_state_->obj_pool(), NULL, filename, "test", codegen));
     return (*codegen)->MaterializeModule();
   }
 
@@ -342,7 +349,7 @@ llvm::Function* CodegenStringTest(LlvmCodeGen* codegen) {
 // and modify it.
 TEST_F(LlvmCodeGenTest, StringValue) {
   scoped_ptr<LlvmCodeGen> codegen;
-  ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(runtime_state_, NULL, "test", &codegen));
+  ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, NULL, "test", &codegen));
   EXPECT_TRUE(codegen.get() != NULL);
 
   string str("Test");
@@ -383,7 +390,7 @@ TEST_F(LlvmCodeGenTest, StringValue) {
 // Test calling memcpy intrinsic
 TEST_F(LlvmCodeGenTest, MemcpyTest) {
   scoped_ptr<LlvmCodeGen> codegen;
-  ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(runtime_state_, NULL, "test", &codegen));
+  ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, NULL, "test", &codegen));
   ASSERT_TRUE(codegen.get() != NULL);
 
   LlvmCodeGen::FnPrototype prototype(codegen.get(), "MemcpyTest", codegen->void_type());
@@ -429,7 +436,7 @@ TEST_F(LlvmCodeGenTest, HashTest) {
   // Loop to test both the sse4 on/off paths
   for (int i = 0; i < 2; ++i) {
     scoped_ptr<LlvmCodeGen> codegen;
-    ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(runtime_state_, NULL, "test", &codegen));
+    ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, NULL, "test", &codegen));
     ASSERT_TRUE(codegen.get() != NULL);
     const auto close_codegen =
         MakeScopeExitTrigger([&codegen]() { codegen->Close(); });
@@ -541,7 +548,7 @@ TEST_F(LlvmCodeGenTest, CpuAttrWhitelist) {
 // finalizes the llvm module.
 TEST_F(LlvmCodeGenTest, CleanupNonFinalizedMethodsTest) {
   scoped_ptr<LlvmCodeGen> codegen;
-  ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(runtime_state_, nullptr, "test", &codegen));
+  ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, nullptr, "test", &codegen));
   ASSERT_TRUE(codegen.get() != nullptr);
   const auto close_codegen = MakeScopeExitTrigger([&codegen]() { codegen->Close(); });
   LlvmBuilder builder(codegen->context());
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index 0d14bdf..333a645 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -70,6 +70,7 @@
 #include "runtime/lib-cache.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/fragment-state.h"
 #include "runtime/runtime-state.h"
 #include "runtime/string-value.h"
 #include "runtime/timestamp-value.h"
@@ -194,7 +195,7 @@ Status LlvmCodeGen::InitializeLlvm(bool load_backend) {
   return Status::OK();
 }
 
-LlvmCodeGen::LlvmCodeGen(RuntimeState* state, ObjectPool* pool,
+LlvmCodeGen::LlvmCodeGen(FragmentState* state, ObjectPool* pool,
     MemTracker* parent_mem_tracker, const string& id)
   : state_(state),
     id_(id),
@@ -221,7 +222,7 @@ LlvmCodeGen::LlvmCodeGen(RuntimeState* state, ObjectPool* pool,
   llvm_thread_counters_ = ADD_THREAD_COUNTERS(profile_, "Codegen");
 }
 
-Status LlvmCodeGen::CreateFromFile(RuntimeState* state, ObjectPool* pool,
+Status LlvmCodeGen::CreateFromFile(FragmentState* state, ObjectPool* pool,
     MemTracker* parent_mem_tracker, const string& file, const string& id,
     scoped_ptr<LlvmCodeGen>* codegen) {
   codegen->reset(new LlvmCodeGen(state, pool, parent_mem_tracker, id));
@@ -239,7 +240,7 @@ error:
   return status;
 }
 
-Status LlvmCodeGen::CreateFromMemory(RuntimeState* state, ObjectPool* pool,
+Status LlvmCodeGen::CreateFromMemory(FragmentState* state, ObjectPool* pool,
     MemTracker* parent_mem_tracker, const string& id, scoped_ptr<LlvmCodeGen>* codegen) {
   codegen->reset(new LlvmCodeGen(state, pool, parent_mem_tracker, id));
   SCOPED_TIMER((*codegen)->profile_->total_time_counter());
@@ -369,7 +370,7 @@ void LlvmCodeGen::StripGlobalCtorsDtors(llvm::Module* module) {
   if (destructors != NULL) destructors->eraseFromParent();
 }
 
-Status LlvmCodeGen::CreateImpalaCodegen(RuntimeState* state,
+Status LlvmCodeGen::CreateImpalaCodegen(FragmentState* state,
     MemTracker* parent_mem_tracker, const string& id,
     scoped_ptr<LlvmCodeGen>* codegen_ret) {
   DCHECK(state != nullptr);
@@ -993,8 +994,8 @@ int LlvmCodeGen::InlineConstFnAttrs(const FunctionContext::TypeDesc& ret_type,
     int i_val = static_cast<int>(i_arg->getSExtValue());
     DCHECK(state_ != nullptr);
     // All supported constants are currently integers.
-    call_instr->replaceAllUsesWith(GetI32Constant(
-        FunctionContextImpl::GetConstFnAttr(state_, ret_type, arg_types, t_val, i_val)));
+    call_instr->replaceAllUsesWith(GetI32Constant(FunctionContextImpl::GetConstFnAttr(
+        state_->query_options().decimal_v2, ret_type, arg_types, t_val, i_val)));
     call_instr->eraseFromParent();
     ++replaced;
   }
diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h
index 6e4ab95..f28fbe9 100644
--- a/be/src/codegen/llvm-codegen.h
+++ b/be/src/codegen/llvm-codegen.h
@@ -85,6 +85,7 @@ namespace impala {
 
 class CodegenCallGraph;
 class CodegenSymbolEmitter;
+class FragmentState;
 class ImpalaMCJITMemoryManager;
 class SubExprElimination;
 class TupleDescriptor;
@@ -169,7 +170,7 @@ class LlvmCodeGen {
   /// 'codegen' will contain the created object on success.
   /// 'parent_mem_tracker' - if non-NULL, the CodeGen MemTracker is created under this.
   /// 'id' is used for outputting the IR module for debugging.
-  static Status CreateImpalaCodegen(RuntimeState* state, MemTracker* parent_mem_tracker,
+  static Status CreateImpalaCodegen(FragmentState* state, MemTracker* parent_mem_tracker,
       const std::string& id, boost::scoped_ptr<LlvmCodeGen>* codegen);
 
   ~LlvmCodeGen();
@@ -610,7 +611,7 @@ class LlvmCodeGen {
   friend class SubExprElimination;
 
   /// Top level codegen object. 'module_id' is used for debugging when outputting the IR.
-  LlvmCodeGen(RuntimeState* state, ObjectPool* pool, MemTracker* parent_mem_tracker,
+  LlvmCodeGen(FragmentState* state, ObjectPool* pool, MemTracker* parent_mem_tracker,
       const std::string& module_id);
 
   /// Initializes the jitter and execution engine with the given module.
@@ -620,7 +621,7 @@ class LlvmCodeGen {
   /// 'codegen' will contain the created object on success. The functions in the module
   /// are materialized lazily. Getting a reference to a function via GetFunction() will
   /// materialize the function and its callees recursively.
-  static Status CreateFromFile(RuntimeState* state, ObjectPool* pool,
+  static Status CreateFromFile(FragmentState* state, ObjectPool* pool,
       MemTracker* parent_mem_tracker, const std::string& file,
       const std::string& id, boost::scoped_ptr<LlvmCodeGen>* codegen);
 
@@ -628,7 +629,7 @@ class LlvmCodeGen {
   /// 'codegen' will contain the created object on success. The functions in the module
   /// are materialized lazily. Getting a reference to a function via GetFunction() will
   /// materialize the function and its callees recursively.
-  static Status CreateFromMemory(RuntimeState* state, ObjectPool* pool,
+  static Status CreateFromMemory(FragmentState* state, ObjectPool* pool,
       MemTracker* parent_mem_tracker, const std::string& id,
       boost::scoped_ptr<LlvmCodeGen>* codegen);
 
@@ -755,9 +756,9 @@ class LlvmCodeGen {
   /// Used for determining dependencies when materializing IR functions.
   static CodegenCallGraph shared_call_graph_;
 
-  /// Pointer to the RuntimeState which owns this codegen object. Needed in
+  /// Pointer to the FragmentState which owns this codegen object. Needed in
   /// InlineConstFnAttr() to access the query options.
-  const RuntimeState* state_;
+  const FragmentState* state_;
 
   /// ID used for debugging (can be e.g. the fragment instance ID)
   std::string id_;
diff --git a/be/src/exec/aggregation-node-base.cc b/be/src/exec/aggregation-node-base.cc
index 2a12a3e..0cad9a8 100644
--- a/be/src/exec/aggregation-node-base.cc
+++ b/be/src/exec/aggregation-node-base.cc
@@ -21,6 +21,7 @@
 #include "exec/grouping-aggregator.h"
 #include "exec/non-grouping-aggregator.h"
 #include "exec/streaming-aggregation-node.h"
+#include "runtime/fragment-state.h"
 #include "runtime/runtime-state.h"
 #include "util/runtime-profile-counters.h"
 
@@ -28,7 +29,7 @@
 
 namespace impala {
 
-Status AggregationPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status AggregationPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
   RETURN_IF_ERROR(PlanNode::Init(tnode, state));
   int num_ags = tnode_->agg_node.aggregators.size();
   for (int i = 0; i < num_ags; ++i) {
@@ -44,6 +45,7 @@ Status AggregationPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(aggs_[i]->Init(agg, state, this));
   }
   DCHECK(aggs_.size() > 0);
+  state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
   return Status::OK();
 }
 
@@ -94,15 +96,16 @@ Status AggregationNodeBase::Prepare(RuntimeState* state) {
     agg->SetDebugOptions(debug_options_);
     RETURN_IF_ERROR(agg->Prepare(state));
   }
-  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
   return Status::OK();
 }
 
-void AggregationNodeBase::Codegen(RuntimeState* state) {
+void AggregationPlanNode::Codegen(FragmentState* state) {
   DCHECK(state->ShouldCodegen());
-  ExecNode::Codegen(state);
+  PlanNode::Codegen(state);
   if (IsNodeCodegenDisabled()) return;
-  for (auto& agg : aggs_) agg->Codegen(state);
+  for (auto& agg : aggs_) {
+    agg->Codegen(state);
+  }
 }
 
 Status AggregationNodeBase::Reset(RuntimeState* state, RowBatch* row_batch) {
diff --git a/be/src/exec/aggregation-node-base.h b/be/src/exec/aggregation-node-base.h
index efdc964..6d148b9 100644
--- a/be/src/exec/aggregation-node-base.h
+++ b/be/src/exec/aggregation-node-base.h
@@ -27,7 +27,8 @@ namespace impala {
 
 class AggregationPlanNode : public PlanNode {
  public:
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
+  virtual void Codegen(FragmentState* state) override;
   virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
   ~AggregationPlanNode() {}
@@ -45,7 +46,6 @@ class AggregationNodeBase : public ExecNode {
       ObjectPool* pool, const AggregationPlanNode& pnode, const DescriptorTbl& descs);
 
   virtual Status Prepare(RuntimeState* state) override;
-  virtual void Codegen(RuntimeState* state) override;
   virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
 
  protected:
diff --git a/be/src/exec/aggregator.cc b/be/src/exec/aggregator.cc
index 705102b..d6eb986 100644
--- a/be/src/exec/aggregator.cc
+++ b/be/src/exec/aggregator.cc
@@ -28,6 +28,7 @@
 #include "exprs/scalar-expr-evaluator.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/descriptors.h"
+#include "runtime/fragment-state.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
@@ -43,7 +44,7 @@
 namespace impala {
 
 AggregatorConfig::AggregatorConfig(
-    const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx)
+    const TAggregator& taggregator, FragmentState* state, PlanNode* pnode, int agg_idx)
   : agg_idx_(agg_idx),
     intermediate_tuple_id_(taggregator.intermediate_tuple_id),
     intermediate_tuple_desc_(
@@ -55,7 +56,7 @@ AggregatorConfig::AggregatorConfig(
     needs_finalize_(taggregator.need_finalize) {}
 
 Status AggregatorConfig::Init(
-    const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode) {
+    const TAggregator& taggregator, FragmentState* state, PlanNode* pnode) {
   DCHECK(intermediate_tuple_desc_ != nullptr);
   DCHECK(output_tuple_desc_ != nullptr);
   DCHECK_EQ(intermediate_tuple_desc_->slots().size(), output_tuple_desc_->slots().size());
@@ -85,6 +86,7 @@ Aggregator::Aggregator(ExecNode* exec_node, ObjectPool* pool,
     const AggregatorConfig& config, const std::string& name)
   : id_(exec_node->id()),
     exec_node_(exec_node),
+    config_(config),
     agg_idx_(config.agg_idx_),
     pool_(pool),
     intermediate_tuple_id_(config.intermediate_tuple_id_),
@@ -120,6 +122,7 @@ Status Aggregator::Prepare(RuntimeState* state) {
 }
 
 Status Aggregator::Open(RuntimeState* state) {
+  runtime_profile_->AppendExecOption(config_.codegen_status_msg_);
   RETURN_IF_ERROR(AggFnEvaluator::Open(agg_fn_evals_, state));
   RETURN_IF_ERROR(ScalarExprEvaluator::Open(conjunct_evals_, state));
   return Status::OK();
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
index 3293c9d..693716e 100644
--- a/be/src/exec/aggregator.h
+++ b/be/src/exec/aggregator.h
@@ -38,6 +38,7 @@ class PlanNode;
 class CodegenAnyVal;
 class DescriptorTbl;
 class ExecNode;
+class FragmentState;
 class LlvmBuilder;
 class LlvmCodeGen;
 class MemPool;
@@ -60,12 +61,12 @@ class AggregatorConfig {
  public:
   /// 'agg_idx' is the index of 'TAggregator' in the parent TAggregationNode.
   AggregatorConfig(
-      const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx);
+      const TAggregator& taggregator, FragmentState* state, PlanNode* pnode, int agg_idx);
   virtual Status Init(
-      const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode);
+      const TAggregator& taggregator, FragmentState* state, PlanNode* pnode);
   /// Closes the expressions created in Init();
   virtual void Close();
-  virtual Status Codegen(RuntimeState* state) = 0;
+  virtual void Codegen(FragmentState* state) = 0;
   virtual ~AggregatorConfig() {}
 
   /// The index of this Aggregator within the AggregationNode which is also equivalent to
@@ -98,6 +99,11 @@ class AggregatorConfig {
   /// The list of all aggregate operations for this aggregator.
   std::vector<AggFn*> aggregate_functions_;
 
+  /// A message that will eventually be added to the aggregator's (spawned using this
+  /// config) runtime profile to convey codegen related information. Populated in
+  /// Codegen().
+  std::string codegen_status_msg_;
+
   virtual int GetNumGroupingExprs() const = 0;
 
  protected:
@@ -134,7 +140,6 @@ class Aggregator {
   /// Aggregators follow the same lifecycle as ExecNodes, except that after Open() and
   /// before GetNext() rows should be added with AddBatch(), followed by InputDone()[
   virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
-  virtual void Codegen(RuntimeState* state) = 0;
   virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
   virtual Status GetNext(
       RuntimeState* state, RowBatch* row_batch, bool* eos) WARN_UNUSED_RESULT = 0;
@@ -172,6 +177,9 @@ class Aggregator {
   const int id_;
   ExecNode* exec_node_;
 
+  /// Reference to the config object used to generate this instance.
+  const AggregatorConfig& config_;
+
   /// The index of this Aggregator within the AggregationNode. When returning output, this
   /// Aggregator should only write tuples at 'agg_idx_' within the row.
   const int agg_idx_;
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index 37d1d31..c3287e0 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -26,8 +26,8 @@
 #include "exprs/scalar-expr.h"
 #include "runtime/buffered-tuple-stream.inline.h"
 #include "runtime/descriptors.h"
+#include "runtime/fragment-state.h"
 #include "runtime/mem-tracker.h"
-#include "runtime/query-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "udf/udf-internal.h"
@@ -42,7 +42,7 @@ using namespace strings;
 
 namespace impala {
 
-Status AnalyticEvalPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status AnalyticEvalPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
   RETURN_IF_ERROR(PlanNode::Init(tnode, state));
 
   const TAnalyticNode& analytic_node = tnode.analytic_node;
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index d55f5f9..fc6967b 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -34,7 +34,7 @@ class ScalarExprEvaluator;
 
 class AnalyticEvalPlanNode : public PlanNode {
  public:
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
   virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
 
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index 2912e63..97c6fe6 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -23,9 +23,9 @@
 #include "exec/join-builder.h"
 #include "exprs/scalar-expr.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
+#include "runtime/fragment-state.h"
 #include "runtime/mem-tracker.h"
-#include "runtime/query-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/thread-resource-mgr.h"
@@ -43,7 +43,7 @@ using namespace impala;
 
 const char* BlockingJoinNode::LLVM_CLASS_NAME = "class.impala::BlockingJoinNode";
 
-Status BlockingJoinPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status BlockingJoinPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
   DCHECK(tnode.__isset.join_node);
   RETURN_IF_ERROR(PlanNode::Init(tnode, state));
   join_op_ = tnode_->join_node.join_op;
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index 18c6cf2..39211de 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -37,7 +37,7 @@ class BlockingJoinPlanNode : public PlanNode {
  public:
   /// Subclasses should call BlockingJoinNode::Init() and then perform any other Init()
   /// work, e.g. creating expr trees.
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override = 0;
 
   /// Returns true if this join node will use a separate builder that is the root sink
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index a49df19..30e1201 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -36,6 +36,7 @@
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gutil/strings/substitute.h"
+#include "runtime/fragment-state.h"
 #include "runtime/krpc-data-stream-sender.h"
 #include "runtime/mem-tracker.h"
 #include "util/container-util.h"
@@ -46,19 +47,29 @@ using strings::Substitute;
 
 namespace impala {
 
+void DataSinkConfig::Codegen(FragmentState* state) {
+  // No-op by default. Sink types that support codegen override this.
+}
+
 void DataSinkConfig::Close() {
   ScalarExpr::Close(output_exprs_);
 }
 
 Status DataSinkConfig::Init(
-    const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
   tsink_ = &tsink;
   input_row_desc_ = input_row_desc;
   return ScalarExpr::Create(tsink.output_exprs, *input_row_desc_, state, &output_exprs_);
 }
 
+void DataSinkConfig::AddCodegenStatus(
+    const Status& codegen_status, const std::string& extra_label) {
+  codegen_status_msgs_.emplace_back(FragmentState::GenerateCodegenMsg(
+      codegen_status.ok(), codegen_status, extra_label));
+}
+
 Status DataSinkConfig::CreateConfig(const TDataSink& thrift_sink,
-    const RowDescriptor* row_desc, RuntimeState* state, DataSinkConfig** data_sink) {
+    const RowDescriptor* row_desc, FragmentState* state, DataSinkConfig** data_sink) {
   ObjectPool* pool = state->obj_pool();
   *data_sink = nullptr;
   switch (thrift_sink.type) {
@@ -151,12 +162,11 @@ Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   return Status::OK();
 }
 
-void DataSink::Codegen(RuntimeState* state) {
-  return;
-}
-
 Status DataSink::Open(RuntimeState* state) {
   DCHECK_EQ(output_exprs_.size(), output_expr_evals_.size());
+  for (const string& codegen_msg : sink_config_.codegen_status_msgs_) {
+    profile_->AppendExecOption(codegen_msg);
+  }
   return ScalarExprEvaluator::Open(output_expr_evals_, state);
 }
 
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index 9445ff2..3dcb268 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -29,12 +29,11 @@
 namespace impala {
 
 class DataSink;
+class FragmentState;
 class MemPool;
 class MemTracker;
 class ObjectPool;
 class RowBatch;
-class RuntimeProfile;
-class RuntimeState;
 class RowDescriptor;
 class ScalarExpr;
 class ScalarExprEvaluator;
@@ -57,11 +56,15 @@ class DataSinkConfig {
   virtual DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
     const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const = 0;
 
+  /// Codegen expressions in the sink. Overridden by sink types that support codegen.
+  /// No-op by default.
+  virtual void Codegen(FragmentState* state);
+
   /// Close() releases all resources that were allocated during creation.
   virtual void Close();
 
   /// Pointer to the thrift data sink struct associated with this sink. Set in Init() and
-  /// owned by QueryState.
+  /// owned by FragmentState.
   const TDataSink* tsink_ = nullptr;
 
   /// The row descriptor for the rows consumed by the sink. Owned by root plan node of
@@ -72,16 +75,24 @@ class DataSinkConfig {
   /// Not used in some sub-classes.
   std::vector<ScalarExpr*> output_exprs_;
 
+  /// A list of messages that will eventually be added to the data sink's runtime
+  /// profile to convey codegen related information. Populated in Codegen().
+  std::vector<std::string> codegen_status_msgs_;
+
   /// Creates a new data sink config, allocated in state->obj_pool() and returned through
   /// *sink, from the thrift sink object in fragment_ctx.
   static Status CreateConfig(const TDataSink& thrift_sink, const RowDescriptor* row_desc,
-      RuntimeState* state, DataSinkConfig** sink);
+      FragmentState* state, DataSinkConfig** data_sink);
 
  protected:
   /// Sets reference to TDataSink and initializes the expressions. Returns error status on
   /// failure. If overridden in subclass, must first call superclass's Init().
-  virtual Status Init(
-      const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state);
+  virtual Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+      FragmentState* state);
+
+  /// Helper method to add codegen messages from status objects.
+  void AddCodegenStatus(
+      const Status& codegen_status, const std::string& extra_label = "");
 
  private:
   DISALLOW_COPY_AND_ASSIGN(DataSinkConfig);
@@ -111,11 +122,8 @@ class DataSink {
   /// initializes their evaluators. Subclasses must call DataSink::Prepare().
   virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
 
-  /// Codegen expressions in the sink. Overridden by sink type which supports codegen.
-  /// No-op by default.
-  virtual void Codegen(RuntimeState* state);
-
   /// Call before Send() to open the sink and initialize output expression evaluators.
+  /// Subclasses must call DataSink::Open().
   virtual Status Open(RuntimeState* state);
 
   /// Send a row batch into this sink. Generally, Send() should not retain any references
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index ec640eb..9a2894d 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -22,6 +22,7 @@
 #include "exec/exec-node-util.h"
 #include "exprs/scalar-expr.h"
 #include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/krpc-data-stream-recvr.h"
 #include "runtime/row-batch.h"
@@ -42,7 +43,7 @@ using namespace impala;
 DEFINE_int64(exchg_node_buffer_size_bytes, 1024 * 1024 * 10,
     "(Advanced) Maximum size of per-query receive-side buffer");
 
-Status ExchangePlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status ExchangePlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
   RETURN_IF_ERROR(PlanNode::Init(tnode, state));
   bool is_merging = tnode_->exchange_node.__isset.sort_info;
   if (!is_merging) return Status::OK();
@@ -52,6 +53,7 @@ Status ExchangePlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
       sort_info.ordering_exprs, *row_descriptor_, state, &ordering_exprs_));
   row_comparator_config_ =
       state->obj_pool()->Add(new TupleRowComparatorConfig(sort_info, ordering_exprs_));
+  state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
   return Status::OK();
 }
 
@@ -110,28 +112,19 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
       FLAGS_exchg_node_buffer_size_bytes, is_merging_, runtime_profile(), mem_tracker(),
       &recvr_buffer_pool_client_);
 
-  if (is_merging_) state->CheckAndAddCodegenDisabledMessage(runtime_profile());
   return Status::OK();
 }
 
-void ExchangeNode::Codegen(RuntimeState* state) {
+void ExchangePlanNode::Codegen(FragmentState* state) {
   DCHECK(state->ShouldCodegen());
-  ExecNode::Codegen(state);
+  PlanNode::Codegen(state);
   if (IsNodeCodegenDisabled()) return;
 
-  if (is_merging_) {
-    const ExchangePlanNode& exch_pnode = static_cast<const ExchangePlanNode&>(plan_node_);
-    ExchangePlanNode& non_const_pnode = const_cast<ExchangePlanNode&>(exch_pnode);
-    non_const_pnode.Codegen(state, runtime_profile());
+  if (row_comparator_config_ != nullptr) {
+    AddCodegenStatus(row_comparator_config_->Codegen(state));
   }
 }
 
-void ExchangePlanNode::Codegen(RuntimeState* state, RuntimeProfile* runtime_profile) {
-  DCHECK(row_comparator_config_ != nullptr);
-  Status codegen_status = row_comparator_config_->Codegen(state);
-  runtime_profile->AddCodegenMsg(codegen_status.ok(), codegen_status);
-}
-
 Status ExchangeNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   ScopedOpenEventAdder ea(this);
diff --git a/be/src/exec/exchange-node.h b/be/src/exec/exchange-node.h
index 2342058..ab5c3c5 100644
--- a/be/src/exec/exchange-node.h
+++ b/be/src/exec/exchange-node.h
@@ -34,10 +34,10 @@ class TupleRowComparatorConfig;
 
 class ExchangePlanNode : public PlanNode {
  public:
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
   virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
-  void Codegen(RuntimeState* state, RuntimeProfile* runtime_profile);
+  virtual void Codegen(FragmentState* state) override;
 
   ~ExchangePlanNode(){}
 
@@ -66,7 +66,6 @@ class ExchangeNode : public ExecNode {
       ObjectPool* pool, const ExchangePlanNode& pnode, const DescriptorTbl& descs);
 
   virtual Status Prepare(RuntimeState* state);
-  virtual void Codegen(RuntimeState* state);
   /// Blocks until the first batch is available for consumption via GetNext().
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 55589e5..8d5d8f3 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -57,10 +57,10 @@
 #include "gutil/strings/substitute.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
 #include "runtime/initial-reservations.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
-#include "runtime/query-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "util/debug-util.h"
@@ -72,7 +72,7 @@
 using strings::Substitute;
 
 namespace impala {
-Status PlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status PlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
   tnode_ = &tnode;
   row_descriptor_ = state->obj_pool()->Add(
       new RowDescriptor(state->desc_tbl(), tnode_->row_tuples, tnode_->nullable_tuples));
@@ -93,14 +93,13 @@ void PlanNode::Close() {
     }
   }
 
-Status PlanNode::CreateTree(
-      RuntimeState* state, const TPlan& plan, PlanNode** root) {
+Status PlanNode::CreateTree(FragmentState* state, const TPlan& plan, PlanNode** root) {
   if (plan.nodes.size() == 0) {
     *root = NULL;
     return Status::OK();
   }
   int node_idx = 0;
   Status status = CreateTreeHelper(state, plan.nodes, NULL, &node_idx, root);
   if (status.ok() && node_idx + 1 != plan.nodes.size()) {
     status = Status(
         "Plan tree only partially reconstructed. Not all thrift nodes were used.");
@@ -112,9 +111,9 @@ Status PlanNode::CreateTree(
   return status;
 }
 
-Status PlanNode::CreateTreeHelper(RuntimeState* state,
-      const std::vector<TPlanNode>& tnodes, PlanNode* parent, int* node_idx,
-      PlanNode** root) {
+Status PlanNode::CreateTreeHelper(FragmentState* state,
+    const std::vector<TPlanNode>& tnodes, PlanNode* parent, int* node_idx,
+    PlanNode** root) {
   // propagate error case
   if (*node_idx >= tnodes.size()) {
     return Status("Failed to reconstruct plan tree from thrift.");
@@ -123,7 +122,7 @@ Status PlanNode::CreateTreeHelper(RuntimeState* state,
 
   int num_children = tnode.num_children;
   PlanNode* node = NULL;
-  RETURN_IF_ERROR(CreatePlanNode(state->obj_pool(), tnode, &node, state));
+  RETURN_IF_ERROR(CreatePlanNode(state->obj_pool(), tnode, &node));
   if (parent != NULL) {
     parent->children_.push_back(node);
   } else {
@@ -131,7 +130,7 @@ Status PlanNode::CreateTreeHelper(RuntimeState* state,
   }
   for (int i = 0; i < num_children; ++i) {
     ++*node_idx;
-    RETURN_IF_ERROR(CreateTreeHelper(state, tnodes, node, node_idx, NULL));
+    RETURN_IF_ERROR(CreateTreeHelper(state, tnodes, node, node_idx, nullptr));
     // we are expecting a child, but have used all nodes
     // this means we have been given a bad tree and must fail
     if (*node_idx >= tnodes.size()) {
@@ -144,8 +143,14 @@ Status PlanNode::CreateTreeHelper(RuntimeState* state,
   return Status::OK();
 }
 
-Status PlanNode::CreatePlanNode(ObjectPool* pool, const TPlanNode& tnode, PlanNode** node,
-      RuntimeState* state) {
+void PlanNode::AddCodegenStatus(
+    const Status& codegen_status, const std::string& extra_label) {
+  codegen_status_msgs_.emplace_back(FragmentState::GenerateCodegenMsg(
+      codegen_status.ok(), codegen_status, extra_label));
+}
+
+Status PlanNode::CreatePlanNode(
+    ObjectPool* pool, const TPlanNode& tnode, PlanNode** node) {
   switch (tnode.node_type) {
     case TPlanNodeType::HDFS_SCAN_NODE:
       *node = pool->Add(new HdfsScanPlanNode());
@@ -215,6 +220,14 @@ Status PlanNode::CreatePlanNode(ObjectPool* pool, const TPlanNode& tnode, PlanNo
   return Status::OK();
 }
 
+void PlanNode::Codegen(FragmentState* state) {
+  DCHECK(state->ShouldCodegen());
+  DCHECK(state->codegen() != nullptr);
+  for (PlanNode* child : children_) {
+    child->Codegen(state);
+  }
+}
+
 const string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsReturnedRate";
 
 ExecNode::ExecNode(ObjectPool* pool, const PlanNode& pnode, const DescriptorTbl& descs)
@@ -231,7 +244,6 @@ ExecNode::ExecNode(ObjectPool* pool, const PlanNode& pnode, const DescriptorTbl&
     rows_returned_counter_(nullptr),
     rows_returned_rate_(nullptr),
     containing_subplan_(nullptr),
-    disable_codegen_(pnode.tnode_->disable_codegen),
     num_rows_returned_(0),
     is_closed_(false) {
   runtime_profile_->SetPlanNodeId(id_);
@@ -271,17 +283,17 @@ Status ExecNode::Prepare(RuntimeState* state) {
   return Status::OK();
 }
 
-void ExecNode::Codegen(RuntimeState* state) {
-  DCHECK(state->ShouldCodegen());
-  DCHECK(state->codegen() != NULL);
-  for (int i = 0; i < children_.size(); ++i) {
-    children_[i]->Codegen(state);
-  }
-}
-
 Status ExecNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::OPEN, state));
   DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());
+  // Open() may be called multiple times (e.g. after Reset()), but the plan node's
+  // codegen messages must only be added to the profile once.
+  if (!is_codegen_status_added_) {
+    for (const string& codegen_msg : plan_node_.codegen_status_msgs_) {
+      runtime_profile_->AppendExecOption(codegen_msg);
+    }
+    is_codegen_status_added_ = true;
+  }
   return ScalarExprEvaluator::Open(conjunct_evals_, state);
 }
 
@@ -470,10 +482,6 @@ Status ExecNode::QueryMaintenance(RuntimeState* state) {
   return state->CheckQueryState();
 }
 
-bool ExecNode::IsNodeCodegenDisabled() const {
-  return disable_codegen_;
-}
-
 // Codegen for EvalConjuncts.  The generated signature is the same as EvalConjuncts().
 //
 // For a node with two conjunct predicates:
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index ec91276..998e19f 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -38,6 +38,7 @@ class Function;
 namespace impala {
 
 class DataSink;
+class FragmentState;
 class MemPool;
 class MemTracker;
 class ObjectPool;
@@ -72,7 +73,7 @@ class PlanNode {
   /// initialized here will be owned by state->obj_pool().
   /// If overridden in subclass, must first call superclass's Init().
   /// Should only be called after all children have been set and Init()-ed.
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
+  virtual Status Init(const TPlanNode& tnode, FragmentState* state);
 
   /// Close() releases all resources that were allocated in Init().
   virtual void Close();
@@ -83,13 +84,25 @@ class PlanNode {
   /// Creates plan node tree from list of nodes contained in plan via depth-first
   /// traversal. All nodes are placed in state->obj_pool() and have Init() called on them.
   /// Returns error if 'plan' is corrupted, otherwise success.
   static Status CreateTree(
-      RuntimeState* state, const TPlan& plan, PlanNode** root) WARN_UNUSED_RESULT;
+      FragmentState* state, const TPlan& plan, PlanNode** root) WARN_UNUSED_RESULT;
+
+  /// Recursively calls Codegen() on all children.
+  /// Expected to be overridden in subclass to generate LLVM IR functions and register
+  /// them with the LlvmCodeGen object. The function pointers of the compiled IR functions
+  /// will be set up when the LlvmCodeGen object is finalized. If overridden in subclass,
+  /// must also call superclass's Codegen() before or after the code generation for this
+  /// plan node. Will only be called once in the node's lifetime.
+  virtual void Codegen(FragmentState* state);
 
   virtual ~PlanNode(){}
 
   bool IsInSubplan() const { return containing_subplan_ != nullptr; }
 
+  /// Return true if codegen was disabled by the planner for this PlanNode. Does not
+  /// check to see if codegen was enabled for the enclosing fragment.
+  bool IsNodeCodegenDisabled() const { return tnode_->disable_codegen; }
+
   /// TODO: IMPALA-9216: Add accessor methods for these members instead of making
   /// them public.
   /// Reference to the thrift node that represents this PlanNode.
@@ -112,16 +125,25 @@ class PlanNode {
   /// 'this' node. Not owned.
   SubplanPlanNode* containing_subplan_ = nullptr;
 
+  /// A list of messages that will eventually be added to the exec node's runtime
+  /// profile to convey codegen related information. Populated in Codegen().
+  std::vector<std::string> codegen_status_msgs_;
+
+ protected:
+  /// Helper method to add codegen messages from status objects.
+  void AddCodegenStatus(
+      const Status& codegen_status, const std::string& extra_label = "");
+
  private:
   DISALLOW_COPY_AND_ASSIGN(PlanNode);
 
   /// Create a single exec node derived from thrift node; place exec node in 'pool'.
-  static Status CreatePlanNode(ObjectPool* pool, const TPlanNode& tnode, PlanNode** node,
-      RuntimeState* state) WARN_UNUSED_RESULT;
+  static Status CreatePlanNode(
+      ObjectPool* pool, const TPlanNode& tnode, PlanNode** node) WARN_UNUSED_RESULT;
 
-  static Status CreateTreeHelper(RuntimeState* state,
+  static Status CreateTreeHelper(FragmentState* state,
       const std::vector<TPlanNode>& tnodes, PlanNode* parent, int* node_idx,
       PlanNode** root) WARN_UNUSED_RESULT;
 };
 
 class ExecNode {
@@ -139,14 +161,6 @@ class ExecNode {
   /// If overridden in subclass, must first call superclass's Prepare().
   virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
 
-  /// Recursively calls Codegen() on all children.
-  /// Expected to be overriden in subclass to generate LLVM IR functions and register
-  /// them with the LlvmCodeGen object. The function pointers of the compiled IR functions
-  /// will be set up in PlanFragmentExecutor::Open(). If overridden in subclass, must also
-  /// call superclass's Codegen() before or after the code generation for this exec node.
-  /// Will only be called once in the node's lifetime.
-  virtual void Codegen(RuntimeState* state);
-
   /// Performs any preparatory work prior to calling GetNext().
   /// If overridden in subclass, must first call superclass's Open().
   /// Open() is called after Prepare() or Reset(), i.e., possibly multiple times
@@ -321,10 +335,6 @@ class ExecNode {
   const TBackendResourceProfile& resource_profile() { return resource_profile_; }
   bool is_closed() const { return is_closed_; }
 
-  /// Return true if codegen was disabled by the planner for this ExecNode. Does not
-  /// check to see if codegen was enabled for the enclosing fragment.
-  bool IsNodeCodegenDisabled() const;
-
   /// Returns true if this node is inside the right-hand side plan tree of a SubplanNode.
   bool IsInSubplan() const { return plan_node_.IsInSubplan(); }
 
@@ -428,9 +438,11 @@ class ExecNode {
   /// Set by SubplanNode::Prepare() before Prepare() is called on 'this' node. Not owned.
   SubplanNode* containing_subplan_;
 
-  /// If true, codegen should be disabled for this exec node.
-  const bool disable_codegen_;
+  /// True once the codegen status messages of 'plan_node_' have been added to
+  /// 'runtime_profile_'. Open() may be called multiple times (e.g. after Reset()), so
+  /// this ensures the messages are only added once.
+  bool is_codegen_status_added_ = false;
 
   virtual bool IsScanNode() const { return false; }
 
   /// Executes 'debug_action_' if 'phase' matches 'debug_phase_'.
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index bed5c94..a66ed62 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -29,6 +29,7 @@
 #include "runtime/buffered-tuple-stream.inline.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
@@ -84,14 +85,14 @@ static const StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
 };
 
 GroupingAggregatorConfig::GroupingAggregatorConfig(
-    const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx)
+    const TAggregator& taggregator, FragmentState* state, PlanNode* pnode, int agg_idx)
   : AggregatorConfig(taggregator, state, pnode, agg_idx),
     intermediate_row_desc_(intermediate_tuple_desc_, false),
     is_streaming_preagg_(taggregator.use_streaming_preaggregation),
     resource_profile_(taggregator.resource_profile){};
 
 Status GroupingAggregatorConfig::Init(
-    const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode) {
+    const TAggregator& taggregator, FragmentState* state, PlanNode* pnode) {
   RETURN_IF_ERROR(ScalarExpr::Create(
       taggregator.grouping_exprs, input_row_desc_, state, &grouping_exprs_));
 
@@ -133,7 +134,6 @@ GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
     const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality)
   : Aggregator(
         exec_node, pool, config, Substitute("GroupingAggregator $0", config.agg_idx_)),
-    agg_config_(config),
     hash_table_config_(*config.hash_table_config_),
     intermediate_row_desc_(config.intermediate_row_desc_),
     is_streaming_preagg_(config.is_streaming_preagg_),
@@ -195,19 +195,14 @@ Status GroupingAggregator::Prepare(RuntimeState* state) {
   return Status::OK();
 }
 
-Status GroupingAggregatorConfig::Codegen(RuntimeState* state) {
+void GroupingAggregatorConfig::Codegen(FragmentState* state) {
   LlvmCodeGen* codegen = state->codegen();
   DCHECK(codegen != nullptr);
   TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
-  return is_streaming_preagg_ ? CodegenAddBatchStreamingImpl(codegen, prefetch_mode) :
-                                CodegenAddBatchImpl(codegen, prefetch_mode);
-}
-
-void GroupingAggregator::Codegen(RuntimeState* state) {
-  // TODO: This const cast will be removed once codegen call is moved before FIS creation
-  Status codegen_status =
-      const_cast<GroupingAggregatorConfig&>(agg_config_).Codegen(state);
-  runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
+  Status status = is_streaming_preagg_ ?
+      CodegenAddBatchStreamingImpl(codegen, prefetch_mode) :
+      CodegenAddBatchImpl(codegen, prefetch_mode);
+  codegen_status_msg_ = FragmentState::GenerateCodegenMsg(status.ok(), status);
 }
 
 Status GroupingAggregator::Open(RuntimeState* state) {
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index b4509bb..ffef305 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -36,6 +36,7 @@ class AggFnEvaluator;
 class GroupingAggregator;
 class PlanNode;
 class LlvmCodeGen;
+class QueryState;
 class RowBatch;
 class RuntimeState;
 struct ScalarExprsResultsRowLayout;
@@ -119,11 +120,11 @@ class Tuple;
 class GroupingAggregatorConfig : public AggregatorConfig {
  public:
   GroupingAggregatorConfig(
-      const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx);
+      const TAggregator& taggregator, FragmentState* state, PlanNode* pnode, int agg_idx);
   Status Init(
-      const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode) override;
+      const TAggregator& taggregator, FragmentState* state, PlanNode* pnode) override;
   void Close() override;
-  Status Codegen(RuntimeState* state) override;
+  void Codegen(FragmentState* state) override;
   ~GroupingAggregatorConfig() override {}
 
   /// Row with the intermediate tuple as its only tuple.
@@ -193,7 +194,6 @@ class GroupingAggregator : public Aggregator {
       const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality);
 
   virtual Status Prepare(RuntimeState* state) override;
-  virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
   virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
@@ -214,10 +214,6 @@ class GroupingAggregator : public Aggregator {
  private:
   struct Partition;
 
-  /// TODO: Remove reference once codegen is performed before FIS creation.
-  /// Reference to the config object and only used to call Codegen().
-  const GroupingAggregatorConfig& agg_config_;
-
   /// Reference to the hash table config which is a part of the GroupingAggregatorConfig
   /// that was used to create this object. Its used to create an instance of the
   /// HashTableCtx in Prepare(). Not Owned.
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index 20fee8e..6e038a6 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -26,6 +26,7 @@
 #include "exec/hdfs-scan-node.h"
 #include "exec/read-write-util.h"
 #include "exec/scanner-context.inline.h"
+#include "runtime/fragment-state.h"
 #include "runtime/raw-value.h"
 #include "runtime/runtime-state.h"
 #include "util/codec.h"
@@ -79,7 +80,7 @@ Status HdfsAvroScanner::Open(ScannerContext* context) {
 }
 
 Status HdfsAvroScanner::Codegen(HdfsScanPlanNode* node,
-   RuntimeState* state, llvm::Function** decode_avro_data_fn) {
+    FragmentState* state, llvm::Function** decode_avro_data_fn) {
   *decode_avro_data_fn = nullptr;
   DCHECK(state->ShouldCodegen());
   DCHECK(state->codegen() != nullptr);
@@ -1098,7 +1099,7 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
 }
 
 Status HdfsAvroScanner::CodegenDecodeAvroData(const HdfsScanPlanNode* node,
-    RuntimeState* state, llvm::Function** decode_avro_data_fn) {
+    FragmentState* state, llvm::Function** decode_avro_data_fn) {
   const vector<ScalarExpr*>& conjuncts = node->conjuncts_;
   LlvmCodeGen* codegen = state->codegen();
 
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index b3bd355..40b52d5 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -96,7 +96,7 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   /// Codegen DecodeAvroData(). Stores the resulting function in 'decode_avro_data_fn' if
   /// codegen was successful or nullptr otherwise.
   static Status Codegen(
-      HdfsScanPlanNode* node, RuntimeState* state, llvm::Function** decode_avro_data_fn);
+      HdfsScanPlanNode* node, FragmentState* state, llvm::Function** decode_avro_data_fn);
 
   static const char* LLVM_CLASS_NAME;
 
@@ -208,7 +208,7 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   /// Produces a version of DecodeAvroData that uses codegen'd instead of interpreted
   /// functions. Stores the resulting function in 'decode_avro_data_fn' if codegen was
   /// successful or returns an error.
-  static Status CodegenDecodeAvroData(const HdfsScanPlanNode* node, RuntimeState* state,
+  static Status CodegenDecodeAvroData(const HdfsScanPlanNode* node, FragmentState* state,
       llvm::Function** decode_avro_data_fn);
 
   /// Codegens a version of MaterializeTuple() that reads records based on the table
diff --git a/be/src/exec/hdfs-columnar-scanner.cc b/be/src/exec/hdfs-columnar-scanner.cc
index b7a33fa..c84c2d8 100644
--- a/be/src/exec/hdfs-columnar-scanner.cc
+++ b/be/src/exec/hdfs-columnar-scanner.cc
@@ -22,6 +22,7 @@
 #include "codegen/llvm-codegen.h"
 #include "exec/hdfs-scan-node-base.h"
 #include "exec/scratch-tuple-batch.h"
+#include "runtime/fragment-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 
@@ -72,7 +73,7 @@ int HdfsColumnarScanner::TransferScratchTuples(RowBatch* dst_batch) {
   return num_rows_to_commit;
 }
 
-Status HdfsColumnarScanner::Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+Status HdfsColumnarScanner::Codegen(HdfsScanPlanNode* node, FragmentState* state,
     llvm::Function** process_scratch_batch_fn) {
   DCHECK(state->ShouldCodegen());
   *process_scratch_batch_fn = nullptr;
diff --git a/be/src/exec/hdfs-columnar-scanner.h b/be/src/exec/hdfs-columnar-scanner.h
index a5c747c..a510f4e 100644
--- a/be/src/exec/hdfs-columnar-scanner.h
+++ b/be/src/exec/hdfs-columnar-scanner.h
@@ -38,7 +38,7 @@ class HdfsColumnarScanner : public HdfsScanner {
 
   /// Codegen ProcessScratchBatch(). Stores the resulting function in
   /// 'process_scratch_batch_fn' if codegen was successful or NULL otherwise.
-  static Status Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+  static Status Codegen(HdfsScanPlanNode* node, FragmentState* state,
       llvm::Function** process_scratch_batch_fn);
 
   /// Class name in LLVM IR.
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index e515ab2..5e10f6a 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -41,6 +41,7 @@
 #include "exprs/scalar-expr.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/io/request-context.h"
@@ -150,7 +151,7 @@ const string HdfsScanNodeBase::HDFS_SPLIT_STATS_DESC =
 // Determines how many unexpected remote bytes trigger an error in the runtime state
 const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
 
-Status HdfsScanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status HdfsScanPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
   RETURN_IF_ERROR(ScanPlanNode::Init(tnode, state));
 
   tuple_id_ = tnode.hdfs_scan_node.tuple_id;
@@ -222,10 +223,12 @@ Status HdfsScanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
         *min_max_row_desc, state, &min_max_conjuncts_));
   }
 
-  // TODO: Find formats to be read across all instances once codegen done per fragment.
-  const TPlanFragmentInstanceCtx& instance_ctx = state->instance_ctx();
-  auto ranges = instance_ctx.per_node_scan_ranges.find(tnode.node_id);
-  if (ranges != instance_ctx.per_node_scan_ranges.end()) {
+  // Codegen is done once per fragment, so collect the file formats scanned by all of
+  // the fragment's instances.
+  const vector<const TPlanFragmentInstanceCtx*>& instance_ctxs = state->instance_ctxs();
+  for (const TPlanFragmentInstanceCtx* ctx : instance_ctxs) {
+    auto ranges = ctx->per_node_scan_ranges.find(tnode.node_id);
+    if (ranges == ctx->per_node_scan_ranges.end()) continue;
     for (const TScanRangeParams& scan_range_param : ranges->second) {
       const THdfsFileSplit& split = scan_range_param.scan_range.hdfs_file_split;
       HdfsPartitionDescriptor* partition_desc =
@@ -233,6 +236,7 @@ Status HdfsScanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
       scanned_file_formats_.insert(partition_desc->file_format());
     }
   }
+  state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
   return Status::OK();
 }
 
@@ -406,21 +410,13 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
   PrintHdfsSplitStats(per_volume_stats, &str);
   runtime_profile()->AddInfoString("Table Name", hdfs_table_->fully_qualified_name());
   runtime_profile()->AddInfoString(HDFS_SPLIT_STATS_DESC, str.str());
-  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
   return Status::OK();
 }
 
-void HdfsScanNodeBase::Codegen(RuntimeState* state) {
+void HdfsScanPlanNode::Codegen(FragmentState* state) {
   DCHECK(state->ShouldCodegen());
-  ExecNode::Codegen(state);
+  PlanNode::Codegen(state);
   if (IsNodeCodegenDisabled()) return;
-  const HdfsScanPlanNode& const_plan_node =
-      static_cast<const HdfsScanPlanNode&>(plan_node_);
-  HdfsScanPlanNode& hdfs_plan_node = const_cast<HdfsScanPlanNode&>(const_plan_node);
-  hdfs_plan_node.Codegen(state, state->runtime_profile());
-}
-
-void HdfsScanPlanNode::Codegen(RuntimeState* state, RuntimeProfile* profile) {
   for (THdfsFileFormat::type format: scanned_file_formats_) {
     llvm::Function* fn;
     Status status;
@@ -451,7 +447,7 @@ void HdfsScanPlanNode::Codegen(RuntimeState* state, RuntimeProfile* profile) {
       codegen->AddFunctionToJit(
           fn, &codegend_fn_map_[static_cast<THdfsFileFormat::type>(format)]);
     }
-    profile->AddCodegenMsg(status.ok(), status, format_name);
+    AddCodegenStatus(status, format_name);
   }
 }
 
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 66a15e9..6466778 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -101,10 +101,10 @@ struct ScanRangeMetadata {
 
 class HdfsScanPlanNode : public ScanPlanNode {
  public:
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
   virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
-  void Codegen(RuntimeState* state, RuntimeProfile* profile);
+  virtual void Codegen(FragmentState* state) override;
 
   /// Returns index into materialized_slots with 'path'.  Returns SKIP_COLUMN if
   /// that path is not materialized. Only valid to call after Init().
@@ -228,7 +228,6 @@ class HdfsScanNodeBase : public ScanNode {
   ~HdfsScanNodeBase();
 
   virtual Status Prepare(RuntimeState* state) override WARN_UNUSED_RESULT;
-  virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override WARN_UNUSED_RESULT;
   virtual Status Reset(
       RuntimeState* state, RowBatch* row_batch) override WARN_UNUSED_RESULT;
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 44ad4b8..aa37ffa 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -26,6 +26,7 @@
 #include "exec/text-converter.inline.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "runtime/collection-value-builder.h"
+#include "runtime/fragment-state.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/tuple-row.h"
@@ -326,7 +327,7 @@ bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields,
 //   ret i1 false
 // }
 Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanPlanNode* node,
-    RuntimeState* state, llvm::Function** write_complete_tuple_fn) {
+    FragmentState* state, llvm::Function** write_complete_tuple_fn) {
   const vector<ScalarExpr*>& conjuncts = node->conjuncts_;
   LlvmCodeGen* codegen = state->codegen();
   *write_complete_tuple_fn = NULL;
@@ -339,7 +340,8 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanPlanNode* node,
     SlotDescriptor* slot_desc = node->materialized_slots_[i];
     RETURN_IF_ERROR(TextConverter::CodegenWriteSlot(codegen, tuple_desc, slot_desc, &fn,
         node->hdfs_table_->null_column_value().data(),
-        node->hdfs_table_->null_column_value().size(), true, state->strict_mode()));
+        node->hdfs_table_->null_column_value().size(), true,
+        state->query_options().strict_mode));
     if (i >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) codegen->SetNoInline(fn);
     slot_fns.push_back(fn);
   }
@@ -489,14 +491,8 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanPlanNode* node,
       // tuple against that conjunct and start a new parse_block for the next conjunct
       parse_block = llvm::BasicBlock::Create(context, "parse", fn, eval_fail_block);
       llvm::Function* conjunct_fn;
-      Status status =
-          conjuncts[conjunct_idx]->GetCodegendComputeFn(codegen, false, &conjunct_fn);
-      if (!status.ok()) {
-        stringstream ss;
-        ss << "Failed to codegen conjunct: " << status.GetDetail();
-        state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
-        return status;
-      }
+      RETURN_IF_ERROR(
+          conjuncts[conjunct_idx]->GetCodegendComputeFn(codegen, false, &conjunct_fn));
       if (node->materialized_slots_.size() + conjunct_idx
           >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
         codegen->SetNoInline(conjunct_fn);
@@ -532,7 +528,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanPlanNode* node,
 }
 
 Status HdfsScanner::CodegenWriteAlignedTuples(const HdfsScanPlanNode* node,
-    RuntimeState* state, llvm::Function* write_complete_tuple_fn,
+    FragmentState* state, llvm::Function* write_complete_tuple_fn,
     llvm::Function** write_aligned_tuples_fn) {
   LlvmCodeGen* codegen = state->codegen();
   *write_aligned_tuples_fn = NULL;
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index d716b8d..18cd84b 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -489,13 +489,13 @@ class HdfsScanner {
   /// to WriteCompleteTuple. Stores the resulting function in 'write_complete_tuple_fn'
   /// if codegen was successful or NULL otherwise.
   static Status CodegenWriteCompleteTuple(const HdfsScanPlanNode* node,
-      RuntimeState* state, llvm::Function** write_complete_tuple_fn);
+      FragmentState* state, llvm::Function** write_complete_tuple_fn);
 
   /// Codegen function to replace WriteAlignedTuples.  WriteAlignedTuples is cross
   /// compiled to IR.  This function loads the precompiled IR function, modifies it,
   /// and stores the resulting function in 'write_aligned_tuples_fn' if codegen was
   /// successful or NULL otherwise.
-  static Status CodegenWriteAlignedTuples(const HdfsScanPlanNode*, RuntimeState*,
+  static Status CodegenWriteAlignedTuples(const HdfsScanPlanNode*, FragmentState*,
       llvm::Function* write_tuple_fn, llvm::Function** write_aligned_tuples_fn);
 
   /// Codegen function to replace InitTuple() removing runtime constants like the tuple
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 9520dcd..fee5ebc 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -36,6 +36,7 @@
 #include "exec/text-converter.h"
 #include "gen-cpp/ErrorCodes_types.h"
 #include "runtime/descriptors.h"
+#include "runtime/fragment-state.h"
 #include "runtime/mem-pool.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -74,7 +75,7 @@ HdfsSequenceScanner::~HdfsSequenceScanner() {
 }
 
 // Codegen for materialized parsed data into tuples.
-Status HdfsSequenceScanner::Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+Status HdfsSequenceScanner::Codegen(HdfsScanPlanNode* node, FragmentState* state,
     llvm::Function** write_aligned_tuples_fn) {
   *write_aligned_tuples_fn = nullptr;
   DCHECK(state->ShouldCodegen());
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index 0b79b78..3dc445e 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -171,7 +171,7 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
 
   /// Codegen WriteAlignedTuples(). Stores the resulting function in
   /// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise.
-  static Status Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+  static Status Codegen(HdfsScanPlanNode* node, FragmentState* state,
       llvm::Function** write_aligned_tuples_fn);
 
  protected:
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 9246bda..c1cde0b 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -54,7 +54,7 @@ using namespace strings;
 namespace impala {
 
 Status HdfsTableSinkConfig::Init(
-    const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
   RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
   DCHECK(tsink_->__isset.table_sink);
   DCHECK(tsink_->table_sink.__isset.hdfs_table_sink);
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index ccfbd54..2ce9d5c 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -108,7 +108,7 @@ class HdfsTableSinkConfig : public DataSinkConfig {
 
  protected:
   Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
-      RuntimeState* state) override;
+      FragmentState* state) override;
 };
 
 /// The sink consumes all row batches of its child execution tree, and writes the
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index e996f0b..78355dd 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -41,6 +41,7 @@
 #include "gen-cpp/ErrorCodes_types.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/descriptors.h"
+#include "runtime/fragment-state.h"
 #include "runtime/io/request-context.h"
 #include "runtime/io/request-ranges.h"
 #include "runtime/mem-pool.h"
@@ -790,7 +791,7 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
 // Codegen for materializing parsed data into tuples.  The function WriteCompleteTuple is
 // handcrafted using the IRBuilder for the specific tuple description.  This function
 // is then injected into the cross-compiled driving function, WriteAlignedTuples().
-Status HdfsTextScanner::Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+Status HdfsTextScanner::Codegen(HdfsScanPlanNode* node, FragmentState* state,
     llvm::Function** write_aligned_tuples_fn) {
   *write_aligned_tuples_fn = nullptr;
   DCHECK(state->ShouldCodegen());
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index cbfbce8..d74db9f 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -61,7 +61,7 @@ class HdfsTextScanner : public HdfsScanner {
 
   /// Codegen WriteAlignedTuples(). Stores the resulting function in
   /// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise.
-  static Status Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+  static Status Codegen(HdfsScanPlanNode* node, FragmentState* state,
       llvm::Function** write_aligned_tuples_fn);
 
   /// Return true if we have builtin support for scanning text files compressed with this
diff --git a/be/src/exec/join-builder.cc b/be/src/exec/join-builder.cc
index 504c7db..880dbd1 100644
--- a/be/src/exec/join-builder.cc
+++ b/be/src/exec/join-builder.cc
@@ -25,7 +25,7 @@
 namespace impala {
 
 Status JoinBuilderConfig::Init(
-    const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
   RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
   join_node_id_ = tsink.join_build_sink.dest_node_id;
   join_op_ = tsink.join_build_sink.join_op;
diff --git a/be/src/exec/join-builder.h b/be/src/exec/join-builder.h
index 65e63f0..ef7b7a8 100644
--- a/be/src/exec/join-builder.h
+++ b/be/src/exec/join-builder.h
@@ -37,7 +37,7 @@ class JoinBuilderConfig : public DataSinkConfig {
   friend class PhjBuilder;
 
   Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
-      RuntimeState* state) override;
+      FragmentState* state) override;
 
   /// The ID of the join plan node this is associated with.
   int join_node_id_;
diff --git a/be/src/exec/nested-loop-join-builder.cc b/be/src/exec/nested-loop-join-builder.cc
index efdaa0d..3b9cd11 100644
--- a/be/src/exec/nested-loop-join-builder.cc
+++ b/be/src/exec/nested-loop-join-builder.cc
@@ -37,7 +37,7 @@ DataSink* NljBuilderConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
 }
 
 Status NljBuilderConfig::Init(
-    const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
   RETURN_IF_ERROR(JoinBuilderConfig::Init(tsink, input_row_desc, state));
   return Status::OK();
 }
@@ -75,6 +75,7 @@ Status NljBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker)
 }
 
 Status NljBuilder::Open(RuntimeState* state) {
+  RETURN_IF_ERROR(DataSink::Open(state));
   return Status::OK();
 }
 
diff --git a/be/src/exec/nested-loop-join-builder.h b/be/src/exec/nested-loop-join-builder.h
index b66fd23..bcaecbd 100644
--- a/be/src/exec/nested-loop-join-builder.h
+++ b/be/src/exec/nested-loop-join-builder.h
@@ -37,7 +37,7 @@ class NljBuilderConfig : public JoinBuilderConfig {
 
  protected:
   Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
-      RuntimeState* state) override;
+      FragmentState* state) override;
 };
 
 /// Builder for the NestedLoopJoinNode that accumulates the build-side rows for the join.
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index dc8cc60..185e241 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -37,7 +37,7 @@
 using namespace impala;
 using namespace strings;
 
-Status NestedLoopJoinPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status NestedLoopJoinPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
   RETURN_IF_ERROR(BlockingJoinPlanNode::Init(tnode, state));
   DCHECK(tnode.join_node.__isset.nested_loop_join_node);
   // join_conjunct_evals_ are evaluated in the context of rows assembled from
diff --git a/be/src/exec/nested-loop-join-node.h b/be/src/exec/nested-loop-join-node.h
index 437efe2..65c5db4 100644
--- a/be/src/exec/nested-loop-join-node.h
+++ b/be/src/exec/nested-loop-join-node.h
@@ -38,7 +38,7 @@ class NestedLoopJoinPlanNode : public BlockingJoinPlanNode {
   /// Join conjuncts.
   std::vector<ScalarExpr*> join_conjuncts_;
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
   virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
 
diff --git a/be/src/exec/non-grouping-aggregator.cc b/be/src/exec/non-grouping-aggregator.cc
index a2c4b0a..424143e 100644
--- a/be/src/exec/non-grouping-aggregator.cc
+++ b/be/src/exec/non-grouping-aggregator.cc
@@ -24,6 +24,7 @@
 #include "exprs/agg-fn-evaluator.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/descriptors.h"
+#include "runtime/fragment-state.h"
 #include "runtime/mem-pool.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -36,21 +37,21 @@
 namespace impala {
 
 NonGroupingAggregatorConfig::NonGroupingAggregatorConfig(
-    const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx)
+    const TAggregator& taggregator, FragmentState* state, PlanNode* pnode, int agg_idx)
   : AggregatorConfig(taggregator, state, pnode, agg_idx) {}
 
-Status NonGroupingAggregatorConfig::Codegen(RuntimeState* state) {
+void NonGroupingAggregatorConfig::Codegen(FragmentState* state) {
   LlvmCodeGen* codegen = state->codegen();
   DCHECK(codegen != nullptr);
   TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
-  return CodegenAddBatchImpl(codegen, prefetch_mode);
+  Status status = CodegenAddBatchImpl(codegen, prefetch_mode);
+  codegen_status_msg_ = FragmentState::GenerateCodegenMsg(status.ok(), status);
 }
 
 NonGroupingAggregator::NonGroupingAggregator(
     ExecNode* exec_node, ObjectPool* pool, const NonGroupingAggregatorConfig& config)
   : Aggregator(
         exec_node, pool, config, Substitute("NonGroupingAggregator $0", config.agg_idx_)),
-    agg_config(config),
     add_batch_impl_fn_(config.add_batch_impl_fn_) {}
 
 Status NonGroupingAggregator::Prepare(RuntimeState* state) {
@@ -59,13 +60,6 @@ Status NonGroupingAggregator::Prepare(RuntimeState* state) {
   return Status::OK();
 }
 
-void NonGroupingAggregator::Codegen(RuntimeState* state) {
-  // TODO: This const cast will be removed once codegen call is moved before FIS creation
-  Status codegen_status =
-      const_cast<NonGroupingAggregatorConfig&>(agg_config).Codegen(state);
-  runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
-}
-
 Status NonGroupingAggregator::Open(RuntimeState* state) {
   RETURN_IF_ERROR(Aggregator::Open(state));
 
diff --git a/be/src/exec/non-grouping-aggregator.h b/be/src/exec/non-grouping-aggregator.h
index 33b0091..95f1695 100644
--- a/be/src/exec/non-grouping-aggregator.h
+++ b/be/src/exec/non-grouping-aggregator.h
@@ -30,6 +30,7 @@ class AggFnEvaluator;
 class AggregationPlanNode;
 class DescriptorTbl;
 class ExecNode;
+class FragmentState;
 class LlvmCodeGen;
 class NonGroupingAggregator;
 class ObjectPool;
@@ -40,9 +41,9 @@ class Tuple;
 
 class NonGroupingAggregatorConfig : public AggregatorConfig {
  public:
-  NonGroupingAggregatorConfig(const TAggregator& taggregator, RuntimeState* state,
+  NonGroupingAggregatorConfig(const TAggregator& taggregator, FragmentState* state,
       PlanNode* pnode, int agg_idx);
-  Status Codegen(RuntimeState* state) override;
+  void Codegen(FragmentState* state) override;
   ~NonGroupingAggregatorConfig() override {}
 
   typedef Status (*AddBatchImplFn)(NonGroupingAggregator*, RowBatch*);
@@ -70,7 +71,6 @@ class NonGroupingAggregator : public Aggregator {
       ExecNode* exec_node, ObjectPool* pool, const NonGroupingAggregatorConfig& config);
 
   virtual Status Prepare(RuntimeState* state) override;
-  virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
   virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override {
@@ -97,10 +97,6 @@ class NonGroupingAggregator : public Aggregator {
   virtual void DebugString(int indentation_level, std::stringstream* out) const override;
 
  private:
-  /// TODO: Remove reference once codegen is performed before FIS creation.
-  /// Reference to the config object and only used to call Codegen().
-  const NonGroupingAggregatorConfig& agg_config;
-
   /// MemPool used to allocate memory for 'singleton_output_tuple_'. The ownership of the
   /// pool's memory is transferred to the output batch on eos. The pool should not be
   /// Reset() to allow amortizing memory allocation over a series of
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index acc17f0..62cfadb 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -18,6 +18,7 @@
 #include "exec/partial-sort-node.h"
 
 #include "exec/exec-node-util.h"
+#include "runtime/fragment-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/sorted-run-merger.h"
@@ -27,7 +28,7 @@
 
 namespace impala {
 
-Status PartialSortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status PartialSortPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
   DCHECK(!tnode.sort_node.__isset.offset || tnode.sort_node.offset == 0);
   RETURN_IF_ERROR(PlanNode::Init(tnode, state));
   const TSortInfo& tsort_info = tnode.sort_node.sort_info;
@@ -38,6 +39,7 @@ Status PartialSortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
       *children_[0]->row_descriptor_, state, &sort_tuple_slot_exprs_));
   row_comparator_config_ =
       state->obj_pool()->Add(new TupleRowComparatorConfig(tsort_info, ordering_exprs_));
+  state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
   return Status::OK();
 }
 
@@ -78,26 +80,16 @@ Status PartialSortNode::Prepare(RuntimeState* state) {
           runtime_profile(), state, label(), false));
   RETURN_IF_ERROR(sorter_->Prepare(pool_));
   DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
-  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
   input_batch_.reset(
       new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
   return Status::OK();
 }
 
-void PartialSortNode::Codegen(RuntimeState* state) {
+void PartialSortPlanNode::Codegen(FragmentState* state) {
   DCHECK(state->ShouldCodegen());
-  ExecNode::Codegen(state);
+  PlanNode::Codegen(state);
   if (IsNodeCodegenDisabled()) return;
-  const PartialSortPlanNode& partial_sort_pnode =
-      static_cast<const PartialSortPlanNode&>(plan_node_);
-  PartialSortPlanNode& non_const_pnode =
-      const_cast<PartialSortPlanNode&>(partial_sort_pnode);
-  non_const_pnode.Codegen(state, runtime_profile());
-}
-
-void PartialSortPlanNode::Codegen(RuntimeState* state, RuntimeProfile* runtime_profile) {
-  Status codegen_status = row_comparator_config_->Codegen(state);
-  runtime_profile->AddCodegenMsg(codegen_status.ok(), codegen_status);
+  AddCodegenStatus(row_comparator_config_->Codegen(state));
 }
 
 Status PartialSortNode::Open(RuntimeState* state) {
diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h
index a6a59eb..0a3fb92 100644
--- a/be/src/exec/partial-sort-node.h
+++ b/be/src/exec/partial-sort-node.h
@@ -25,10 +25,10 @@ namespace impala {
 
 class PartialSortPlanNode : public PlanNode {
  public:
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
   virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
-  void Codegen(RuntimeState* state, RuntimeProfile* runtime_profile);
+  virtual void Codegen(FragmentState* state) override;
 
   ~PartialSortPlanNode(){}
 
@@ -68,7 +68,6 @@ class PartialSortNode : public ExecNode {
   ~PartialSortNode();
 
   virtual Status Prepare(RuntimeState* state);
-  virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
   virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 1bee0e9..b719de4 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -28,6 +28,7 @@
 #include "exprs/scalar-expr.h"
 #include "runtime/buffered-tuple-stream.h"
 #include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
 #include "runtime/row-batch.h"
@@ -71,7 +72,7 @@ PhjBuilder* PhjBuilderConfig::CreateSink(BufferPool::ClientHandle* buffer_pool_c
         max_row_buffer_size, state));
 }
 
-Status PhjBuilderConfig::CreateConfig(RuntimeState* state, int join_node_id,
+Status PhjBuilderConfig::CreateConfig(FragmentState* state, int join_node_id,
     TJoinOp::type join_op, const RowDescriptor* build_row_desc,
     const std::vector<TEqJoinCondition>& eq_join_conjuncts,
     const std::vector<TRuntimeFilterDesc>& filters, uint32_t hash_seed,
@@ -91,7 +92,7 @@ void PhjBuilderConfig::Close() {
   DataSinkConfig::Close();
 }
 
-Status PhjBuilderConfig::InitExprsAndFilters(RuntimeState* state,
+Status PhjBuilderConfig::InitExprsAndFilters(FragmentState* state,
     const vector<TEqJoinCondition>& eq_join_conjuncts,
     const vector<TRuntimeFilterDesc>& filter_descs) {
   for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) {
@@ -107,10 +108,13 @@ Status PhjBuilderConfig::InitExprsAndFilters(RuntimeState* state,
         filter_desc.is_broadcast_join || state->query_options().num_nodes == 1);
     DCHECK(!state->query_options().disable_row_runtime_filtering ||
         filter_desc.applied_on_partition_columns);
-    // Skip over filters that are not produced by this instance of the join, i.e.
+    // Skip over filters that are not produced by the instances of the builder, i.e.
     // broadcast filters where this instance was not selected as a filter producer.
-    const vector<TRuntimeFilterSource> filters_produced =
-        state->instance_ctx().filters_produced;
+    const vector<const TPlanFragmentInstanceCtx*>& instance_ctxs = state->instance_ctxs();
+    DCHECK(!instance_ctxs.empty());
+    // Any instance works: the produced filters are expected to match across instances.
+    const vector<TRuntimeFilterSource>& filters_produced =
+        instance_ctxs[0]->filters_produced;
     auto it = std::find_if(filters_produced.begin(), filters_produced.end(),
         [this, &filter_desc](const TRuntimeFilterSource f) {
           return f.src_node_id == join_node_id_ && f.filter_id == filter_desc.filter_id;
@@ -126,14 +130,14 @@ Status PhjBuilderConfig::InitExprsAndFilters(RuntimeState* state,
   hash_table_config_ = state->obj_pool()->Add(new HashTableConfig(build_exprs_,
       build_exprs_, PhjBuilder::HashTableStoresNulls(join_op_, is_not_distinct_from_),
       is_not_distinct_from_));
+  state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
   return Status::OK();
 }
 
-Status PhjBuilderConfig::Init(RuntimeState* state, int join_node_id,
+Status PhjBuilderConfig::Init(FragmentState* state, int join_node_id,
     TJoinOp::type join_op, const RowDescriptor* build_row_desc,
-    const std::vector<TEqJoinCondition>& eq_join_conjuncts,
-    const std::vector<TRuntimeFilterDesc>& filters, uint32_t hash_seed,
-    TDataSink* tsink) {
+    const vector<TEqJoinCondition>& eq_join_conjuncts,
+    const vector<TRuntimeFilterDesc>& filters, uint32_t hash_seed, TDataSink* tsink) {
   tsink->__isset.join_build_sink = true;
   tsink->join_build_sink.__set_dest_node_id(join_node_id);
   tsink->join_build_sink.__set_join_op(join_op);
@@ -142,8 +146,8 @@ Status PhjBuilderConfig::Init(RuntimeState* state, int join_node_id,
   return InitExprsAndFilters(state, eq_join_conjuncts, filters);
 }
 
-Status PhjBuilderConfig::Init(
-    const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+Status PhjBuilderConfig::Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+    FragmentState* state) {
   RETURN_IF_ERROR(JoinBuilderConfig::Init(tsink, input_row_desc, state));
   const TJoinBuildSink& build_sink = tsink.join_build_sink;
   hash_seed_ = build_sink.hash_seed;
@@ -263,12 +267,12 @@ Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker)
   num_hash_table_builds_skipped_ =
       ADD_COUNTER(profile(), "NumHashTableBuildsSkipped", TUnit::UNIT);
   repartition_timer_ = ADD_TIMER(profile(), "RepartitionTime");
-  state->CheckAndAddCodegenDisabledMessage(profile());
   return Status::OK();
 }
 
 Status PhjBuilder::Open(RuntimeState* state) {
   SCOPED_TIMER(profile()->total_time_counter());
+  RETURN_IF_ERROR(DataSink::Open(state));
   if (!buffer_pool_client_->is_registered()) {
     DCHECK(is_separate_build_) << "Client is registered by PhjNode if not separate";
     DCHECK_GE(resource_profile_->min_reservation, MinReservation().second);
@@ -1226,13 +1230,7 @@ std::string PhjBuilder::Partition::DebugString() {
   return ss.str();
 }
 
-void PhjBuilder::Codegen(RuntimeState* state) {
-  const PhjBuilderConfig& phj_config = static_cast<const PhjBuilderConfig&>(sink_config_);
-  PhjBuilderConfig& non_const_config = const_cast<PhjBuilderConfig&>(phj_config);
-  non_const_config.Codegen(state, profile());
-}
-
-void PhjBuilderConfig::Codegen(RuntimeState* state, RuntimeProfile* profile) {
+void PhjBuilderConfig::Codegen(FragmentState* state) {
   LlvmCodeGen* codegen = state->codegen();
   DCHECK(codegen != nullptr);
   Status build_codegen_status;
@@ -1266,9 +1264,8 @@ void PhjBuilderConfig::Codegen(RuntimeState* state, RuntimeProfile* profile) {
     build_codegen_status = codegen_status;
     insert_codegen_status = codegen_status;
   }
-  profile->AddCodegenMsg(build_codegen_status.ok(), build_codegen_status, "Build Side");
-  profile->AddCodegenMsg(
-      insert_codegen_status.ok(), insert_codegen_status, "Hash Table Construction");
+  AddCodegenStatus(build_codegen_status, "Build Side");
+  AddCodegenStatus(insert_codegen_status, "Hash Table Construction");
 }
 
 string PhjBuilder::DebugString() const {
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index c009211..22a4127 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -63,15 +63,14 @@ class PhjBuilderConfig : public JoinBuilderConfig {
   /// Creates a PhjBuilderConfig for embedded use within a PartitionedHashJoinNode.
   /// Creates the object in the state's object pool. To be used only by
   /// PartitionedHashJoinPlanNode.
-  static Status CreateConfig(RuntimeState* state, int join_node_id, TJoinOp::type join_op,
-      const RowDescriptor* build_row_desc,
+  static Status CreateConfig(FragmentState* state, int join_node_id,
+      TJoinOp::type join_op, const RowDescriptor* build_row_desc,
       const std::vector<TEqJoinCondition>& eq_join_conjuncts,
       const std::vector<TRuntimeFilterDesc>& filters, uint32_t hash_seed,
       PhjBuilderConfig** sink);
 
   void Close() override;
-
-  void Codegen(RuntimeState* state, RuntimeProfile* profile);
+  void Codegen(FragmentState* state) override;
 
   ~PhjBuilderConfig() override {}
 
@@ -117,13 +116,13 @@ class PhjBuilderConfig : public JoinBuilderConfig {
  protected:
   /// Initialization for separate sink.
   Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
-      RuntimeState* state) override;
+      FragmentState* state) override;
 
  private:
   /// Helper method used by CreateConfig() to initialize embedded builder.
   /// 'tsink' does not need to be initialized by the caller - all values to be used are
   /// passed in as arguments and this function fills in required fields in 'tsink'.
-  Status Init(RuntimeState* state, int join_node_id, TJoinOp::type join_op,
+  Status Init(FragmentState* state, int join_node_id, TJoinOp::type join_op,
       const RowDescriptor* build_row_desc,
       const std::vector<TEqJoinCondition>& eq_join_conjuncts,
       const std::vector<TRuntimeFilterDesc>& filters, uint32_t hash_seed,
@@ -132,7 +131,7 @@ class PhjBuilderConfig : public JoinBuilderConfig {
   /// Initializes the build and filter expressions, creates a copy of the filter
   /// descriptors that will be generated by this sink and initializes the hash table
   /// config object.
-  Status InitExprsAndFilters(RuntimeState* state,
+  Status InitExprsAndFilters(FragmentState* state,
       const std::vector<TEqJoinCondition>& eq_join_conjuncts,
       const std::vector<TRuntimeFilterDesc>& filters);
 
@@ -150,8 +149,8 @@ class PhjBuilderConfig : public JoinBuilderConfig {
 
   /// Codegen inserting rows into runtime filters. Identical signature to
   /// InsertRuntimeFilters(). Returns non-OK if codegen was not possible.
-  Status CodegenInsertRuntimeFilters(
-      LlvmCodeGen* codegen, const vector<ScalarExpr*>& filter_exprs, llvm::Function** fn);
+  Status CodegenInsertRuntimeFilters(LlvmCodeGen* codegen,
+      const std::vector<ScalarExpr*>& filter_exprs, llvm::Function** fn);
 };
 
 /// See partitioned-hash-join-node.h for explanation of the top-level algorithm and how
@@ -272,11 +271,6 @@ class PhjBuilder : public JoinBuilder {
   virtual Status FlushFinal(RuntimeState* state) override;
   virtual void Close(RuntimeState* state) override;
 
-  /// Does all codegen for the builder (if codegen is enabled).
-  /// Updates the the builder's runtime profile with info about whether any errors
-  /// occured during codegen.
-  virtual void Codegen(RuntimeState* state) override;
-
   /////////////////////////////////////////
   // The following functions are used only by PartitionedHashJoinNode.
   /////////////////////////////////////////
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 5f53f1d..bdc5a4e 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -29,6 +29,7 @@
 #include "exprs/scalar-expr.h"
 #include "exprs/slot-ref.h"
 #include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/fragment-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -48,7 +49,8 @@ static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
 using namespace impala;
 using strings::Substitute;
 
-Status PartitionedHashJoinPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status PartitionedHashJoinPlanNode::Init(
+    const TPlanNode& tnode, FragmentState* state) {
   RETURN_IF_ERROR(BlockingJoinPlanNode::Init(tnode, state));
   DCHECK(tnode.__isset.join_node);
   DCHECK(tnode.join_node.__isset.hash_join_node);
@@ -83,10 +85,11 @@ Status PartitionedHashJoinPlanNode::Init(const TPlanNode& tnode, RuntimeState* s
   // Create the config always. It is only used if UseSeparateBuild() is true, but in
   // Init(), IsInSubplan() isn't available yet.
   // TODO: simplify this by ensuring that UseSeparateBuild() is accurate in Init().
-  RETURN_IF_ERROR(PhjBuilderConfig::CreateConfig(state, tnode_->node_id,
-      tnode_->join_node.join_op, &build_row_desc(), eq_join_conjuncts,
-      tnode_->runtime_filters, tnode_->join_node.hash_join_node.hash_seed,
-      &phj_builder_config));
+  RETURN_IF_ERROR(
+      PhjBuilderConfig::CreateConfig(state, tnode_->node_id, tnode_->join_node.join_op,
+          &build_row_desc(), eq_join_conjuncts, tnode_->runtime_filters,
+          tnode_->join_node.hash_join_node.hash_seed, &phj_builder_config_));
+  state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
   return Status::OK();
 }
 
@@ -94,7 +97,7 @@ void PartitionedHashJoinPlanNode::Close() {
   ScalarExpr::Close(probe_exprs_);
   ScalarExpr::Close(build_exprs_);
   ScalarExpr::Close(other_join_conjuncts_);
-  if (phj_builder_config != nullptr) phj_builder_config->Close();
+  if (phj_builder_config_ != nullptr) phj_builder_config_->Close();
   PlanNode::Close();
 }
 
@@ -129,7 +132,7 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
   runtime_state_ = state;
   if (!UseSeparateBuild(state->query_options())) {
     const PhjBuilderConfig& builder_config =
-      *static_cast<const PartitionedHashJoinPlanNode&>(plan_node_).phj_builder_config;
+      *static_cast<const PartitionedHashJoinPlanNode&>(plan_node_).phj_builder_config_;
     builder_ = builder_config.CreateSink(buffer_pool_client(),
           resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size,
           state);
@@ -158,14 +161,13 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
 
   num_probe_rows_partitioned_ =
       ADD_COUNTER(runtime_profile(), "ProbeRowsPartitioned", TUnit::UNIT);
-  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
   return Status::OK();
 }
 
-void PartitionedHashJoinNode::Codegen(RuntimeState* state) {
+void PartitionedHashJoinPlanNode::Codegen(FragmentState* state) {
   DCHECK(state->ShouldCodegen());
-  // Codegen the children node;
-  ExecNode::Codegen(state);
+  // Codegen the children nodes.
+  PlanNode::Codegen(state);
   if (IsNodeCodegenDisabled()) return;
 
   LlvmCodeGen* codegen = state->codegen();
@@ -173,27 +175,12 @@ void PartitionedHashJoinNode::Codegen(RuntimeState* state) {
 
   // Codegen the build side (if integrated into this join node).
   if (!UseSeparateBuild(state->query_options())) {
-    // TODO: invoke codegen on the PhjBuilderConfig obj inside
-    // PartitionedHashJoinPlanNode::Codegen() once codegen invocation is completely moved
-    // to the plan nodes.
-    DCHECK(builder_ != nullptr);
-    builder_->Codegen(state);
+    DCHECK(phj_builder_config_ != nullptr);
+    phj_builder_config_->Codegen(state);
   }
 
-  // Codegen the probe side.
-  const PartitionedHashJoinPlanNode& phj_pnode =
-      static_cast<const PartitionedHashJoinPlanNode&>(plan_node_);
-  PartitionedHashJoinPlanNode& non_const_pnode =
-      const_cast<PartitionedHashJoinPlanNode&>(phj_pnode);
-  non_const_pnode.Codegen(state, runtime_profile());
-}
-
-void PartitionedHashJoinPlanNode::Codegen(RuntimeState* state, RuntimeProfile* profile) {
-  LlvmCodeGen* codegen = state->codegen();
-  DCHECK(codegen != nullptr);
   TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
-  Status probe_codegen_status = CodegenProcessProbeBatch(codegen, prefetch_mode);
-  profile->AddCodegenMsg(probe_codegen_status.ok(), probe_codegen_status, "Probe Side");
+  AddCodegenStatus(CodegenProcessProbeBatch(codegen, prefetch_mode),  "Probe Side");
 }
 
 Status PartitionedHashJoinNode::Open(RuntimeState* state) {
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index b6bcca9..4be8868 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -40,10 +40,10 @@ class TupleRow;
 
 class PartitionedHashJoinPlanNode : public BlockingJoinPlanNode {
  public:
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
   virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
-  void Codegen(RuntimeState* state, RuntimeProfile* profile);
+  virtual void Codegen(FragmentState* state) override;
 
   ~PartitionedHashJoinPlanNode(){}
 
@@ -61,7 +61,7 @@ class PartitionedHashJoinPlanNode : public BlockingJoinPlanNode {
 
   /// Data sink config object for creating a phj builder that will be eventually used by
   /// the exec node.
-  PhjBuilderConfig* phj_builder_config;
+  PhjBuilderConfig* phj_builder_config_;
 
   /// Seed used for hashing rows.
   uint32_t hash_seed_;
@@ -170,7 +170,6 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   virtual ~PartitionedHashJoinNode();
 
   virtual Status Prepare(RuntimeState* state) override;
-  virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
   virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 1615cad..6d9b9b2 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -27,8 +27,8 @@
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "runtime/blocking-row-batch-queue.h"
+#include "runtime/fragment-state.h"
 #include "runtime/io/disk-io-mgr.h"
-#include "runtime/query-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
@@ -98,7 +98,7 @@ PROFILE_DEFINE_HIGH_WATER_MARK_COUNTER(PeakScannerThreadConcurrency, STABLE_LOW,
 
 const string ScanNode::SCANNER_THREAD_COUNTERS_PREFIX = "ScannerThreads";
 
-Status ScanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status ScanPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
   RETURN_IF_ERROR(PlanNode::Init(tnode, state));
   const TQueryOptions& query_options = state->query_options();
   for (const TRuntimeFilterDesc& filter_desc : tnode.runtime_filters) {
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 23aaa88..6978f07 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -31,7 +31,7 @@ class TScanRange;
 
 class ScanPlanNode : public PlanNode {
  public:
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
 };
 
diff --git a/be/src/exec/select-node.cc b/be/src/exec/select-node.cc
index 9e96a69..50cd3cd 100644
--- a/be/src/exec/select-node.cc
+++ b/be/src/exec/select-node.cc
@@ -22,6 +22,7 @@
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/scalar-expr.h"
 #include "gen-cpp/PlanNodes_types.h"
+#include "runtime/fragment-state.h"
 #include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -51,21 +52,14 @@ Status SelectNode::Prepare(RuntimeState* state) {
   return Status::OK();
 }
 
-void SelectNode::Codegen(RuntimeState* state) {
+void SelectPlanNode::Codegen(FragmentState* state) {
   DCHECK(state->ShouldCodegen());
-  ExecNode::Codegen(state);
+  PlanNode::Codegen(state);
   if (IsNodeCodegenDisabled()) return;
-  const SelectPlanNode& select_pnode = static_cast<const SelectPlanNode&>(plan_node_);
-  SelectPlanNode& non_const_pnode = const_cast<SelectPlanNode&>(select_pnode);
-  non_const_pnode.Codegen(state, runtime_profile());
+  AddCodegenStatus(CodegenCopyRows(state));
 }
 
-void SelectPlanNode::Codegen(RuntimeState* state, RuntimeProfile* runtime_profile) {
-  Status codegen_status = CodegenCopyRows(state);
-  runtime_profile->AddCodegenMsg(codegen_status.ok(), codegen_status);
-}
-
-Status SelectPlanNode::CodegenCopyRows(RuntimeState* state) {
+Status SelectPlanNode::CodegenCopyRows(FragmentState* state) {
   LlvmCodeGen* codegen = state->codegen();
   DCHECK(codegen != nullptr);
   llvm::Function* copy_rows_fn =
diff --git a/be/src/exec/select-node.h b/be/src/exec/select-node.h
index 74818d9..4104318 100644
--- a/be/src/exec/select-node.h
+++ b/be/src/exec/select-node.h
@@ -34,7 +34,7 @@ class TupleRow;
 class SelectPlanNode : public PlanNode {
  public:
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
-  void Codegen(RuntimeState* state, RuntimeProfile* runtime_profile);
+  virtual void Codegen(FragmentState* state) override;
 
   ~SelectPlanNode(){}
 
@@ -44,7 +44,7 @@ class SelectPlanNode : public PlanNode {
 
  private:
   /// Codegen SelectNode::CopyRows().
-  Status CodegenCopyRows(RuntimeState* state);
+  Status CodegenCopyRows(FragmentState* state);
 };
 
 /// Node that evaluates conjuncts and enforces a limit but otherwise passes along
@@ -55,7 +55,6 @@ class SelectNode : public ExecNode {
   SelectNode(ObjectPool* pool, const SelectPlanNode& pnode, const DescriptorTbl& descs);
 
   virtual Status Prepare(RuntimeState* state) override;
-  virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
   virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 833010a..e44b72c 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -18,6 +18,7 @@
 #include "exec/sort-node.h"
 
 #include "exec/exec-node-util.h"
+#include "runtime/fragment-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/sorted-run-merger.h"
@@ -27,7 +28,7 @@
 
 namespace impala {
 
-Status SortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status SortPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
   RETURN_IF_ERROR(PlanNode::Init(tnode, state));
   const TSortInfo& tsort_info = tnode.sort_node.sort_info;
   RETURN_IF_ERROR(ScalarExpr::Create(
@@ -37,6 +38,7 @@ Status SortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
       *children_[0]->row_descriptor_, state, &sort_tuple_slot_exprs_));
   row_comparator_config_ =
       state->obj_pool()->Add(new TupleRowComparatorConfig(tsort_info, ordering_exprs_));
+  state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
   return Status::OK();
 }
 
@@ -74,22 +76,14 @@ Status SortNode::Prepare(RuntimeState* state) {
       resource_profile_.spillable_buffer_size, runtime_profile(), state, label(), true));
   RETURN_IF_ERROR(sorter_->Prepare(pool_));
   DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
-  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
   return Status::OK();
 }
 
-void SortNode::Codegen(RuntimeState* state) {
+void SortPlanNode::Codegen(FragmentState* state) {
   DCHECK(state->ShouldCodegen());
-  ExecNode::Codegen(state);
+  PlanNode::Codegen(state);
   if (IsNodeCodegenDisabled()) return;
-  const SortPlanNode& sort_pnode = static_cast<const SortPlanNode&>(plan_node_);
-  SortPlanNode& non_const_pnode = const_cast<SortPlanNode&>(sort_pnode);
-  non_const_pnode.Codegen(state, runtime_profile());
-}
-
-void SortPlanNode::Codegen(RuntimeState* state, RuntimeProfile* runtime_profile) {
-  Status codegen_status = row_comparator_config_->Codegen(state);
-  runtime_profile->AddCodegenMsg(codegen_status.ok(), codegen_status);
+  AddCodegenStatus(row_comparator_config_->Codegen(state));
 }
 
 Status SortNode::Open(RuntimeState* state) {
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index a5d4bee..b2d8cda 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -25,10 +25,10 @@ namespace impala {
 
 class SortPlanNode : public PlanNode {
  public:
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
   virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
-  void Codegen(RuntimeState* state, RuntimeProfile* runtime_profile);
+  virtual void Codegen(FragmentState* state) override;
 
   ~SortPlanNode(){}
 
@@ -59,7 +59,6 @@ class SortNode : public ExecNode {
   ~SortNode();
 
   virtual Status Prepare(RuntimeState* state);
-  virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
   virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
diff --git a/be/src/exec/subplan-node.cc b/be/src/exec/subplan-node.cc
index 85addfd..26b7b4c 100644
--- a/be/src/exec/subplan-node.cc
+++ b/be/src/exec/subplan-node.cc
@@ -27,7 +27,7 @@
 
 namespace impala {
 
-Status SubplanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status SubplanPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
   RETURN_IF_ERROR(PlanNode::Init(tnode, state));
   DCHECK_EQ(children_.size(), 2);
   RETURN_IF_ERROR(SetContainingSubplan(state, this, children_[1]));
@@ -35,7 +35,7 @@ Status SubplanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
 }
 
 Status SubplanPlanNode::SetContainingSubplan(
-    RuntimeState* state, SubplanPlanNode* ancestor, PlanNode* node) {
+    FragmentState* state, SubplanPlanNode* ancestor, PlanNode* node) {
   node->containing_subplan_ = ancestor;
   if (node->tnode_->node_type == TPlanNodeType::SUBPLAN_NODE) {
     // Only traverse the first child and not the second one, because the Subplan
diff --git a/be/src/exec/subplan-node.h b/be/src/exec/subplan-node.h
index 6517df8..c8201e9 100644
--- a/be/src/exec/subplan-node.h
+++ b/be/src/exec/subplan-node.h
@@ -27,7 +27,7 @@ class TupleRow;
 
 class SubplanPlanNode : public PlanNode {
  public:
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
 
   /// Sets 'ancestor' as the containing Subplan in all plan nodes inside the plan-node
@@ -35,7 +35,7 @@ class SubplanPlanNode : public PlanNode {
   /// setting the subplan. Doesn't traverse the second child of SubplanPlanNodes within
   /// 'node'.
   Status SetContainingSubplan(
-    RuntimeState* state, SubplanPlanNode* ancestor, PlanNode* node);
+      FragmentState* state, SubplanPlanNode* ancestor, PlanNode* node);
 
   ~SubplanPlanNode(){}
 };
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index 3108040..276e08d 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -24,6 +24,7 @@
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "runtime/descriptors.h"
+#include "runtime/fragment-state.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
@@ -41,7 +42,7 @@
 using std::priority_queue;
 using namespace impala;
 
-Status TopNPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status TopNPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
   const TSortInfo& tsort_info = tnode.sort_node.sort_info;
   RETURN_IF_ERROR(PlanNode::Init(tnode, state));
   RETURN_IF_ERROR(ScalarExpr::Create(
@@ -53,6 +54,7 @@ Status TopNPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   row_comparator_config_ =
       state->obj_pool()->Add(new TupleRowComparatorConfig(tsort_info, ordering_exprs_));
   DCHECK_EQ(conjuncts_.size(), 0) << "TopNNode should never have predicates to evaluate.";
+  state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
   return Status::OK();
 }
 
@@ -90,22 +92,15 @@ Status TopNNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(output_tuple_exprs_, state, pool_,
       expr_perm_pool(), expr_results_pool(), &output_tuple_expr_evals_));
   insert_batch_timer_ = ADD_TIMER(runtime_profile(), "InsertBatchTime");
-  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
   tuple_pool_reclaim_counter_ = ADD_COUNTER(runtime_profile(), "TuplePoolReclamations",
       TUnit::UNIT);
   return Status::OK();
 }
 
-void TopNNode::Codegen(RuntimeState* state) {
+void TopNPlanNode::Codegen(FragmentState* state) {
   DCHECK(state->ShouldCodegen());
-  ExecNode::Codegen(state);
+  PlanNode::Codegen(state);
   if (IsNodeCodegenDisabled()) return;
-  const TopNPlanNode& topn_pnode = static_cast<const TopNPlanNode&>(plan_node_);
-  TopNPlanNode& non_const_pnode = const_cast<TopNPlanNode&>(topn_pnode);
-  non_const_pnode.Codegen(state, runtime_profile());
-}
-
-void TopNPlanNode::Codegen(RuntimeState* state, RuntimeProfile* runtime_profile) {
   LlvmCodeGen* codegen = state->codegen();
   DCHECK(codegen != NULL);
 
@@ -146,7 +141,7 @@ void TopNPlanNode::Codegen(RuntimeState* state, RuntimeProfile* runtime_profile)
       }
     }
   }
-  runtime_profile->AddCodegenMsg(codegen_status.ok(), codegen_status);
+  AddCodegenStatus(codegen_status);
 }
 
 Status TopNNode::Open(RuntimeState* state) {
diff --git a/be/src/exec/topn-node.h b/be/src/exec/topn-node.h
index 24be9fa..57dde08 100644
--- a/be/src/exec/topn-node.h
+++ b/be/src/exec/topn-node.h
@@ -36,10 +36,10 @@ class Tuple;
 
 class TopNPlanNode : public PlanNode {
  public:
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
   virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
-  void Codegen(RuntimeState* state, RuntimeProfile* runtime_profile);
+  virtual void Codegen(FragmentState* state) override;
 
   ~TopNPlanNode(){}
 
@@ -71,7 +71,6 @@ class TopNNode : public ExecNode {
   TopNNode(ObjectPool* pool, const TopNPlanNode& pnode, const DescriptorTbl& descs);
 
   virtual Status Prepare(RuntimeState* state);
-  virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
   virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index a929748..88b6351 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -22,6 +22,7 @@
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/scalar-expr.h"
 #include "gen-cpp/PlanNodes_types.h"
+#include "runtime/fragment-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tuple-row.h"
@@ -32,7 +33,7 @@
 
 using namespace impala;
 
-Status UnionPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status UnionPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
   RETURN_IF_ERROR(PlanNode::Init(tnode, state));
   DCHECK(tnode_->__isset.union_node);
   DCHECK_EQ(conjuncts_.size(), 0);
@@ -116,16 +117,10 @@ Status UnionNode::Prepare(RuntimeState* state) {
   return Status::OK();
 }
 
-void UnionNode::Codegen(RuntimeState* state) {
+void UnionPlanNode::Codegen(FragmentState* state){
   DCHECK(state->ShouldCodegen());
-  ExecNode::Codegen(state);
+  PlanNode::Codegen(state);
   if (IsNodeCodegenDisabled()) return;
-  const UnionPlanNode& union_pnode = static_cast<const UnionPlanNode&>(plan_node_);
-  UnionPlanNode& non_const_pnode = const_cast<UnionPlanNode&>(union_pnode);
-  non_const_pnode.Codegen(state, runtime_profile());
-}
-
-void UnionPlanNode::Codegen(RuntimeState* state, RuntimeProfile* runtime_profile){
   LlvmCodeGen* codegen = state->codegen();
   DCHECK(codegen != nullptr);
   std::stringstream codegen_message;
@@ -161,8 +156,7 @@ void UnionPlanNode::Codegen(RuntimeState* state, RuntimeProfile* runtime_profile
     codegen->AddFunctionToJit(union_materialize_batch_fn,
         reinterpret_cast<void**>(&(codegend_union_materialize_batch_fns_.data()[i])));
   }
-  runtime_profile->AddCodegenMsg(
-      codegen_status.ok(), codegen_status, codegen_message.str());
+  AddCodegenStatus(codegen_status, codegen_message.str());
 }
 
 Status UnionNode::Open(RuntimeState* state) {
diff --git a/be/src/exec/union-node.h b/be/src/exec/union-node.h
index 04bdcf4..96aca8b 100644
--- a/be/src/exec/union-node.h
+++ b/be/src/exec/union-node.h
@@ -39,10 +39,10 @@ class UnionNode;
 
 class UnionPlanNode : public PlanNode {
  public:
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
   virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
-  void Codegen(RuntimeState* state, RuntimeProfile* runtime_profile);
+  virtual void Codegen(FragmentState* state) override;
 
   ~UnionPlanNode(){}
 
@@ -89,7 +89,6 @@ class UnionNode : public ExecNode {
   UnionNode(ObjectPool* pool, const UnionPlanNode& pnode, const DescriptorTbl& descs);
 
   virtual Status Prepare(RuntimeState* state);
-  virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
   virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
diff --git a/be/src/exec/unnest-node.cc b/be/src/exec/unnest-node.cc
index a20be34..82907da 100644
--- a/be/src/exec/unnest-node.cc
+++ b/be/src/exec/unnest-node.cc
@@ -22,6 +22,7 @@
 #include "exec/subplan-node.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/slot-ref.h"
+#include "runtime/fragment-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tuple-row.h"
@@ -31,7 +32,7 @@ namespace impala {
 
 const CollectionValue UnnestNode::EMPTY_COLLECTION_VALUE;
 
-Status UnnestPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status UnnestPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
   DCHECK(tnode.__isset.unnest_node);
   RETURN_IF_ERROR(PlanNode::Init(tnode, state));
   return Status::OK();
@@ -42,7 +43,7 @@ void UnnestPlanNode::Close() {
   PlanNode::Close();
 }
 
-Status UnnestPlanNode::InitCollExpr(RuntimeState* state) {
+Status UnnestPlanNode::InitCollExpr(FragmentState* state) {
   DCHECK(containing_subplan_ != nullptr)
       << "set_containing_subplan() must have been called";
   const RowDescriptor& row_desc = *containing_subplan_->children_[0]->row_descriptor_;
diff --git a/be/src/exec/unnest-node.h b/be/src/exec/unnest-node.h
index 0b8bf6d..8045fab 100644
--- a/be/src/exec/unnest-node.h
+++ b/be/src/exec/unnest-node.h
@@ -28,12 +28,12 @@ class TupleDescriptor;
 
 class UnnestPlanNode : public PlanNode {
  public:
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
   virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
   /// Initializes the expression which produces the collection to be unnested.
   /// Called by the containing subplan plan-node.
-  Status InitCollExpr(RuntimeState* state);
+  Status InitCollExpr(FragmentState* state);
 
   ~UnnestPlanNode(){}
 
diff --git a/be/src/exprs/agg-fn.cc b/be/src/exprs/agg-fn.cc
index 2c619f9..8eaacdd 100644
--- a/be/src/exprs/agg-fn.cc
+++ b/be/src/exprs/agg-fn.cc
@@ -21,6 +21,7 @@
 #include "exprs/anyval-util.h"
 #include "exprs/scalar-expr.h"
 #include "runtime/descriptors.h"
+#include "runtime/fragment-state.h"
 #include "runtime/lib-cache.h"
 
 #include "common/names.h"
@@ -60,7 +61,7 @@ AggFn::AggFn(const TExprNode& tnode, const SlotDescriptor& intermediate_slot_des
   }
 }
 
-Status AggFn::Init(const RowDescriptor& row_desc, RuntimeState* state) {
+Status AggFn::Init(const RowDescriptor& row_desc, FragmentState* state) {
   // Initialize all children (i.e. input exprs to this aggregate expr).
   for (ScalarExpr* input_expr : children()) {
     RETURN_IF_ERROR(input_expr->Init(row_desc, /*is_entry_point*/ false, state));
@@ -116,7 +117,7 @@ Status AggFn::Init(const RowDescriptor& row_desc, RuntimeState* state) {
 
 Status AggFn::Create(const TExpr& texpr, const RowDescriptor& row_desc,
     const SlotDescriptor& intermediate_slot_desc, const SlotDescriptor& output_slot_desc,
-    RuntimeState* state, AggFn** agg_fn) {
+    FragmentState* state, AggFn** agg_fn) {
   *agg_fn = nullptr;
   ObjectPool* pool = state->obj_pool();
   const TExprNode& texpr_node = texpr.nodes[0];
diff --git a/be/src/exprs/agg-fn.h b/be/src/exprs/agg-fn.h
index c97effe..d9f6ef2 100644
--- a/be/src/exprs/agg-fn.h
+++ b/be/src/exprs/agg-fn.h
@@ -31,6 +31,7 @@ namespace impala {
 
 using impala_udf::FunctionContext;
 
+class FragmentState;
 class LlvmCodeGen;
 class MemPool;
 class MemTracker;
@@ -103,8 +104,8 @@ class AggFn : public Expr {
   /// of the output value. On failure, returns error status and sets 'agg_fn' to NULL.
   static Status Create(const TExpr& texpr, const RowDescriptor& row_desc,
       const SlotDescriptor& intermediate_slot_desc,
-      const SlotDescriptor& output_slot_desc, RuntimeState* state, AggFn** agg_fn)
-      WARN_UNUSED_RESULT;
+      const SlotDescriptor& output_slot_desc, FragmentState* state,
+      AggFn** agg_fn) WARN_UNUSED_RESULT;
 
   bool is_merge() const { return is_merge_; }
   AggregationOp agg_op() const { return agg_op_; }
@@ -176,7 +177,7 @@ class AggFn : public Expr {
 
   /// Initializes the AggFn and its input expressions. May load the UDAF from LibCache
   /// if necessary.
-  virtual Status Init(const RowDescriptor& desc, RuntimeState* state) WARN_UNUSED_RESULT;
+  virtual Status Init(const RowDescriptor& desc, FragmentState* state) WARN_UNUSED_RESULT;
 };
 
 }
diff --git a/be/src/exprs/expr-codegen-test.cc b/be/src/exprs/expr-codegen-test.cc
index e1c5c50..c5d95a8 100644
--- a/be/src/exprs/expr-codegen-test.cc
+++ b/be/src/exprs/expr-codegen-test.cc
@@ -18,6 +18,8 @@
 // The following is cross-compiled to native code and IR, and used in the test below
 #include "exprs/decimal-operators.h"
 #include "exprs/scalar-expr.h"
+#include "runtime/fragment-state.h"
+#include "runtime/query-state.h"
 #include "udf/udf.h"
 
 #ifdef IR_COMPILE
@@ -88,6 +90,7 @@ class ExprCodegenTest : public ::testing::Test {
  protected:
   scoped_ptr<TestEnv> test_env_;
   RuntimeState* runtime_state_;
+  FragmentState* fragment_state_;
   FunctionContext* fn_ctx_;
   FnAttr fn_type_attr_;
 
@@ -101,8 +104,8 @@ class ExprCodegenTest : public ::testing::Test {
   }
 
   Status CreateFromFile(const string& filename, scoped_ptr<LlvmCodeGen>* codegen) {
-    RETURN_IF_ERROR(LlvmCodeGen::CreateFromFile(runtime_state_,
-        runtime_state_->obj_pool(), NULL, filename, "test", codegen));
+    RETURN_IF_ERROR(LlvmCodeGen::CreateFromFile(fragment_state_,
+        fragment_state_->obj_pool(), NULL, filename, "test", codegen));
     return (*codegen)->MaterializeModule();
   }
 
@@ -113,6 +116,9 @@ class ExprCodegenTest : public ::testing::Test {
     test_env_.reset(new TestEnv());
     ASSERT_OK(test_env_->Init());
     ASSERT_OK(test_env_->CreateQueryState(0, &query_options, &runtime_state_));
+    QueryState* qs = runtime_state_->query_state();
+    TPlanFragmentCtx* fragment_ctx = qs->obj_pool()->Add(new TPlanFragmentCtx());
+    fragment_state_ = qs->obj_pool()->Add(new FragmentState(qs, *fragment_ctx));
 
     FunctionContext::TypeDesc return_type;
     return_type.type = FunctionContext::TYPE_DECIMAL;
@@ -145,7 +151,9 @@ class ExprCodegenTest : public ::testing::Test {
   virtual void TearDown() {
     fn_ctx_->impl()->Close();
     delete fn_ctx_;
-    runtime_state_ = NULL;
+    fragment_state_->ReleaseResources();
+    fragment_state_ = nullptr;
+    runtime_state_ = nullptr;
     test_env_.reset();
   }
 
@@ -308,7 +316,7 @@ TEST_F(ExprCodegenTest, TestInlineConstFnAttrs) {
   // Create Expr
   MemTracker tracker;
   ScalarExpr* expr;
-  ASSERT_OK(ScalarExpr::Create(texpr, RowDescriptor(), runtime_state_, &expr));
+  ASSERT_OK(ScalarExpr::Create(texpr, RowDescriptor(), fragment_state_, &expr));
 
   // Get TestGetFnAttrs() IR function
   stringstream test_udf_file;
diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc
index f8a851f..8f8399c 100644
--- a/be/src/exprs/hive-udf-call.cc
+++ b/be/src/exprs/hive-udf-call.cc
@@ -175,7 +175,7 @@ Status HiveUdfCall::InitEnv() {
 }
 
 Status HiveUdfCall::Init(
-    const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) {
+    const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) {
   // Initialize children first.
   RETURN_IF_ERROR(ScalarExpr::Init(row_desc, is_entry_point, state));
 
diff --git a/be/src/exprs/hive-udf-call.h b/be/src/exprs/hive-udf-call.h
index 7cf9ae6..19109b9 100644
--- a/be/src/exprs/hive-udf-call.h
+++ b/be/src/exprs/hive-udf-call.h
@@ -91,7 +91,7 @@ class HiveUdfCall : public ScalarExpr {
   HiveUdfCall(const TExprNode& node);
 
   virtual Status Init(const RowDescriptor& row_desc, bool is_entry_point,
-      RuntimeState* state) override WARN_UNUSED_RESULT;
+      FragmentState* state) override WARN_UNUSED_RESULT;
   virtual Status OpenEvaluator(FunctionContext::FunctionStateScope scope,
       RuntimeState* state, ScalarExprEvaluator* eval) const override WARN_UNUSED_RESULT;
   virtual void CloseEvaluator(FunctionContext::FunctionStateScope scope,
diff --git a/be/src/exprs/is-not-empty-predicate.cc b/be/src/exprs/is-not-empty-predicate.cc
index a51ad36..7627fdd 100644
--- a/be/src/exprs/is-not-empty-predicate.cc
+++ b/be/src/exprs/is-not-empty-predicate.cc
@@ -43,7 +43,7 @@ BooleanVal IsNotEmptyPredicate::GetBooleanValInterpreted(
 }
 
 Status IsNotEmptyPredicate::Init(
-    const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) {
+    const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) {
   RETURN_IF_ERROR(ScalarExpr::Init(row_desc, is_entry_point, state));
   DCHECK_EQ(children_.size(), 1);
   return Status::OK();
diff --git a/be/src/exprs/is-not-empty-predicate.h b/be/src/exprs/is-not-empty-predicate.h
index 313252c..b6a41fb 100644
--- a/be/src/exprs/is-not-empty-predicate.h
+++ b/be/src/exprs/is-not-empty-predicate.h
@@ -41,7 +41,7 @@ class IsNotEmptyPredicate : public Predicate {
   friend class ScalarExpr;
 
   virtual Status Init(
-      const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) override;
+      const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) override;
   explicit IsNotEmptyPredicate(const TExprNode& node);
 };
 
diff --git a/be/src/exprs/kudu-partition-expr.cc b/be/src/exprs/kudu-partition-expr.cc
index 3fa69ef..6b351e6 100644
--- a/be/src/exprs/kudu-partition-expr.cc
+++ b/be/src/exprs/kudu-partition-expr.cc
@@ -22,7 +22,7 @@
 #include "exec/kudu-util.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "runtime/exec-env.h"
-#include "runtime/query-state.h"
+#include "runtime/fragment-state.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
@@ -34,7 +34,7 @@ KuduPartitionExpr::KuduPartitionExpr(const TExprNode& node)
   : ScalarExpr(node), tkudu_partition_expr_(node.kudu_partition_expr) {}
 
 Status KuduPartitionExpr::Init(
-    const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) {
+    const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) {
   RETURN_IF_ERROR(ScalarExpr::Init(row_desc, is_entry_point, state));
   DCHECK_EQ(tkudu_partition_expr_.referenced_columns.size(), children_.size());
 
diff --git a/be/src/exprs/kudu-partition-expr.h b/be/src/exprs/kudu-partition-expr.h
index 35722cc..9f90e53 100644
--- a/be/src/exprs/kudu-partition-expr.h
+++ b/be/src/exprs/kudu-partition-expr.h
@@ -41,7 +41,7 @@ class KuduPartitionExpr : public ScalarExpr {
   KuduPartitionExpr(const TExprNode& node);
 
   virtual Status Init(const RowDescriptor& row_desc, bool is_entry_point,
-      RuntimeState* state) override WARN_UNUSED_RESULT;
+      FragmentState* state) override WARN_UNUSED_RESULT;
 
   virtual IntVal GetIntValInterpreted(
       ScalarExprEvaluator* eval, const TupleRow* row) const override;
diff --git a/be/src/exprs/scalar-expr.cc b/be/src/exprs/scalar-expr.cc
index b634f9e..ad5d4b3 100644
--- a/be/src/exprs/scalar-expr.cc
+++ b/be/src/exprs/scalar-expr.cc
@@ -46,6 +46,7 @@
 #include "exprs/udf-builtins.h"
 #include "exprs/utility-functions.h"
 #include "exprs/valid-tuple-id.h"
+#include "runtime/fragment-state.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
@@ -76,7 +77,7 @@ ScalarExpr::ScalarExpr(const TExprNode& node)
 }
 
 Status ScalarExpr::Create(const TExpr& texpr, const RowDescriptor& row_desc,
-    RuntimeState* state, ObjectPool* pool, ScalarExpr** scalar_expr) {
+    FragmentState* state, ObjectPool* pool, ScalarExpr** scalar_expr) {
   *scalar_expr = nullptr;
   ScalarExpr* root;
   RETURN_IF_ERROR(CreateNode(texpr.nodes[0], pool, &root));
@@ -98,7 +99,7 @@ Status ScalarExpr::Create(const TExpr& texpr, const RowDescriptor& row_desc,
 }
 
 Status ScalarExpr::Create(const vector<TExpr>& texprs, const RowDescriptor& row_desc,
-    RuntimeState* state, ObjectPool* pool, vector<ScalarExpr*>* exprs) {
+    FragmentState* state, ObjectPool* pool, vector<ScalarExpr*>* exprs) {
   exprs->clear();
   for (const TExpr& texpr: texprs) {
     ScalarExpr* expr;
@@ -110,12 +111,12 @@ Status ScalarExpr::Create(const vector<TExpr>& texprs, const RowDescriptor& row_
 }
 
 Status ScalarExpr::Create(const TExpr& texpr, const RowDescriptor& row_desc,
-    RuntimeState* state, ScalarExpr** scalar_expr) {
+    FragmentState* state, ScalarExpr** scalar_expr) {
   return ScalarExpr::Create(texpr, row_desc, state, state->obj_pool(), scalar_expr);
 }
 
 Status ScalarExpr::Create(const vector<TExpr>& texprs, const RowDescriptor& row_desc,
-    RuntimeState* state, vector<ScalarExpr*>* exprs) {
+    FragmentState* state, vector<ScalarExpr*>* exprs) {
   return ScalarExpr::Create(texprs, row_desc, state, state->obj_pool(), exprs);
 }
 
@@ -283,7 +284,7 @@ ScalarExprsResultsRowLayout::ScalarExprsResultsRowLayout(
 }
 
 Status ScalarExpr::Init(
-    const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) {
+    const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) {
   DCHECK(type_.type != INVALID_TYPE);
   for (int i = 0; i < children_.size(); ++i) {
     RETURN_IF_ERROR(children_[i]->Init(row_desc, false, state));
@@ -318,7 +319,7 @@ string ScalarExpr::DebugString(const vector<ScalarExpr*>& exprs) {
   return out.str();
 }
 
-bool ScalarExpr::ShouldCodegen(const RuntimeState* state) const {
+bool ScalarExpr::ShouldCodegen(const FragmentState* state) const {
   // Use the interpreted path and call the builtin without codegen if any of the
   // followings is true:
   // 1. The expression does not have an associated RuntimeState, e.g. is a partition
diff --git a/be/src/exprs/scalar-expr.h b/be/src/exprs/scalar-expr.h
index fceb83d..86cb6a4 100644
--- a/be/src/exprs/scalar-expr.h
+++ b/be/src/exprs/scalar-expr.h
@@ -56,6 +56,7 @@ using impala_udf::DecimalVal;
 using impala_udf::DateVal;
 using impala_udf::CollectionVal;
 
+class FragmentState;
 struct LibCacheEntry;
 class LlvmCodeGen;
 class MemTracker;
@@ -146,22 +147,22 @@ class ScalarExpr : public Expr {
   /// tuple row descriptor of the input tuple row. On failure, 'expr' is set to NULL and
   /// the expr tree (if created) will be closed. Error status will be returned too.
   static Status Create(const TExpr& texpr, const RowDescriptor& row_desc,
-      RuntimeState* state, ObjectPool* pool, ScalarExpr** expr) WARN_UNUSED_RESULT;
+      FragmentState* state, ObjectPool* pool, ScalarExpr** expr) WARN_UNUSED_RESULT;
 
   /// Create a new ScalarExpr based on thrift Expr 'texpr'. The newly created ScalarExpr
   /// is stored in ObjectPool 'state->obj_pool()' and returned in 'expr'. 'row_desc' is
   /// the tuple row descriptor of the input tuple row. Returns error status on failure.
   static Status Create(const TExpr& texpr, const RowDescriptor& row_desc,
-      RuntimeState* state, ScalarExpr** expr) WARN_UNUSED_RESULT;
+      FragmentState* state, ScalarExpr** expr) WARN_UNUSED_RESULT;
 
   /// Convenience functions creating multiple ScalarExpr.
   static Status Create(const std::vector<TExpr>& texprs, const RowDescriptor& row_desc,
-      RuntimeState* state, ObjectPool* pool, std::vector<ScalarExpr*>* exprs)
-      WARN_UNUSED_RESULT;
+      FragmentState* state, ObjectPool* pool,
+      std::vector<ScalarExpr*>* exprs) WARN_UNUSED_RESULT;
 
   /// Convenience functions creating multiple ScalarExpr.
   static Status Create(const std::vector<TExpr>& texprs, const RowDescriptor& row_desc,
-      RuntimeState* state, std::vector<ScalarExpr*>* exprs) WARN_UNUSED_RESULT;
+      FragmentState* state, std::vector<ScalarExpr*>* exprs) WARN_UNUSED_RESULT;
 
   /// Returns true if this expression is a SlotRef. Overridden by SlotRef.
   virtual bool IsSlotRef() const { return false; }
@@ -308,7 +309,7 @@ class ScalarExpr : public Expr {
   /// point into the codegen'd code. Currently we assume all roots of ScalarExpr subtrees
   /// exprs are potential entry points.
   virtual Status Init(const RowDescriptor& row_desc, bool is_entry_point,
-      RuntimeState* state) WARN_UNUSED_RESULT;
+      FragmentState* state) WARN_UNUSED_RESULT;
 
   /// Initializes 'eval' for execution. If scope if FRAGMENT_LOCAL, both
   /// fragment-local and thread-local states should be initialized. If scope is
@@ -364,7 +365,7 @@ class ScalarExpr : public Expr {
  protected:
   /// Return true if we should codegen this expression node, based on query options
   /// and the properties of this ScalarExpr node.
-  bool ShouldCodegen(const RuntimeState* state) const;
+  bool ShouldCodegen(const FragmentState* state) const;
 
   /// Return true if it is possible to evaluate this expression node without codegen.
   /// The vast majority of exprs support interpretation, so default to true. Scalars
diff --git a/be/src/exprs/scalar-fn-call.cc b/be/src/exprs/scalar-fn-call.cc
index ca283ba..51d11a9 100644
--- a/be/src/exprs/scalar-fn-call.cc
+++ b/be/src/exprs/scalar-fn-call.cc
@@ -31,6 +31,7 @@
 #include "codegen/llvm-codegen.h"
 #include "exprs/anyval-util.h"
 #include "exprs/scalar-expr-evaluator.h"
+#include "runtime/fragment-state.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/lib-cache.h"
 #include "runtime/runtime-state.h"
@@ -71,7 +72,7 @@ Status ScalarFnCall::LoadPrepareAndCloseFn(LlvmCodeGen* codegen) {
 }
 
 Status ScalarFnCall::Init(
-    const RowDescriptor& desc, bool is_entry_point, RuntimeState* state) {
+    const RowDescriptor& desc, bool is_entry_point, FragmentState* state) {
   // Initialize children first.
   RETURN_IF_ERROR(ScalarExpr::Init(desc, is_entry_point, state));
 
diff --git a/be/src/exprs/scalar-fn-call.h b/be/src/exprs/scalar-fn-call.h
index 82f687b..4190feb 100644
--- a/be/src/exprs/scalar-fn-call.h
+++ b/be/src/exprs/scalar-fn-call.h
@@ -79,7 +79,7 @@ class ScalarFnCall : public ScalarExpr {
 
   ScalarFnCall(const TExprNode& node);
   virtual Status Init(const RowDescriptor& row_desc, bool is_entry_point,
-      RuntimeState* state) override WARN_UNUSED_RESULT;
+      FragmentState* state) override WARN_UNUSED_RESULT;
   virtual Status OpenEvaluator(FunctionContext::FunctionStateScope scope,
       RuntimeState* state, ScalarExprEvaluator* eval) const override WARN_UNUSED_RESULT;
   virtual void CloseEvaluator(FunctionContext::FunctionStateScope scope,
diff --git a/be/src/exprs/slot-ref.cc b/be/src/exprs/slot-ref.cc
index b5f758f..634a989 100644
--- a/be/src/exprs/slot-ref.cc
+++ b/be/src/exprs/slot-ref.cc
@@ -26,6 +26,7 @@
 #include "gen-cpp/Exprs_types.h"
 #include "runtime/collection-value.h"
 #include "runtime/decimal-value.h"
+#include "runtime/fragment-state.h"
 #include "runtime/multi-precision.h"
 #include "runtime/runtime-state.h"
 #include "runtime/string-value.inline.h"
@@ -72,7 +73,7 @@ SlotRef::SlotRef(const ColumnType& type, int offset, const bool nullable /* = fa
     slot_id_(-1) {}
 
 Status SlotRef::Init(
-    const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) {
+    const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) {
   DCHECK_EQ(children_.size(), 0);
   RETURN_IF_ERROR(ScalarExpr::Init(row_desc, is_entry_point, state));
   if (slot_id_ != -1) {
diff --git a/be/src/exprs/slot-ref.h b/be/src/exprs/slot-ref.h
index e9bd4e8..0449bfb 100644
--- a/be/src/exprs/slot-ref.h
+++ b/be/src/exprs/slot-ref.h
@@ -50,7 +50,7 @@ class SlotRef : public ScalarExpr {
 
   /// Exposed as public so AGG node can initialize its build expressions.
   virtual Status Init(const RowDescriptor& row_desc, bool is_entry_point,
-      RuntimeState* state) override WARN_UNUSED_RESULT;
+      FragmentState* state) override WARN_UNUSED_RESULT;
   virtual std::string DebugString() const override;
   virtual Status GetCodegendComputeFnImpl(
       LlvmCodeGen* codegen, llvm::Function** fn) override WARN_UNUSED_RESULT;
diff --git a/be/src/exprs/tuple-is-null-predicate.cc b/be/src/exprs/tuple-is-null-predicate.cc
index 769ca33..4225863 100644
--- a/be/src/exprs/tuple-is-null-predicate.cc
+++ b/be/src/exprs/tuple-is-null-predicate.cc
@@ -44,7 +44,7 @@ TupleIsNullPredicate::TupleIsNullPredicate(const TExprNode& node)
         node.tuple_is_null_pred.tuple_ids.end()) {}
 
 Status TupleIsNullPredicate::Init(
-    const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) {
+    const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) {
   RETURN_IF_ERROR(ScalarExpr::Init(row_desc, is_entry_point, state));
   DCHECK_EQ(0, children_.size());
   // Resolve tuple ids to tuple indexes.
diff --git a/be/src/exprs/tuple-is-null-predicate.h b/be/src/exprs/tuple-is-null-predicate.h
index 552b325..e028ca3 100644
--- a/be/src/exprs/tuple-is-null-predicate.h
+++ b/be/src/exprs/tuple-is-null-predicate.h
@@ -40,7 +40,7 @@ class TupleIsNullPredicate: public Predicate {
   TupleIsNullPredicate(const TExprNode& node);
 
   virtual Status Init(
-      const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) override;
+      const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) override;
   virtual Status GetCodegendComputeFnImpl(
       LlvmCodeGen* codegen, llvm::Function** fn) override WARN_UNUSED_RESULT;
   virtual std::string DebugString() const override;
diff --git a/be/src/exprs/valid-tuple-id.cc b/be/src/exprs/valid-tuple-id.cc
index 2884dcb..e6fe640 100644
--- a/be/src/exprs/valid-tuple-id.cc
+++ b/be/src/exprs/valid-tuple-id.cc
@@ -32,7 +32,7 @@ const char* ValidTupleIdExpr::LLVM_CLASS_NAME = "class.impala::ValidTupleIdExpr"
 ValidTupleIdExpr::ValidTupleIdExpr(const TExprNode& node) : ScalarExpr(node) {}
 
 Status ValidTupleIdExpr::Init(
-    const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) {
+    const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) {
   RETURN_IF_ERROR(ScalarExpr::Init(row_desc, is_entry_point, state));
   DCHECK_EQ(0, children_.size());
   tuple_ids_.reserve(row_desc.tuple_descriptors().size());
diff --git a/be/src/exprs/valid-tuple-id.h b/be/src/exprs/valid-tuple-id.h
index 342baf8..6422bb1 100644
--- a/be/src/exprs/valid-tuple-id.h
+++ b/be/src/exprs/valid-tuple-id.h
@@ -36,7 +36,7 @@ class ValidTupleIdExpr : public ScalarExpr {
   explicit ValidTupleIdExpr(const TExprNode& node);
 
   virtual Status Init(
-      const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) override;
+      const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) override;
   virtual Status GetCodegendComputeFnImpl(
       LlvmCodeGen* codegen, llvm::Function** fn) override WARN_UNUSED_RESULT;
   virtual std::string DebugString() const override;
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index b6550e2..6990421 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -45,6 +45,7 @@ add_library(Runtime
   descriptors.cc
   dml-exec-state.cc
   exec-env.cc
+  fragment-state.cc
   fragment-instance-state.cc
   hbase-table.cc
   hbase-table-factory.cc
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 3ac64bd..ddcd84d 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -28,6 +28,8 @@
 #include "rpc/auth-provider.h"
 #include "rpc/thrift-server.h"
 #include "rpc/rpc-mgr.h"
+#include "runtime/fragment-state.h"
+#include "runtime/query-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/exec-env.h"
@@ -152,6 +154,10 @@ class DataStreamTest : public testing::Test {
     ABORT_IF_ERROR(exec_env_->InitForFeTests());
     exec_env_->InitBufferPool(32 * 1024, 1024 * 1024 * 1024, 32 * 1024);
     runtime_state_.reset(new RuntimeState(TQueryCtx(), exec_env_.get()));
+    TPlanFragmentCtx* fragment_ctx =
+        runtime_state_->obj_pool()->Add(new TPlanFragmentCtx());
+    fragment_state_ = runtime_state_->obj_pool()->Add(
+        new FragmentState(runtime_state_->query_state(), *fragment_ctx));
     mem_pool_.reset(new MemPool(&tracker_));
 
     // Register a BufferPool client for allocating buffers for row batches.
@@ -163,7 +169,7 @@ class DataStreamTest : public testing::Test {
     CreateRowDesc();
 
     SlotRef* lhs_slot = obj_pool_.Add(new SlotRef(TYPE_BIGINT, 0));
-    ASSERT_OK(lhs_slot->Init(RowDescriptor(), true, runtime_state_.get()));
+    ASSERT_OK(lhs_slot->Init(RowDescriptor(), true, fragment_state_));
     ordering_exprs_.push_back(lhs_slot);
 
     tsort_info_.sorting_order = TSortingOrder::LEXICAL;
@@ -227,6 +233,8 @@ class DataStreamTest : public testing::Test {
       less_than_comparator->Close(runtime_state_.get());
     }
     ScalarExpr::Close(ordering_exprs_);
+    fragment_state_->ReleaseResources();
+    fragment_state_ = nullptr;
     mem_pool_->FreeAll();
     StopKrpcBackend();
     exec_env_->buffer_pool()->DeregisterClient(&buffer_pool_client_);
@@ -247,6 +255,7 @@ class DataStreamTest : public testing::Test {
   vector<TupleRowComparator*> less_than_comparators_;
   boost::scoped_ptr<ExecEnv> exec_env_;
   scoped_ptr<RuntimeState> runtime_state_;
+  FragmentState* fragment_state_;
   TUniqueId next_instance_id_;
   string stmt_;
   // The sorting expression for the single BIGINT column.
@@ -582,9 +591,13 @@ class DataStreamTest : public testing::Test {
       TPartitionType::type partition_type, SenderInfo* info, bool reset_hash_seed) {
     RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
     VLOG_QUERY << "create sender " << sender_num;
-    const TDataSink& sink = GetSink(partition_type);
+    const TDataSink sink = GetSink(partition_type);
+    TPlanFragmentCtx fragment_ctx;
+    fragment_ctx.fragment.output_sink = sink;
+    fragment_ctx.destinations = dest_;
+    FragmentState fragment_state(state.query_state(), fragment_ctx);
     DataSinkConfig* data_sink = nullptr;
-    EXPECT_OK(DataSinkConfig::CreateConfig(sink, row_desc_, &state, &data_sink));
+    EXPECT_OK(DataSinkConfig::CreateConfig(sink, row_desc_, &fragment_state, &data_sink));
 
     // We create an object of the base class DataSink and cast to the appropriate sender
     // according to the 'is_thrift' option.
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index c5a1d50..94e3105 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -38,6 +38,7 @@
 #include "runtime/backend-client.h"
 #include "runtime/client-cache.h"
 #include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
@@ -65,13 +66,12 @@ static const string OPEN_TIMER_NAME = "OpenTime";
 static const string PREPARE_TIMER_NAME = "PrepareTime";
 static const string EXEC_TIMER_NAME = "ExecTime";
 
-FragmentInstanceState::FragmentInstanceState(
-    QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
-    const TPlanFragmentInstanceCtx& instance_ctx)
+FragmentInstanceState::FragmentInstanceState(QueryState* query_state,
+    FragmentState* fragment_state, const TPlanFragmentInstanceCtx& instance_ctx)
   : query_state_(query_state),
-    fragment_ctx_(fragment_ctx),
-    instance_ctx_(instance_ctx) {
-}
+    fragment_state_(fragment_state),
+    fragment_ctx_(fragment_state->fragment_ctx()),
+    instance_ctx_(instance_ctx) {}
 
 Status FragmentInstanceState::Exec() {
   bool is_prepared = false;
@@ -174,11 +174,11 @@ Status FragmentInstanceState::Prepare() {
       bind<int64_t>(mem_fn(&ThreadResourcePool::num_threads),
           runtime_state_->resource_pool()));
 
-  RETURN_IF_ERROR(
-      PlanNode::CreateTree(runtime_state_, fragment_ctx_.fragment.plan, &plan_tree_));
-  // set up plan
+  // Create the exec tree.
+  const PlanNode* plan_tree = fragment_state_->plan_tree();
+  DCHECK(plan_tree != nullptr);
   RETURN_IF_ERROR(ExecNode::CreateTree(
-      runtime_state_, *plan_tree_, query_state_->desc_tbl(), &exec_tree_));
+      runtime_state_, *plan_tree, query_state_->desc_tbl(), &exec_tree_));
   runtime_state_->set_fragment_root_id(exec_tree_->id());
   if (instance_ctx_.__isset.debug_options) {
     ExecNode::SetDebugOptions(instance_ctx_.debug_options, exec_tree_);
@@ -214,11 +214,9 @@ Status FragmentInstanceState::Prepare() {
   PrintVolumeIds();
 
   // prepare sink_
-  DCHECK(fragment_ctx_.fragment.__isset.output_sink);
-  const TDataSink& thrift_sink = fragment_ctx_.fragment.output_sink;
-  RETURN_IF_ERROR(DataSinkConfig::CreateConfig(
-      thrift_sink, plan_tree_->row_descriptor_, runtime_state_, &sink_config_));
-  sink_ = sink_config_->CreateSink(fragment_ctx_, instance_ctx_, runtime_state_);
+  const DataSinkConfig* sink_config = fragment_state_->sink_config();
+  DCHECK(sink_config != nullptr);
+  sink_ = sink_config->CreateSink(fragment_ctx_, instance_ctx_, runtime_state_);
   RETURN_IF_ERROR(sink_->Prepare(runtime_state_, runtime_state_->instance_mem_tracker()));
   RuntimeProfile* sink_profile = sink_->profile();
   if (sink_profile != nullptr) profile()->AddChild(sink_profile);
@@ -332,26 +330,9 @@ Status FragmentInstanceState::Open() {
       ADD_TIMER(timings_profile_, OPEN_TIMER_NAME));
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
 
-  if (runtime_state_->ShouldCodegen()) {
+  if (fragment_state_->ShouldCodegen()) {
     UpdateState(StateEvent::CODEGEN_START);
-    RETURN_IF_ERROR(runtime_state_->CreateCodegen());
-    {
-      SCOPED_TIMER2(runtime_state_->codegen()->ir_generation_timer(),
-          runtime_state_->codegen()->runtime_profile()->total_time_counter());
-      SCOPED_THREAD_COUNTER_MEASUREMENT(
-          runtime_state_->codegen()->llvm_thread_counters());
-      exec_tree_->Codegen(runtime_state_);
-      sink_->Codegen(runtime_state_);
-
-      // It shouldn't be fatal to fail codegen. However, until IMPALA-4233 is fixed,
-      // ScalarFnCall has no fall back to interpretation when codegen fails so propagates
-      // the error status for now.
-      RETURN_IF_ERROR(runtime_state_->CodegenScalarExprs());
-    }
-
-    LlvmCodeGen* codegen = runtime_state_->codegen();
-    DCHECK(codegen != nullptr);
-    RETURN_IF_ERROR(codegen->FinalizeModule());
+    RETURN_IF_ERROR(fragment_state_->InvokeCodegen());
   }
 
   {
@@ -421,7 +402,6 @@ void FragmentInstanceState::Close() {
 
   // guard against partially-finished Prepare()
   if (sink_ != nullptr) sink_->Close(runtime_state_);
-  if (sink_config_ != nullptr) sink_config_->Close();
 
   // Stop updating profile counters in background.
   profile()->StopPeriodicCounters();
@@ -429,7 +409,6 @@ void FragmentInstanceState::Close() {
   // Delete row_batch_ to free resources associated with it.
   row_batch_.reset();
   if (exec_tree_ != nullptr) exec_tree_->Close(runtime_state_);
-  if (plan_tree_ != nullptr) plan_tree_->Close();
   runtime_state_->ReleaseResources();
 
   // Sanity timer checks
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 20856be..36c81d8 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -43,6 +43,7 @@ class RpcContext;
 
 namespace impala {
 
+class FragmentState;
 class TPlanFragmentCtx;
 class TPlanFragmentInstanceCtx;
 class TBloomFilter;
@@ -79,7 +80,7 @@ class JoinBuilder;
 /// - absorb RuntimeState?
 class FragmentInstanceState {
  public:
-  FragmentInstanceState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
+  FragmentInstanceState(QueryState* query_state, FragmentState* fragment_state,
       const TPlanFragmentInstanceCtx& instance_ctx);
 
   /// Main loop of fragment instance execution. Blocks until execution finishes and
@@ -156,6 +157,7 @@ class FragmentInstanceState {
 
  private:
   QueryState* query_state_;
+  FragmentState* fragment_state_;
   const TPlanFragmentCtx& fragment_ctx_;
   const TPlanFragmentInstanceCtx& instance_ctx_;
 
@@ -163,9 +165,6 @@ class FragmentInstanceState {
   /// in Prepare().
   ExecNode* exec_tree_ = nullptr; // lives in obj_pool()
   RuntimeState* runtime_state_ = nullptr;  // lives in obj_pool()
-  /// Lives in obj_pool(). Not mutated after being initialized except for being closed.
-  PlanNode* plan_tree_ = nullptr;
-  DataSinkConfig* sink_config_ = nullptr;
 
   /// A 'fake mutex' to detect any race condition in accessing 'report_seq_no_' below.
   /// There should be only one thread doing status report at the same time.
@@ -229,9 +228,6 @@ class FragmentInstanceState {
   /// should live in obj_pool(), but managed separately so we can delete it in Close()
   boost::scoped_ptr<RowBatch> row_batch_;
 
-  /// Set when Prepare() returns.
-  Promise<Status> prepared_promise_;
-
   /// Set when OpenInternal() returns.
   Promise<Status> opened_promise_;
 
diff --git a/be/src/runtime/fragment-state.cc b/be/src/runtime/fragment-state.cc
new file mode 100644
index 0000000..9c27312
--- /dev/null
+++ b/be/src/runtime/fragment-state.cc
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/fragment-state.h"
+
+#include <gutil/strings/substitute.h>
+
+#include "codegen/llvm-codegen.h"
+#include "exec/exec-node.h"
+#include "exec/data-sink.h"
+#include "runtime/exec-env.h"
+#include "runtime/query-state.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "util/debug-util.h"
+#include "util/impalad-metrics.h"
+#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+// Thread group name and thread name prefix constants declared on FragmentState.
+// NOTE(review): presumably used for the per-fragment init/codegen threads; the users
+// are not visible in this file — confirm.
+const string FragmentState::FSTATE_THREAD_GROUP_NAME = "fragment-init";
+const string FragmentState::FSTATE_THREAD_NAME_PREFIX = "init-and-codegen";
+
+// Builds one FragmentState per TPlanFragmentCtx in 'fragment_info' (allocated in
+// 'state->obj_pool()'), records each in 'fragment_map' keyed by its fragment index,
+// registers every instance context with the FragmentState it belongs to and finally
+// calls Init() on every FragmentState.
+// Instance contexts must be grouped by fragment and follow the same order as
+// 'fragment_info.fragment_ctxs'. Returns an error if the instance contexts cannot be
+// matched to fragment contexts, or if Init() fails for any fragment.
+Status FragmentState::CreateFragmentStateMap(const TExecPlanFragmentInfo& fragment_info,
+    QueryState* state, std::unordered_map<TFragmentIdx, FragmentState*>& fragment_map) {
+  const auto& fragment_ctxs = fragment_info.fragment_ctxs;
+  if (fragment_ctxs.empty()) {
+    // Indexing fragment_ctxs[0] below would be out of bounds. With no fragments there is
+    // nothing to build, and any instance context would have no fragment to belong to.
+    if (fragment_info.fragment_instance_ctxs.empty()) return Status::OK();
+    return Status(Substitute("Received $0 fragment instance context(s) but no fragment "
+        "contexts", fragment_info.fragment_instance_ctxs.size()));
+  }
+  size_t fragment_ctx_idx = 0;
+  FragmentState* fragment_state = state->obj_pool()->Add(
+      new FragmentState(state, fragment_ctxs[fragment_ctx_idx]));
+  fragment_map[fragment_state->fragment_idx()] = fragment_state;
+  for (const TPlanFragmentInstanceCtx& instance_ctx :
+      fragment_info.fragment_instance_ctxs) {
+    // determine corresponding TPlanFragmentCtx
+    if (fragment_state->fragment_idx() != instance_ctx.fragment_idx) {
+      ++fragment_ctx_idx;
+      // Checked explicitly (not only via DCHECK) so that release builds cannot read
+      // past the end of 'fragment_ctxs' on malformed input.
+      if (fragment_ctx_idx >= fragment_ctxs.size()) {
+        return Status(Substitute("Fragment instance context with fragment index $0 has "
+            "no matching fragment context", instance_ctx.fragment_idx));
+      }
+      fragment_state = state->obj_pool()->Add(
+          new FragmentState(state, fragment_ctxs[fragment_ctx_idx]));
+      fragment_map[fragment_state->fragment_idx()] = fragment_state;
+      // we expect fragment and instance contexts to follow the same order
+      DCHECK_EQ(fragment_state->fragment_idx(), instance_ctx.fragment_idx);
+    }
+    fragment_state->AddInstance(&instance_ctx);
+  }
+  // Init all fragments.
+  for (auto& elem : fragment_map) {
+    RETURN_IF_ERROR(elem.second->Init());
+  }
+  return Status::OK();
+}
+
+// Creates the plan node tree for this fragment and then the data sink config. The
+// sink config is created against the row descriptor of the plan tree's root.
+Status FragmentState::Init() {
+  const auto& fragment = fragment_ctx_.fragment;
+  RETURN_IF_ERROR(PlanNode::CreateTree(this, fragment.plan, &plan_tree_));
+  const auto& thrift_sink = fragment.output_sink;
+  RETURN_IF_ERROR(DataSinkConfig::CreateConfig(
+      thrift_sink, plan_tree_->row_descriptor_, this, &sink_config_));
+  return Status::OK();
+}
+
+// Runs codegen for this fragment at most once. The first caller performs the work
+// while holding 'codegen_lock_'. Later callers block on that lock until it is released
+// and then receive the same stored status. If codegen fails, the error is annotated
+// with the fragment's index and name and reported to the QueryState.
+Status FragmentState::InvokeCodegen() {
+  unique_lock<mutex> l(codegen_lock_);
+  if (!codegen_invoked_) {
+    codegen_invoked_ = true;
+    codegen_status_ = CodegenHelper();
+    if (!codegen_status_.ok()) {
+      // Label each value with what it actually is: the numeric fragment index and the
+      // human-readable display name.
+      string error_ctx = Substitute(
+          "Fragment failed during codegen, fragment index: $0, fragment name: $1",
+          fragment_idx(), fragment_ctx_.fragment.display_name);
+      codegen_status_.AddDetail(error_ctx);
+      query_state_->ErrorDuringFragmentCodegen(codegen_status_);
+    }
+  }
+  return codegen_status_;
+}
+
+// Does the actual codegen work for the fragment. It creates the LlvmCodeGen object,
+// generates IR for the plan tree, the sink config and all registered scalar exprs, and
+// then finalizes the module. Expects Init() to have succeeded and ShouldCodegen() to
+// be true.
+Status FragmentState::CodegenHelper() {
+  DCHECK(plan_tree_ != nullptr);
+  DCHECK(sink_config_ != nullptr);
+  DCHECK(ShouldCodegen());
+  RETURN_IF_ERROR(CreateCodegen());
+  LlvmCodeGen* const cg = codegen();
+  DCHECK(cg != nullptr);
+  {
+    // The IR-generation timer and the LLVM thread counters only cover this scope, so
+    // FinalizeModule() below is not included in them.
+    SCOPED_TIMER2(cg->ir_generation_timer(),
+        cg->runtime_profile()->total_time_counter());
+    SCOPED_THREAD_COUNTER_MEASUREMENT(cg->llvm_thread_counters());
+    plan_tree_->Codegen(this);
+    sink_config_->Codegen(this);
+    // A failure to codegen a registered scalar expr is propagated as an error. This
+    // was originally needed because ScalarFnCall had no interpreted fallback
+    // (IMPALA-4233). That issue is now fixed, so whether this must remain fatal should
+    // be revisited.
+    RETURN_IF_ERROR(CodegenScalarExprs());
+  }
+  return cg->FinalizeModule();
+}
+
+// Creates a runtime profile named after the fragment and attaches it under the
+// query's host profile. Per-fragment profiles such as the codegen profile are later
+// added as its children.
+FragmentState::FragmentState(QueryState* query_state,
+    const TPlanFragmentCtx& fragment_ctx)
+  : query_state_(query_state), fragment_ctx_(fragment_ctx) {
+  const string profile_name =
+      Substitute("Fragment $0", fragment_ctx_.fragment.display_name);
+  runtime_profile_ = RuntimeProfile::Create(query_state_->obj_pool(), profile_name);
+  query_state_->host_profile()->AddChild(runtime_profile_);
+}
+
+// Resources must be released explicitly via ReleaseResources(); nothing to do here.
+FragmentState::~FragmentState() = default;
+
+// Closes whichever of the codegen object, the plan tree and the sink config were
+// created, in that order.
+void FragmentState::ReleaseResources() {
+  if (codegen_.get() != nullptr) codegen_->Close();
+  if (plan_tree_ != nullptr) plan_tree_->Close();
+  if (sink_config_ != nullptr) sink_config_->Close();
+}
+
+
+// Lazily creates the LlvmCodeGen object for this fragment and enables optimizations.
+// The codegen runtime profile is attached as a child of the fragment's profile. This is
+// a no-op if the codegen object already exists.
+Status FragmentState::CreateCodegen() {
+  if (codegen_.get() != nullptr) return Status::OK();
+  RETURN_IF_ERROR(LlvmCodeGen::CreateImpalaCodegen(
+      this, query_mem_tracker(), PrintId(query_id()), &codegen_));
+  codegen_->EnableOptimizations(true);
+  runtime_profile_->AddChild(codegen_->runtime_profile());
+  return Status::OK();
+}
+
+// Generates code for every scalar expr registered in 'scalar_exprs_to_codegen_'. For
+// each entry, '.first' is the expr and '.second' is forwarded to
+// GetCodegendComputeFn(). The resulting llvm::Function is not used here. Stops at the
+// first error.
+Status FragmentState::CodegenScalarExprs() {
+  for (const auto& entry : scalar_exprs_to_codegen_) {
+    llvm::Function* unused_fn = nullptr;
+    RETURN_IF_ERROR(
+        entry.first->GetCodegendComputeFn(codegen_.get(), entry.second, &unused_fn));
+  }
+  return Status::OK();
+}
+
+
+// Overload taking a Status: an OK status contributes no extra info, otherwise its
+// message text is used as the extra info.
+std::string FragmentState::GenerateCodegenMsg(
+    bool codegen_enabled, const Status& codegen_status, const std::string& extra_label) {
+  if (codegen_status.ok()) {
+    return GenerateCodegenMsg(codegen_enabled, std::string(), extra_label);
+  }
+  return GenerateCodegenMsg(codegen_enabled, codegen_status.msg().msg(), extra_label);
+}
+
+// Builds a message of the form
+//   "[<extra_label> ]Codegen Enabled|Codegen Disabled[: <extra_info>]"
+// where the bracketed parts are omitted when the corresponding argument is empty.
+std::string FragmentState::GenerateCodegenMsg(bool codegen_enabled,
+    const std::string& extra_info, const std::string& extra_label) {
+  std::stringstream str;
+  if (!extra_label.empty()) str << extra_label << " ";
+  str << (codegen_enabled ? "Codegen Enabled" : "Codegen Disabled");
+  // Stream the separator and the text separately rather than concatenating them into a
+  // temporary std::string first.
+  if (!extra_info.empty()) str << ": " << extra_info;
+  return str.str();
+}
+
+}
diff --git a/be/src/runtime/fragment-state.h b/be/src/runtime/fragment-state.h
new file mode 100644
index 0000000..54cf45d
--- /dev/null
+++ b/be/src/runtime/fragment-state.h
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <boost/scoped_ptr.hpp>
+
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "runtime/query-state.h"
+
+namespace impala {
+
+class FragmentInstanceState;
+class QueryCtx;
+class RuntimeProfile;
+
+/// Encapsulates all the static state for a fragment that is shared across its
+/// instances. This includes the thrift structures representing the fragment and all
+/// its instances, the plan node tree and the data sink config. It also contains state
+/// and methods required for creating, invoking and managing codegen. Not thread-safe
+/// unless otherwise specified.
+class FragmentState {
+ public:
+  /// Create a map of fragment index to its FragmentState object and only populate the
+  /// thrift references of the fragment and instance context objects from 'fragment_info'.
+  static Status CreateFragmentStateMap(const TExecPlanFragmentInfo& fragment_info,
+      QueryState* state, std::unordered_map<TFragmentIdx, FragmentState*>& fragment_map);
+  /// 'fragment_ctx' is stored by reference, so it must outlive this object.
+  FragmentState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx);
+  ~FragmentState();
+
+  /// Called by all the fragment instance threads that execute this fragment. The first
+  /// fragment instance to call this does the actual codegen work. The rest either wait
+  /// till codegen is complete or simply return immediately if it is already completed.
+  /// In case codegen fails, it attempts to set an error status in the query state and
+  /// returns that status on every subsequent call. Is thread-safe.
+  Status InvokeCodegen();
+
+  /// Release resources held by codegen, the plan tree and the data sink config.
+  void ReleaseResources();
+
+  ObjectPool* obj_pool() { return &obj_pool_; }
+  int fragment_idx() const { return fragment_ctx_.fragment.idx; }
+  const TQueryOptions& query_options() const { return query_state_->query_options(); }
+  const TPlanFragmentCtx& fragment_ctx() const { return fragment_ctx_; }
+  const std::vector<const TPlanFragmentInstanceCtx*>& instance_ctxs() const {
+    return instance_ctxs_;
+  }
+  const PlanNode* plan_tree() const { return plan_tree_; }
+  const DataSinkConfig* sink_config() const { return sink_config_; }
+  const TUniqueId& query_id() const { return query_state_->query_id(); }
+  const DescriptorTbl& desc_tbl() const { return query_state_->desc_tbl(); }
+  MemTracker* query_mem_tracker() const { return query_state_->query_mem_tracker(); }
+  RuntimeProfile* runtime_profile() { return runtime_profile_; }
+
+  static const std::string FSTATE_THREAD_GROUP_NAME;
+  static const std::string FSTATE_THREAD_NAME_PREFIX;
+
+  /// Methods relevant for codegen.
+
+  /// Create a codegen object accessible via codegen() if it doesn't exist already.
+  Status CreateCodegen();
+
+  /// Codegen all ScalarExpr expressions in 'scalar_exprs_to_codegen_'. If codegen fails
+  /// for any expression, return immediately with the error status.
+  /// TODO: now that IMPALA-4233 is fixed, failing to codegen an expression that can be
+  /// interpreted need not be fatal; revisit this.
+  Status CodegenScalarExprs();
+
+  /// Add ScalarExpr expression 'expr' to be codegen'd later. This does not itself check
+  /// whether codegen is disabled by query option. If 'is_codegen_entry_point' is true,
+  /// 'expr' will be an entry point into codegen'd evaluation (i.e. it will have a
+  /// function pointer populated). Adding an expr here ensures that it will be codegen'd
+  /// (i.e. fragment execution will fail with an error if the expr cannot be codegen'd).
+  void AddScalarExprToCodegen(ScalarExpr* expr, bool is_codegen_entry_point) {
+    scalar_exprs_to_codegen_.push_back({expr, is_codegen_entry_point});
+  }
+
+  /// Returns true if there are ScalarExpr expressions in the fragment that we want
+  /// to codegen (because they can't be interpreted or based on options/hints).
+  /// This should only be used after the plan tree and the data sink config have been
+  /// created and initialized, which is when all expressions' Prepare() are invoked.
+  bool ScalarExprNeedsCodegen() const { return !scalar_exprs_to_codegen_.empty(); }
+
+  /// Check if codegen was disabled and if so, append a message explaining why to
+  /// 'codegen_status_msgs'. Call this only after expressions have been created.
+  void CheckAndAddCodegenDisabledMessage(std::vector<std::string>& codegen_status_msgs) {
+    if (CodegenDisabledByQueryOption()) {
+      codegen_status_msgs.emplace_back(
+          GenerateCodegenMsg(false, "disabled by query option DISABLE_CODEGEN"));
+    } else if (CodegenDisabledByHint()) {
+      codegen_status_msgs.emplace_back(
+          GenerateCodegenMsg(false, "disabled due to optimization hints"));
+    }
+  }
+
+  /// Returns true if there is a hint to disable codegen. This can be true for single node
+  /// optimization or expression evaluation request from FE to BE (see fe-support.cc).
+  /// Note that this internal flag is advisory and it may be ignored if the fragment has
+  /// any UDF which cannot be interpreted. See ScalarExpr::Prepare() for details.
+  inline bool CodegenHasDisableHint() const {
+    return query_state_->query_ctx().disable_codegen_hint;
+  }
+
+  /// Returns true iff there is a hint to disable codegen and all expressions in the
+  /// fragment can be interpreted. This should only be used after the Prepare() phase
+  /// in which all expressions' Prepare() are invoked.
+  inline bool CodegenDisabledByHint() const {
+    return CodegenHasDisableHint() && !ScalarExprNeedsCodegen();
+  }
+
+  /// Returns true if codegen is disabled by query option.
+  inline bool CodegenDisabledByQueryOption() const {
+    return query_options().disable_codegen;
+  }
+
+  /// Returns true if codegen should be enabled for this fragment. Codegen is enabled
+  /// if all the following conditions hold:
+  /// 1. it's enabled by query option
+  /// 2. it's not disabled by internal hints or there are expressions in the fragment
+  ///    which cannot be interpreted.
+  inline bool ShouldCodegen() const {
+    return !CodegenDisabledByQueryOption() && !CodegenDisabledByHint();
+  }
+
+  LlvmCodeGen* codegen() { return codegen_.get(); }
+
+  /// Utility methods for generating messages from Status objects or strings by adding
+  /// context relevant to codegen.
+  static std::string GenerateCodegenMsg(bool codegen_enabled,
+      const Status& codegen_status, const std::string& extra_label = "");
+  static std::string GenerateCodegenMsg(bool codegen_enabled,
+      const std::string& extra_info = "", const std::string& extra_label = "");
+
+ private:
+  ObjectPool obj_pool_;
+
+  /// Reference to the query state object that owns this.
+  QueryState* query_state_;
+
+  /// References to the thrift structs for this fragment.
+  const TPlanFragmentCtx& fragment_ctx_;
+  std::vector<const TPlanFragmentInstanceCtx*> instance_ctxs_;
+
+  /// Lives in obj_pool(). Not mutated after being initialized in Init() except for
+  /// being closed.
+  PlanNode* plan_tree_ = nullptr;
+  DataSinkConfig* sink_config_ = nullptr;
+
+  boost::scoped_ptr<LlvmCodeGen> codegen_;
+
+  /// Stores the status of the codegen work done by the first InvokeCodegen() call, so
+  /// that subsequent calls can return it.
+  Status codegen_status_;
+
+  /// Contains all ScalarExpr expressions which need to be codegen'd. The second element
+  /// is true if we want to generate a codegen entry point for this expr.
+  std::vector<std::pair<ScalarExpr*, bool>> scalar_exprs_to_codegen_;
+
+  RuntimeProfile* runtime_profile_ = nullptr;
+
+  /// Serializes access to InvokeCodegen().
+  /// Lock ordering: QueryState::status_lock_ must *not be obtained* prior to this.
+  std::mutex codegen_lock_;
+
+  /// Indicates whether codegen has been invoked. Used to make sure only the first
+  /// fragment instance to call InvokeCodegen() does the actual codegen work.
+  bool codegen_invoked_ = false;
+
+  /// Used by CreateFragmentStateMap() to add TPlanFragmentInstanceCtx thrift objects
+  /// for the fragment that this object represents.
+  void AddInstance(const TPlanFragmentInstanceCtx* instance_ctx) {
+    instance_ctxs_.push_back(instance_ctx);
+  }
+
+  /// Helper method used by InvokeCodegen(). Does the actual codegen work.
+  Status CodegenHelper();
+
+  /// Create the plan tree and the data sink config.
+  Status Init();
+};
+}  // namespace impala
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 4865757..5f9becc 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -38,6 +38,7 @@
 #include "rpc/rpc-mgr.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/row-batch.h"
@@ -76,7 +77,7 @@ const char* KrpcDataStreamSender::LLVM_CLASS_NAME = "class.impala::KrpcDataStrea
 const char* KrpcDataStreamSender::TOTAL_BYTES_SENT_COUNTER = "TotalBytesSent";
 
 Status KrpcDataStreamSenderConfig::Init(
-    const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
   RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
   DCHECK(tsink_->__isset.stream_sink);
   partition_type_ = tsink_->stream_sink.output_partition.type;
@@ -88,6 +89,8 @@ Status KrpcDataStreamSenderConfig::Init(
     exchange_hash_seed_ =
         KrpcDataStreamSender::EXCHANGE_HASH_SEED_CONST ^ state->query_id().hi;
   }
+  num_channels_ = state->fragment_ctx().destinations.size();
+  state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
   return Status::OK();
 }
 
@@ -773,12 +776,12 @@ Status KrpcDataStreamSender::Prepare(
   for (int i = 0; i < channels_.size(); ++i) {
     RETURN_IF_ERROR(channels_[i]->Init(state));
   }
-  state->CheckAndAddCodegenDisabledMessage(profile());
   return Status::OK();
 }
 
 Status KrpcDataStreamSender::Open(RuntimeState* state) {
   SCOPED_TIMER(profile_->total_time_counter());
+  RETURN_IF_ERROR(DataSink::Open(state));  // Base class first, then partition exprs.
   return ScalarExprEvaluator::Open(partition_expr_evals_, state);
 }
 
@@ -921,22 +924,15 @@ string KrpcDataStreamSenderConfig::PartitionTypeName() const {
   }
 }
 
-void KrpcDataStreamSender::Codegen(RuntimeState* state) {
-  const KrpcDataStreamSenderConfig& config =
-      static_cast<const KrpcDataStreamSenderConfig&>(sink_config_);
-  KrpcDataStreamSenderConfig& non_const_config =
-      const_cast<KrpcDataStreamSenderConfig&>(config);
-  non_const_config.Codegen(state, profile());
-}
-
-void KrpcDataStreamSenderConfig::Codegen(RuntimeState* state, RuntimeProfile* profile) {
+void KrpcDataStreamSenderConfig::Codegen(FragmentState* state) {
   LlvmCodeGen* codegen = state->codegen();
   DCHECK(codegen != nullptr);
   const string sender_name = PartitionTypeName() + " Sender";
   if (partition_type_ != TPartitionType::HASH_PARTITIONED) {
     const string& msg = Substitute("not $0",
         partition_type_ == TPartitionType::KUDU ? "supported" : "needed");
-    profile->AddCodegenMsg(false, msg, sender_name);
+    codegen_status_msgs_.emplace_back(
+        FragmentState::GenerateCodegenMsg(false, msg, sender_name));
     return;
   }
 
@@ -949,9 +945,8 @@ void KrpcDataStreamSenderConfig::Codegen(RuntimeState* state, RuntimeProfile* pr
 
     int num_replaced;
     // Replace GetNumChannels() with a constant.
-    int num_channels =  state->fragment_ctx().destinations.size();
     num_replaced = codegen->ReplaceCallSitesWithValue(hash_and_add_rows_fn,
-        codegen->GetI32Constant(num_channels), "GetNumChannels");
+        codegen->GetI32Constant(num_channels_), "GetNumChannels");
     DCHECK_EQ(num_replaced, 1);
 
     // Replace HashRow() with the handcrafted IR function.
@@ -968,7 +963,7 @@ void KrpcDataStreamSenderConfig::Codegen(RuntimeState* state, RuntimeProfile* pr
           reinterpret_cast<void**>(&hash_and_add_rows_fn_));
     }
   }
-  profile->AddCodegenMsg(codegen_status.ok(), codegen_status, sender_name);
+  AddCodegenStatus(codegen_status, sender_name);
 }
 
 Status KrpcDataStreamSender::AddRowToChannel(const int channel_id, TupleRow* row) {
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index e74e82f..a7e55a1 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -47,7 +47,10 @@ class KrpcDataStreamSenderConfig : public DataSinkConfig {
       RuntimeState* state) const override;
   void Close() override;
 
-  void Codegen(RuntimeState* state, RuntimeProfile* profile);
+  /// Codegen KrpcDataStreamSender::HashAndAddRows() if partitioning type is
+  /// HASH_PARTITIONED. Replaces KrpcDataStreamSender::HashRow() and
+  /// KrpcDataStreamSender::GetNumChannels() based on runtime information.
+  void Codegen(FragmentState* state) override;
 
   /// The type of partitioning to perform.
   TPartitionType::type partition_type_ = TPartitionType::UNPARTITIONED;
@@ -56,6 +59,9 @@ class KrpcDataStreamSenderConfig : public DataSinkConfig {
   /// per-row partition values for shuffling exchange;
   std::vector<ScalarExpr*> partition_exprs_;
 
+  /// Number of channels (one per destination) the sender will create. Set in Init().
+  int num_channels_ = 0;
+
   /// Hash seed used for exchanges. Query id will be used to seed the hash function.
   uint64_t exchange_hash_seed_;
 
@@ -68,7 +74,7 @@ class KrpcDataStreamSenderConfig : public DataSinkConfig {
 
  protected:
   Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
-      RuntimeState* state) override;
+      FragmentState* state) override;
 
  private:
   /// Codegen the KrpcDataStreamSender::HashRow() function and returns the codegen'd
@@ -111,10 +117,6 @@ class KrpcDataStreamSender : public DataSink {
   /// the stat counters. Return error status if any channels failed to initialize.
   virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
 
-  /// Codegen HashAndAddRows() if partitioning type is HASH_PARTITIONED.
-  /// Replaces HashRow() and GetNumChannels() based on runtime information.
-  virtual void Codegen(RuntimeState* state) override;
-
   /// Initializes the evaluator of the partitioning expressions. Return error status
   /// if initialization failed.
   virtual Status Open(RuntimeState* state) override;
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index a07e737..a154d59 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -33,6 +33,7 @@
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/bufferpool/reservation-util.h"
 #include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/initial-reservations.h"
 #include "runtime/mem-tracker.h"
@@ -116,6 +117,10 @@ void QueryState::ReleaseBackendResources() {
   if (initial_reservations_ != nullptr) initial_reservations_->ReleaseResources();
   if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
   if (desc_tbl_ != nullptr) desc_tbl_->ReleaseResources();
+  // Release any memory associated with codegen.
+  for (auto& elem : fragment_state_map_) {
+    elem.second->ReleaseResources();
+  }
   // Mark the query as finished on the query MemTracker so that admission control will
   // not consider the whole query memory limit to be "reserved".
   query_mem_tracker_->set_query_exec_finished();
@@ -258,6 +263,26 @@ Status QueryState::InitBufferPoolState() {
   return Status::OK();
 }
 
+// Verifies that all instances of each fragment produce the same set of filter ids.
+static bool VerifyFiltersProduced(
+    const vector<TPlanFragmentInstanceCtx>& instance_ctxs) {
+  int fragment_idx = -1;
+  std::unordered_set<int> first_set;
+  for (const TPlanFragmentInstanceCtx& instance_ctx : instance_ctxs) {
+    // The first instance of each fragment provides the reference set of filter ids.
+    if (fragment_idx == -1 || fragment_idx != instance_ctx.fragment_idx) {
+      fragment_idx = instance_ctx.fragment_idx;
+      first_set.clear();
+      for (const auto& f : instance_ctx.filters_produced) first_set.insert(f.filter_id);
+    }
+    if (first_set.size() != instance_ctx.filters_produced.size()) return false;
+    for (const auto& f : instance_ctx.filters_produced) {
+      if (first_set.find(f.filter_id) == first_set.end()) return false;
+    }
+  }
+  return true;
+}
+
 Status QueryState::InitFilterBank() {
   int64_t runtime_filters_reservation_bytes = 0;
   int fragment_ctx_idx = -1;
@@ -274,6 +299,8 @@ Status QueryState::InitFilterBank() {
       }
     }
   }
+  DCHECK(VerifyFiltersProduced(instance_ctxs))
+      << "Filters produced by all instances on the same backend should be the same";
   for (const TPlanFragmentInstanceCtx& instance_ctx : instance_ctxs) {
     bool first_instance_of_fragment = fragment_ctx_idx == -1
         || fragment_ctxs[fragment_ctx_idx].fragment.idx != instance_ctx.fragment_idx;
@@ -516,6 +543,14 @@ int64_t QueryState::GetReportWaitTimeMs() const {
   return report_interval * (num_failed_reports_ + 1);
 }
 
+void QueryState::ErrorDuringFragmentCodegen(const Status& status) {
+  unique_lock<SpinLock> l(status_lock_);
+  if (HasErrorStatus()) return;  // An error status that is already set is kept.
+  overall_status_ = status;
+  // Codegen failures are not tied to one fragment instance: use a default TUniqueId.
+  failed_finstance_id_ = TUniqueId();
+}
+
 void QueryState::ErrorDuringPrepare(const Status& status, const TUniqueId& finst_id) {
   {
     unique_lock<SpinLock> l(status_lock_);
@@ -561,9 +596,8 @@ bool QueryState::StartFInstances() {
   DCHECK_GT(backend_resource_refcnt_.Load(), 0) << "Should have been taken in Init()";
 
   DCHECK_GT(fragment_info_.fragment_ctxs.size(), 0);
-  TPlanFragmentCtx* fragment_ctx = &fragment_info_.fragment_ctxs[0];
+  vector<unique_ptr<Thread>> codegen_threads;
   int num_unstarted_instances = fragment_info_.fragment_instance_ctxs.size();
-  int fragment_ctx_idx = 0;
 
   // set up desc tbl
   DCHECK(query_ctx().__isset.desc_tbl_serialized);
@@ -573,49 +607,49 @@ bool QueryState::StartFInstances() {
   VLOG(2) << "descriptor table for query=" << PrintId(query_id())
           << "\n" << desc_tbl_->DebugString();
 
+  start_finstances_status =
+      FragmentState::CreateFragmentStateMap(fragment_info_, this, fragment_state_map_);
+  if (UNLIKELY(!start_finstances_status.ok())) goto error;
+
   fragment_events_start_time_ = MonotonicStopWatch::Now();
-  for (const TPlanFragmentInstanceCtx& instance_ctx :
-      fragment_info_.fragment_instance_ctxs) {
-    // determine corresponding TPlanFragmentCtx
-    if (fragment_ctx->fragment.idx != instance_ctx.fragment_idx) {
-      ++fragment_ctx_idx;
-      DCHECK_LT(fragment_ctx_idx, fragment_info_.fragment_ctxs.size());
-      fragment_ctx = &fragment_info_.fragment_ctxs[fragment_ctx_idx];
-      // we expect fragment and instance contexts to follow the same order
-      DCHECK_EQ(fragment_ctx->fragment.idx, instance_ctx.fragment_idx);
-    }
-    FragmentInstanceState* fis = obj_pool_.Add(
-        new FragmentInstanceState(this, *fragment_ctx, instance_ctx));
-
-    // start new thread to execute instance
-    refcnt_.Add(1); // decremented in ExecFInstance()
-    AcquireBackendResourceRefcount(); // decremented in ExecFInstance()
-
-    // Add the fragment instance ID to the 'fis_map_'. Has to happen before the thread is
-    // spawned or we may race with users of 'fis_map_'.
-    fis_map_.emplace(fis->instance_id(), fis);
-
-    string thread_name = Substitute("$0 (finst:$1)",
-        FragmentInstanceState::FINST_THREAD_NAME_PREFIX,
-        PrintId(instance_ctx.fragment_instance_id));
-    unique_ptr<Thread> t;
-
-    // Inject thread creation failures through debug actions if enabled.
-    Status debug_action_status = DebugAction(query_options(), "FIS_FAIL_THREAD_CREATION");
-    start_finstances_status = !debug_action_status.ok() ? debug_action_status :
-        Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
-            [this, fis]() { this->ExecFInstance(fis); }, &t, true);
-    if (!start_finstances_status.ok()) {
-      fis_map_.erase(fis->instance_id());
-      // Undo refcnt increments done immediately prior to Thread::Create(). The
-      // reference counts were both greater than zero before the increments, so
-      // neither of these decrements will free any structures.
-      ReleaseBackendResourceRefcount();
-      ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
-      goto error;
+  for (auto& fragment : fragment_state_map_) {
+    FragmentState* fragment_state = fragment.second;
+    for (const TPlanFragmentInstanceCtx* instance_ctx : fragment_state->instance_ctxs()) {
+      FragmentInstanceState* fis =
+          obj_pool_.Add(new FragmentInstanceState(this, fragment_state, *instance_ctx));
+
+      // start new thread to execute instance
+      refcnt_.Add(1); // decremented in ExecFInstance()
+      AcquireBackendResourceRefcount(); // decremented in ExecFInstance()
+
+      // Add the fragment instance ID to the 'fis_map_'. Has to happen before the thread
+      // is spawned or we may race with users of 'fis_map_'.
+      fis_map_.emplace(fis->instance_id(), fis);
+
+      string thread_name =
+          Substitute("$0 (finst:$1)", FragmentInstanceState::FINST_THREAD_NAME_PREFIX,
+              PrintId(instance_ctx->fragment_instance_id));
+      unique_ptr<Thread> t;
+
+      // Inject thread creation failures through debug actions if enabled.
+      Status debug_action_status =
+          DebugAction(query_options(), "FIS_FAIL_THREAD_CREATION");
+      start_finstances_status = !debug_action_status.ok() ?
+          debug_action_status :
+          Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
+              [this, fis]() { this->ExecFInstance(fis); }, &t, true);
+      if (!start_finstances_status.ok()) {
+        fis_map_.erase(fis->instance_id());
+        // Undo refcnt increments done immediately prior to Thread::Create(). The
+        // reference counts were both greater than zero before the increments, so
+        // neither of these decrements will free any structures.
+        ReleaseBackendResourceRefcount();
+        ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
+        goto error;
+      }
+      t->Detach();
+      --num_unstarted_instances;
     }
-    t->Detach();
-    --num_unstarted_instances;
   }
   return true;
 
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index eb6938c..8654d44 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -45,15 +45,20 @@ class RpcContext;
 namespace impala {
 
 class ControlServiceProxy;
+class DataSinkConfig;
 class DescriptorTbl;
+class FragmentState;
 class FragmentInstanceState;
 class InitialReservations;
+class LlvmCodeGen;
 class MemTracker;
+class PlanNode;
 class PublishFilterParamsPB;
 class ReservationTracker;
 class RuntimeFilterBank;
 class RuntimeProfile;
 class RuntimeState;
+class ScalarExpr;
 class ScannerMemLimiter;
 class TmpFileGroup;
 class TRuntimeProfileForest;
@@ -240,6 +245,11 @@ class QueryState {
   /// Called by a FragmentInstanceState thread to notify that it's done executing.
   void DoneExecuting() { discard_result(instances_finished_barrier_->Notify()); }
 
+  /// Called by the first fragment instance thread that invoked its FragmentState's
+  /// InvokeCodegen() and hit a codegen error. Records 'status' as the query's overall
+  /// status unless an error status has already been set.
+  void ErrorDuringFragmentCodegen(const Status& status);
+
   /// Called by a fragment instance thread to notify that it hit an error during Prepare()
   /// Updates the query status and the failed instance ID if it's not set already.
   /// Also notifies anyone waiting on WaitForPrepare() if this is called by the last
@@ -370,11 +380,15 @@ class QueryState {
   /// and so is 'failed_instance_id_' if an error is hit.
   std::unique_ptr<CountingBarrier> instances_finished_barrier_;
 
-  /// map from instance id to its state (owned by obj_pool_), populated in
+  /// Map from instance id to its state (owned by obj_pool_), populated in
   /// StartFInstances(); Not valid to read from until 'instances_prepared_barrier_'
   /// is set (i.e. readers should always call WaitForPrepare()).
   std::unordered_map<TUniqueId, FragmentInstanceState*> fis_map_;
 
+  /// Map from fragment index to its fragment state (owned by obj_pool_), populated in
+  /// StartFInstances();
+  std::unordered_map<TFragmentIdx, FragmentState*> fragment_state_map_;
+
   ObjectPool obj_pool_;
   AtomicInt32 refcnt_;
 
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 8da7259..4cb68e3 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -173,24 +173,6 @@ void RuntimeState::Init() {
   }
 }
 
-Status RuntimeState::CreateCodegen() {
-  if (codegen_.get() != NULL) return Status::OK();
-  // TODO: add the fragment ID to the codegen ID as well
-  RETURN_IF_ERROR(LlvmCodeGen::CreateImpalaCodegen(this,
-      instance_mem_tracker_, PrintId(fragment_instance_id()), &codegen_));
-  codegen_->EnableOptimizations(true);
-  profile_->AddChild(codegen_->runtime_profile());
-  return Status::OK();
-}
-
-Status RuntimeState::CodegenScalarExprs() {
-  for (auto& item : scalar_exprs_to_codegen_) {
-    llvm::Function* fn;
-    RETURN_IF_ERROR(item.first->GetCodegendComputeFn(codegen_.get(), item.second, &fn));
-  }
-  return Status::OK();
-}
-
 Status RuntimeState::StartSpilling(MemTracker* mem_tracker) {
   return query_state_->StartSpilling(this, mem_tracker);
 }
@@ -322,9 +304,6 @@ void RuntimeState::ReleaseResources() {
   if (resource_pool_ != nullptr) {
     ExecEnv::GetInstance()->thread_mgr()->DestroyPool(move(resource_pool_));
   }
-  // Release any memory associated with codegen.
-  if (codegen_ != nullptr) codegen_->Close();
-
   // Release the reservation, which should be unused at the point.
   if (instance_buffer_reservation_ != nullptr) instance_buffer_reservation_->Close();
 
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 2735d50..102db23 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -143,64 +143,8 @@ class RuntimeState {
   /// Returns runtime state profile
   RuntimeProfile* runtime_profile() { return profile_; }
 
-  /// Returns the LlvmCodeGen object for this fragment instance.
-  LlvmCodeGen* codegen() { return codegen_.get(); }
-
   const std::string& GetEffectiveUser() const;
 
-  /// Add ScalarExpr expression 'expr' to be codegen'd later if it's not disabled by
-  /// query option. If 'is_codegen_entry_point' is true, 'expr' will be an entry
-  /// point into codegen'd evaluation (i.e. it will have a function pointer populated).
-  /// Adding an expr here ensures that it will be codegen'd (i.e. fragment execution
-  /// will fail with an error if the expr cannot be codegen'd).
-  void AddScalarExprToCodegen(ScalarExpr* expr, bool is_codegen_entry_point) {
-    scalar_exprs_to_codegen_.push_back({expr, is_codegen_entry_point});
-  }
-
-  /// Returns true if there are ScalarExpr expressions in the fragments that we want
-  /// to codegen (because they can't be interpreted or based on options/hints).
-  /// This should only be used after the Prepare() phase in which all expressions'
-  /// Prepare() are invoked.
-  bool ScalarExprNeedsCodegen() const { return !scalar_exprs_to_codegen_.empty(); }
-
-  /// Check if codegen was disabled and if so, add a message to the runtime profile.
-  void CheckAndAddCodegenDisabledMessage(RuntimeProfile* profile) {
-    if (CodegenDisabledByQueryOption()) {
-      profile->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
-    } else if (CodegenDisabledByHint()) {
-      profile->AddCodegenMsg(false, "disabled due to optimization hints");
-    }
-  }
-
-  /// Returns true if there is a hint to disable codegen. This can be true for single node
-  /// optimization or expression evaluation request from FE to BE (see fe-support.cc).
-  /// Note that this internal flag is advisory and it may be ignored if the fragment has
-  /// any UDF which cannot be interpreted. See ScalarExpr::Prepare() for details.
-  inline bool CodegenHasDisableHint() const {
-    return query_ctx().disable_codegen_hint;
-  }
-
-  /// Returns true iff there is a hint to disable codegen and all expressions in the
-  /// fragment can be interpreted. This should only be used after the Prepare() phase
-  /// in which all expressions' Prepare() are invoked.
-  inline bool CodegenDisabledByHint() const {
-    return CodegenHasDisableHint() && !ScalarExprNeedsCodegen();
-  }
-
-  /// Returns true if codegen is disabled by query option.
-  inline bool CodegenDisabledByQueryOption() const {
-    return query_options().disable_codegen;
-  }
-
-  /// Returns true if codegen should be enabled for this fragment. Codegen is enabled
-  /// if all the following conditions hold:
-  /// 1. it's enabled by query option
-  /// 2. it's not disabled by internal hints or there are expressions in the fragment
-  ///    which cannot be interpreted.
-  inline bool ShouldCodegen() const {
-    return !CodegenDisabledByQueryOption() && !CodegenDisabledByHint();
-  }
-
   inline Status GetQueryStatus() {
     // Do a racy check for query_status_ to avoid unnecessary spinlock acquisition.
     if (UNLIKELY(!query_status_.ok())) {
@@ -309,15 +253,6 @@ class RuntimeState {
   /// called after ReleaseResources().
   Status CheckQueryState();
 
-  /// Create a codegen object accessible via codegen() if it doesn't exist already.
-  Status CreateCodegen();
-
-  /// Codegen all ScalarExpr expressions in 'scalar_exprs_to_codegen_'. If codegen fails
-  /// for any expressions, return immediately with the error status. Once IMPALA-4233 is
-  /// fixed, it's not fatal to fail codegen if the expression can be interpreted.
-  /// TODO: Fix IMPALA-4233
-  Status CodegenScalarExprs();
-
   /// Helper to call QueryState::StartSpilling().
   Status StartSpilling(MemTracker* mem_tracker);
 
@@ -387,12 +322,6 @@ class RuntimeState {
   /// instead.
   const Timezone* time_zone_for_unix_time_conversions_;
 
-  boost::scoped_ptr<LlvmCodeGen> codegen_;
-
-  /// Contains all ScalarExpr expressions which need to be codegen'd. The second element
-  /// is true if we want to generate a codegen entry point for this expr.
-  std::vector<std::pair<ScalarExpr*, bool>> scalar_exprs_to_codegen_;
-
   /// Thread resource management object for this fragment's execution.  The runtime
   /// state is responsible for returning this pool to the thread mgr.
   std::unique_ptr<ThreadResourcePool> resource_pool_;
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 27747d9..f6ef970 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -23,6 +23,7 @@
 #include "gutil/strings/substitute.h"
 #include "rpc/rpc-mgr.h"
 #include "runtime/fragment-instance-state.h"
+#include "runtime/fragment-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-exec-mgr.h"
 #include "runtime/query-state.h"
@@ -166,8 +167,10 @@ Status TestEnv::CreateQueryState(
   fragment_info.__set_fragment_instance_ctxs(
       vector<TPlanFragmentInstanceCtx>({TPlanFragmentInstanceCtx()}));
   RETURN_IF_ERROR(qs->Init(&rpc_params, fragment_info));
+  FragmentState* frag_state =
+      qs->obj_pool()->Add(new FragmentState(qs, qs->fragment_info_.fragment_ctxs[0]));
   FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState(qs,
-      qs->fragment_info_.fragment_ctxs[0], qs->fragment_info_.fragment_instance_ctxs[0]));
+      frag_state, qs->fragment_info_.fragment_instance_ctxs[0]));
   RuntimeState* rs = qs->obj_pool()->Add(
       new RuntimeState(qs, fis->fragment_ctx(), fis->instance_ctx(), exec_env_.get()));
   runtime_states_.push_back(rs);
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 830169f..c7ba623 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -37,6 +37,7 @@
 #include "runtime/client-cache.h"
 #include "runtime/decimal-value.inline.h"
 #include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/lib-cache.h"
 #include "runtime/mem-pool.h"
@@ -200,9 +201,13 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow(
   query_ctx.request_pool = "fe-eval-exprs";
 
   RuntimeState state(query_ctx, ExecEnv::GetInstance());
+  TPlanFragmentCtx fragment_ctx;
+  FragmentState fragment_state(state.query_state(), fragment_ctx);
   // Make sure to close the runtime state no matter how this scope is exited.
-  const auto close_runtime_state =
-      MakeScopeExitTrigger([&state]() { state.ReleaseResources(); });
+  const auto close_runtime_state = MakeScopeExitTrigger([&state, &fragment_state]() {
+    fragment_state.ReleaseResources();
+    state.ReleaseResources();
+  });
 
   THROW_IF_ERROR_RET(
       jni_frame.push(env), env, JniUtil::internal_exc_class(), result_bytes);
@@ -214,7 +219,7 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow(
   vector<ScalarExprEvaluator*> evals;
   for (const TExpr& texpr : texprs) {
     ScalarExpr* expr;
-    status = ScalarExpr::Create(texpr, RowDescriptor(), &state, &expr);
+    status = ScalarExpr::Create(texpr, RowDescriptor(), &fragment_state, &expr);
     if (!status.ok()) goto error;
     exprs.push_back(expr);
     ScalarExprEvaluator* eval;
@@ -225,12 +230,12 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow(
   }
 
   // UDFs which cannot be interpreted need to be handled by codegen.
-  if (state.ScalarExprNeedsCodegen()) {
-    status = state.CreateCodegen();
+  if (fragment_state.ScalarExprNeedsCodegen()) {
+    status = fragment_state.CreateCodegen();
     if (!status.ok()) goto error;
-    LlvmCodeGen* codegen = state.codegen();
+    LlvmCodeGen* codegen = fragment_state.codegen();
     DCHECK(codegen != NULL);
-    status = state.CodegenScalarExprs();
+    status = fragment_state.CodegenScalarExprs();
     if (!status.ok()) goto error;
     codegen->EnableOptimizations(false);
     status = codegen->FinalizeModule();
diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h
index d970d91..0c820a1 100644
--- a/be/src/udf/udf-internal.h
+++ b/be/src/udf/udf-internal.h
@@ -170,10 +170,10 @@ class FunctionContextImpl {
   int GetConstFnAttr(ConstFnAttr t, int i = -1);
 
   /// Return the function attribute 't' defined in ConstFnAttr above.
-  static int GetConstFnAttr(const RuntimeState* state,
+  static int GetConstFnAttr(bool uses_decimal_v2,
       const impala_udf::FunctionContext::TypeDesc& return_type,
-      const std::vector<impala_udf::FunctionContext::TypeDesc>& arg_types,
-      ConstFnAttr t, int i = -1);
+      const std::vector<impala_udf::FunctionContext::TypeDesc>& arg_types, ConstFnAttr t,
+      int i = -1);
 
   /// UDFs may manipulate DecimalVal arguments via SIMD instructions such as 'movaps'
   /// that require 16-byte memory alignment.
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index 0688805..7865a90 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -558,13 +558,12 @@ static int GetTypeByteSize(const FunctionContext::TypeDesc& type) {
 }
 
 int FunctionContextImpl::GetConstFnAttr(FunctionContextImpl::ConstFnAttr t, int i) {
-  return GetConstFnAttr(state_, return_type_, arg_types_, t, i);
+  return GetConstFnAttr(state_->decimal_v2(), return_type_, arg_types_, t, i);
 }
 
-int FunctionContextImpl::GetConstFnAttr(const RuntimeState* state,
+int FunctionContextImpl::GetConstFnAttr(bool uses_decimal_v2,
     const FunctionContext::TypeDesc& return_type,
-    const vector<FunctionContext::TypeDesc>& arg_types,
-    ConstFnAttr t, int i) {
+    const vector<FunctionContext::TypeDesc>& arg_types, ConstFnAttr t, int i) {
   switch (t) {
     case RETURN_TYPE_SIZE:
       assert(i == -1);
@@ -592,7 +591,7 @@ int FunctionContextImpl::GetConstFnAttr(const RuntimeState* state,
       assert(arg_types[i].type == FunctionContext::TYPE_DECIMAL);
       return arg_types[i].scale;
     case DECIMAL_V2:
-      return state->decimal_v2();
+      return uses_decimal_v2;
     default:
       assert(false);
       return -1;
diff --git a/be/src/util/tuple-row-compare.cc b/be/src/util/tuple-row-compare.cc
index 7c46b59..d960bbe 100644
--- a/be/src/util/tuple-row-compare.cc
+++ b/be/src/util/tuple-row-compare.cc
@@ -23,6 +23,7 @@
 #include "codegen/llvm-codegen.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
+#include "runtime/fragment-state.h"
 #include "runtime/runtime-state.h"
 #include "runtime/multi-precision.h"
 #include "util/runtime-profile-counters.h"
@@ -64,7 +65,7 @@ void TupleRowComparator::Close(RuntimeState* state) {
   ScalarExprEvaluator::Close(ordering_expr_evals_lhs_, state);
 }
 
-Status TupleRowComparatorConfig::Codegen(RuntimeState* state) {
+Status TupleRowComparatorConfig::Codegen(FragmentState* state) {
   if (sorting_order_ == TSortingOrder::ZORDER) {
     return Status("Codegen not yet implemented for sorting order: ZORDER");
   }
diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h
index 59eb5f8..0083527 100644
--- a/be/src/util/tuple-row-compare.h
+++ b/be/src/util/tuple-row-compare.h
@@ -29,6 +29,7 @@
 
 namespace impala {
 
+class FragmentState;
 class RuntimeState;
 class ScalarExprEvaluator;
 
@@ -68,7 +69,7 @@ class TupleRowComparatorConfig {
       const TSortInfo& tsort_info, const std::vector<ScalarExpr*>& ordering_exprs);
 
   /// Codegens a Compare() function for this comparator that is used in Compare().
-  Status Codegen(RuntimeState* state);
+  Status Codegen(FragmentState* state);
 
   /// Indicates the sorting ordering used. Specified using the SORT BY clause.
   TSortingOrder::type sorting_order_;