You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/04/01 23:16:06 UTC
[impala] branch master updated: IMPALA-4080 [part 7]: Codegen once
per fragment
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new f2837e9 IMPALA-4080 [part 7]: Codegen once per fragment
f2837e9 is described below
commit f2837e97b25be37221ab475b0fd4f66320a4f66b
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Wed Mar 4 10:41:01 2020 -0800
IMPALA-4080 [part 7]: Codegen once per fragment
This is the final patch:
- Removes all temporary changes that called codegen on plan nodes and
data sink through the exec node.
- Introduces a FragmentState class that contains the plan tree, the data
sink config and the codegen state for each fragment.
- Every FragmentState object has its own runtime profile, which maps each
codegen object's profile to its fragment id.
- Plan nodes are generated before fragment instance threads are spawned.
- The first fragment instance thread to call codegen on its parent
fragment does the actual codegen work, and the rest of the instance
threads wait until codegen is complete.
- This very closely mimics the timing of the state machines for both the
query state and the fragment instance states.
Testing:
- Ran exhaustive tests successfully.
An example of how the codegen in the profile looks:
Per Node Profiles:
localhost:22002:
- AdmissionSlots: 1 (1)
- BloomFilterBytes: 16.00 MB (16777216)
- ScratchBytesRead: 0
- ScratchBytesWritten: 0
- ScratchFileUsedBytes: 0
- ScratchReads: 0 (0)
- ScratchWrites: 0 (0)
- TotalEncryptionTime: 0.000ns
- TotalReadBlockTime: 0.000ns
Buffer pool:
- AllocTime: 153.663us
- CumulativeAllocationBytes: 16.00 MB (16777216)
- CumulativeAllocations: 8 (8)
- PeakReservation: 16.00 MB (16777216)
- PeakUnpinnedBytes: 0
- PeakUsedReservation: 16.00 MB (16777216)
- ReadIoBytes: 0
- ReadIoOps: 0 (0)
- ReadIoWaitTime: 0.000ns
- ReservationLimit: 16.00 MB (16777216)
- SystemAllocTime: 133.841us
- WriteIoBytes: 0
- WriteIoOps: 0 (0)
- WriteIoWaitTime: 0.000ns
Fragment F02:
CodeGen:(Total: 695.207ms, non-child: 0.000ns, % non-child: 0.00%)
- CodegenInvoluntaryContextSwitches: 102 (102)
- CodegenTotalWallClockTime: 695.180ms
- CodegenSysTime: 0.000ns
- CodegenUserTime: 688.998ms
- CodegenVoluntaryContextSwitches: 22 (22)
- CompileTime: 39.657ms
- IrGenerationTime: 27.443ms
- LoadTime: 0.000ns
- ModuleBitcodeSize: 2.53 MB (2647876)
- NumFunctions: 161 (161)
- NumInstructions: 7.13K (7128)
- OptimizationTime: 596.075ms
- PeakMemoryUsage: 3.48 MB (3649536)
- PrepareTime: 30.782ms
Fragment F00:
CodeGen:(Total: 145.314ms, non-child: 0.000ns, % non-child: 0.00%)
- CodegenInvoluntaryContextSwitches: 11 (11)
- CodegenTotalWallClockTime: 145.304ms
- CodegenSysTime: 3.944ms
- CodegenUserTime: 137.738ms
- CodegenVoluntaryContextSwitches: 4 (4)
- CompileTime: 6.697ms
- IrGenerationTime: 3.173ms
- LoadTime: 0.000ns
- ModuleBitcodeSize: 2.53 MB (2647876)
- NumFunctions: 41 (41)
- NumInstructions: 1.02K (1024)
- OptimizationTime: 119.623ms
- PeakMemoryUsage: 512.00 KB (524288)
- PrepareTime: 14.405ms
Change-Id: I3aef8bc621f96caafe9a1c378617a2987e4ad452
Reviewed-on: http://gerrit.cloudera.org:8080/15408
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/benchmarks/expr-benchmark.cc | 20 ++-
be/src/benchmarks/hash-benchmark.cc | 8 +-
be/src/codegen/llvm-codegen-test.cc | 23 ++--
be/src/codegen/llvm-codegen.cc | 13 +-
be/src/codegen/llvm-codegen.h | 13 +-
be/src/exec/aggregation-node-base.cc | 13 +-
be/src/exec/aggregation-node-base.h | 4 +-
be/src/exec/aggregator.cc | 7 +-
be/src/exec/aggregator.h | 16 ++-
be/src/exec/analytic-eval-node.cc | 4 +-
be/src/exec/analytic-eval-node.h | 2 +-
be/src/exec/blocking-join-node.cc | 4 +-
be/src/exec/blocking-join-node.h | 2 +-
be/src/exec/data-sink.cc | 23 +++-
be/src/exec/data-sink.h | 28 ++--
be/src/exec/exchange-node.cc | 21 +--
be/src/exec/exchange-node.h | 5 +-
be/src/exec/exec-node.cc | 55 ++++----
be/src/exec/exec-node.h | 49 ++++---
be/src/exec/grouping-aggregator.cc | 21 ++-
be/src/exec/grouping-aggregator.h | 12 +-
be/src/exec/hdfs-avro-scanner.cc | 5 +-
be/src/exec/hdfs-avro-scanner.h | 4 +-
be/src/exec/hdfs-columnar-scanner.cc | 3 +-
be/src/exec/hdfs-columnar-scanner.h | 2 +-
be/src/exec/hdfs-scan-node-base.cc | 26 ++--
be/src/exec/hdfs-scan-node-base.h | 5 +-
be/src/exec/hdfs-scanner.cc | 18 +--
be/src/exec/hdfs-scanner.h | 4 +-
be/src/exec/hdfs-sequence-scanner.cc | 3 +-
be/src/exec/hdfs-sequence-scanner.h | 2 +-
be/src/exec/hdfs-table-sink.cc | 2 +-
be/src/exec/hdfs-table-sink.h | 2 +-
be/src/exec/hdfs-text-scanner.cc | 3 +-
be/src/exec/hdfs-text-scanner.h | 2 +-
be/src/exec/join-builder.cc | 2 +-
be/src/exec/join-builder.h | 2 +-
be/src/exec/nested-loop-join-builder.cc | 3 +-
be/src/exec/nested-loop-join-builder.h | 2 +-
be/src/exec/nested-loop-join-node.cc | 2 +-
be/src/exec/nested-loop-join-node.h | 2 +-
be/src/exec/non-grouping-aggregator.cc | 16 +--
be/src/exec/non-grouping-aggregator.h | 10 +-
be/src/exec/partial-sort-node.cc | 20 +--
be/src/exec/partial-sort-node.h | 5 +-
be/src/exec/partitioned-hash-join-builder.cc | 41 +++---
be/src/exec/partitioned-hash-join-builder.h | 22 ++-
be/src/exec/partitioned-hash-join-node.cc | 45 +++---
be/src/exec/partitioned-hash-join-node.h | 7 +-
be/src/exec/scan-node.cc | 4 +-
be/src/exec/scan-node.h | 2 +-
be/src/exec/select-node.cc | 16 +--
be/src/exec/select-node.h | 5 +-
be/src/exec/sort-node.cc | 18 +--
be/src/exec/sort-node.h | 5 +-
be/src/exec/subplan-node.cc | 4 +-
be/src/exec/subplan-node.h | 4 +-
be/src/exec/topn-node.cc | 17 +--
be/src/exec/topn-node.h | 5 +-
be/src/exec/union-node.cc | 16 +--
be/src/exec/union-node.h | 5 +-
be/src/exec/unnest-node.cc | 5 +-
be/src/exec/unnest-node.h | 4 +-
be/src/exprs/agg-fn.cc | 5 +-
be/src/exprs/agg-fn.h | 7 +-
be/src/exprs/expr-codegen-test.cc | 16 ++-
be/src/exprs/hive-udf-call.cc | 2 +-
be/src/exprs/hive-udf-call.h | 2 +-
be/src/exprs/is-not-empty-predicate.cc | 2 +-
be/src/exprs/is-not-empty-predicate.h | 2 +-
be/src/exprs/kudu-partition-expr.cc | 4 +-
be/src/exprs/kudu-partition-expr.h | 2 +-
be/src/exprs/scalar-expr.cc | 13 +-
be/src/exprs/scalar-expr.h | 15 +-
be/src/exprs/scalar-fn-call.cc | 3 +-
be/src/exprs/scalar-fn-call.h | 2 +-
be/src/exprs/slot-ref.cc | 3 +-
be/src/exprs/slot-ref.h | 2 +-
be/src/exprs/tuple-is-null-predicate.cc | 2 +-
be/src/exprs/tuple-is-null-predicate.h | 2 +-
be/src/exprs/valid-tuple-id.cc | 2 +-
be/src/exprs/valid-tuple-id.h | 2 +-
be/src/runtime/CMakeLists.txt | 1 +
be/src/runtime/data-stream-test.cc | 19 ++-
be/src/runtime/fragment-instance-state.cc | 51 ++-----
be/src/runtime/fragment-instance-state.h | 10 +-
be/src/runtime/fragment-state.cc | 165 ++++++++++++++++++++++
be/src/runtime/fragment-state.h | 196 +++++++++++++++++++++++++++
be/src/runtime/krpc-data-stream-sender.cc | 25 ++--
be/src/runtime/krpc-data-stream-sender.h | 14 +-
be/src/runtime/query-state.cc | 120 ++++++++++------
be/src/runtime/query-state.h | 16 ++-
be/src/runtime/runtime-state.cc | 21 ---
be/src/runtime/runtime-state.h | 71 ----------
be/src/runtime/test-env.cc | 5 +-
be/src/service/fe-support.cc | 19 ++-
be/src/udf/udf-internal.h | 6 +-
be/src/udf/udf.cc | 9 +-
be/src/util/tuple-row-compare.cc | 3 +-
be/src/util/tuple-row-compare.h | 3 +-
100 files changed, 930 insertions(+), 625 deletions(-)
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index 79ad8ae..028f137 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -50,8 +50,10 @@
#include "common/init.h"
#include "common/object-pool.h"
#include "common/status.h"
+#include "runtime/fragment-state.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
#include "service/fe-support.h"
#include "service/frontend.h"
#include "service/impala-server.h"
@@ -70,6 +72,10 @@ class Planner {
ABORT_IF_ERROR(exec_env_.InitForFeTests());
}
+ ~Planner() {
+ if (fragment_state_ != nullptr) fragment_state_->ReleaseResources();
+ }
+
Status GeneratePlan(const string& stmt, TExecRequest* result) {
TQueryCtx query_ctx;
query_ctx.client_request.stmt = stmt;
@@ -78,18 +84,23 @@ class Planner {
TNetworkAddress dummy;
ImpalaServer::PrepareQueryContext(dummy, dummy, &query_ctx);
runtime_state_.reset(new RuntimeState(query_ctx, &exec_env_));
+ TPlanFragmentCtx* fragment_ctx =
+ runtime_state_->obj_pool()->Add(new TPlanFragmentCtx());
+ fragment_state_ = runtime_state_->obj_pool()->Add(
+ new FragmentState(runtime_state_->query_state(), *fragment_ctx));
return frontend_.GetExecRequest(query_ctx, result);
}
- RuntimeState* GetRuntimeState() {
- return runtime_state_.get();
- }
+ RuntimeState* GetRuntimeState() { return runtime_state_.get(); }
+
+ FragmentState* GetFragmentState() { return fragment_state_; }
private:
Frontend frontend_;
ExecEnv exec_env_;
scoped_ptr<RuntimeState> runtime_state_;
+ FragmentState* fragment_state_;
TQueryOptions query_options_;
TSessionState session_state_;
@@ -114,7 +125,8 @@ static Status PrepareSelectList(
DCHECK_EQ(texprs.size(), 1);
RuntimeState* state = planner->GetRuntimeState();
ScalarExpr* expr;
- RETURN_IF_ERROR(ScalarExpr::Create(texprs[0], RowDescriptor(), state, &expr));
+ RETURN_IF_ERROR(
+ ScalarExpr::Create(texprs[0], RowDescriptor(), planner->GetFragmentState(), &expr));
RETURN_IF_ERROR(
ScalarExprEvaluator::Create(*expr, state, &pool, &mem_pool, &mem_pool, eval));
return Status::OK();
diff --git a/be/src/benchmarks/hash-benchmark.cc b/be/src/benchmarks/hash-benchmark.cc
index 3f605c9..d8e7d5e 100644
--- a/be/src/benchmarks/hash-benchmark.cc
+++ b/be/src/benchmarks/hash-benchmark.cc
@@ -23,7 +23,9 @@
#include "codegen/llvm-codegen.h"
#include "common/init.h"
#include "experiments/data-provider.h"
+#include "runtime/fragment-state.h"
#include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
#include "runtime/test-env.h"
#include "service/fe-support.h"
#include "util/benchmark.h"
@@ -479,6 +481,10 @@ int main(int argc, char **argv) {
return -1;
}
status = test_env.CreateQueryState(0, nullptr, &state);
+ QueryState* qs = state->query_state();
+ TPlanFragmentCtx* fragment_ctx = qs->obj_pool()->Add(new TPlanFragmentCtx());
+ FragmentState* fragment_state =
+ qs->obj_pool()->Add(new FragmentState(qs, *fragment_ctx));
if (!status.ok()) {
cout << "Could not create RuntimeState";
return -1;
@@ -492,7 +498,7 @@ int main(int argc, char **argv) {
DataProvider mixed_provider(&mem_pool, mixed_profile);
scoped_ptr<LlvmCodeGen> codegen;
- status = LlvmCodeGen::CreateImpalaCodegen(state, NULL, "test", &codegen);
+ status = LlvmCodeGen::CreateImpalaCodegen(fragment_state, NULL, "test", &codegen);
if (!status.ok()) {
cout << "Could not start codegen.";
return -1;
diff --git a/be/src/codegen/llvm-codegen-test.cc b/be/src/codegen/llvm-codegen-test.cc
index f77d8df..36b9bad 100644
--- a/be/src/codegen/llvm-codegen-test.cc
+++ b/be/src/codegen/llvm-codegen-test.cc
@@ -23,6 +23,8 @@
#include "codegen/llvm-codegen.h"
#include "common/init.h"
#include "common/object-pool.h"
+#include "runtime/fragment-state.h"
+#include "runtime/query-state.h"
#include "runtime/string-value.h"
#include "runtime/test-env.h"
#include "service/fe-support.h"
@@ -42,16 +44,21 @@ namespace impala {
class LlvmCodeGenTest : public testing:: Test {
protected:
scoped_ptr<TestEnv> test_env_;
- RuntimeState* runtime_state_;
+ FragmentState* fragment_state_;
virtual void SetUp() {
test_env_.reset(new TestEnv());
ASSERT_OK(test_env_->Init());
+ RuntimeState* runtime_state_;
ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &runtime_state_));
+ QueryState* qs = runtime_state_->query_state();
+ TPlanFragmentCtx* fragment_ctx = qs->obj_pool()->Add(new TPlanFragmentCtx());
+ fragment_state_ = qs->obj_pool()->Add(new FragmentState(qs, *fragment_ctx));
}
virtual void TearDown() {
- runtime_state_ = NULL;
+ fragment_state_->ReleaseResources();
+ fragment_state_ = nullptr;
test_env_.reset();
}
@@ -78,8 +85,8 @@ class LlvmCodeGenTest : public testing:: Test {
// Wrapper to call private test-only methods on LlvmCodeGen object
Status CreateFromFile(const string& filename, scoped_ptr<LlvmCodeGen>* codegen) {
- RETURN_IF_ERROR(LlvmCodeGen::CreateFromFile(runtime_state_,
- runtime_state_->obj_pool(), NULL, filename, "test", codegen));
+ RETURN_IF_ERROR(LlvmCodeGen::CreateFromFile(fragment_state_,
+ fragment_state_->obj_pool(), NULL, filename, "test", codegen));
return (*codegen)->MaterializeModule();
}
@@ -342,7 +349,7 @@ llvm::Function* CodegenStringTest(LlvmCodeGen* codegen) {
// and modify it.
TEST_F(LlvmCodeGenTest, StringValue) {
scoped_ptr<LlvmCodeGen> codegen;
- ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(runtime_state_, NULL, "test", &codegen));
+ ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, NULL, "test", &codegen));
EXPECT_TRUE(codegen.get() != NULL);
string str("Test");
@@ -383,7 +390,7 @@ TEST_F(LlvmCodeGenTest, StringValue) {
// Test calling memcpy intrinsic
TEST_F(LlvmCodeGenTest, MemcpyTest) {
scoped_ptr<LlvmCodeGen> codegen;
- ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(runtime_state_, NULL, "test", &codegen));
+ ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, NULL, "test", &codegen));
ASSERT_TRUE(codegen.get() != NULL);
LlvmCodeGen::FnPrototype prototype(codegen.get(), "MemcpyTest", codegen->void_type());
@@ -429,7 +436,7 @@ TEST_F(LlvmCodeGenTest, HashTest) {
// Loop to test both the sse4 on/off paths
for (int i = 0; i < 2; ++i) {
scoped_ptr<LlvmCodeGen> codegen;
- ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(runtime_state_, NULL, "test", &codegen));
+ ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, NULL, "test", &codegen));
ASSERT_TRUE(codegen.get() != NULL);
const auto close_codegen =
MakeScopeExitTrigger([&codegen]() { codegen->Close(); });
@@ -541,7 +548,7 @@ TEST_F(LlvmCodeGenTest, CpuAttrWhitelist) {
// finalizes the llvm module.
TEST_F(LlvmCodeGenTest, CleanupNonFinalizedMethodsTest) {
scoped_ptr<LlvmCodeGen> codegen;
- ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(runtime_state_, nullptr, "test", &codegen));
+ ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(fragment_state_, nullptr, "test", &codegen));
ASSERT_TRUE(codegen.get() != nullptr);
const auto close_codegen = MakeScopeExitTrigger([&codegen]() { codegen->Close(); });
LlvmBuilder builder(codegen->context());
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index 0d14bdf..333a645 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -70,6 +70,7 @@
#include "runtime/lib-cache.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
+#include "runtime/fragment-state.h"
#include "runtime/runtime-state.h"
#include "runtime/string-value.h"
#include "runtime/timestamp-value.h"
@@ -194,7 +195,7 @@ Status LlvmCodeGen::InitializeLlvm(bool load_backend) {
return Status::OK();
}
-LlvmCodeGen::LlvmCodeGen(RuntimeState* state, ObjectPool* pool,
+LlvmCodeGen::LlvmCodeGen(FragmentState* state, ObjectPool* pool,
MemTracker* parent_mem_tracker, const string& id)
: state_(state),
id_(id),
@@ -221,7 +222,7 @@ LlvmCodeGen::LlvmCodeGen(RuntimeState* state, ObjectPool* pool,
llvm_thread_counters_ = ADD_THREAD_COUNTERS(profile_, "Codegen");
}
-Status LlvmCodeGen::CreateFromFile(RuntimeState* state, ObjectPool* pool,
+Status LlvmCodeGen::CreateFromFile(FragmentState* state, ObjectPool* pool,
MemTracker* parent_mem_tracker, const string& file, const string& id,
scoped_ptr<LlvmCodeGen>* codegen) {
codegen->reset(new LlvmCodeGen(state, pool, parent_mem_tracker, id));
@@ -239,7 +240,7 @@ error:
return status;
}
-Status LlvmCodeGen::CreateFromMemory(RuntimeState* state, ObjectPool* pool,
+Status LlvmCodeGen::CreateFromMemory(FragmentState* state, ObjectPool* pool,
MemTracker* parent_mem_tracker, const string& id, scoped_ptr<LlvmCodeGen>* codegen) {
codegen->reset(new LlvmCodeGen(state, pool, parent_mem_tracker, id));
SCOPED_TIMER((*codegen)->profile_->total_time_counter());
@@ -369,7 +370,7 @@ void LlvmCodeGen::StripGlobalCtorsDtors(llvm::Module* module) {
if (destructors != NULL) destructors->eraseFromParent();
}
-Status LlvmCodeGen::CreateImpalaCodegen(RuntimeState* state,
+Status LlvmCodeGen::CreateImpalaCodegen(FragmentState* state,
MemTracker* parent_mem_tracker, const string& id,
scoped_ptr<LlvmCodeGen>* codegen_ret) {
DCHECK(state != nullptr);
@@ -993,8 +994,8 @@ int LlvmCodeGen::InlineConstFnAttrs(const FunctionContext::TypeDesc& ret_type,
int i_val = static_cast<int>(i_arg->getSExtValue());
DCHECK(state_ != nullptr);
// All supported constants are currently integers.
- call_instr->replaceAllUsesWith(GetI32Constant(
- FunctionContextImpl::GetConstFnAttr(state_, ret_type, arg_types, t_val, i_val)));
+ call_instr->replaceAllUsesWith(GetI32Constant(FunctionContextImpl::GetConstFnAttr(
+ state_->query_options().decimal_v2, ret_type, arg_types, t_val, i_val)));
call_instr->eraseFromParent();
++replaced;
}
diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h
index 6e4ab95..f28fbe9 100644
--- a/be/src/codegen/llvm-codegen.h
+++ b/be/src/codegen/llvm-codegen.h
@@ -85,6 +85,7 @@ namespace impala {
class CodegenCallGraph;
class CodegenSymbolEmitter;
+class FragmentState;
class ImpalaMCJITMemoryManager;
class SubExprElimination;
class TupleDescriptor;
@@ -169,7 +170,7 @@ class LlvmCodeGen {
/// 'codegen' will contain the created object on success.
/// 'parent_mem_tracker' - if non-NULL, the CodeGen MemTracker is created under this.
/// 'id' is used for outputting the IR module for debugging.
- static Status CreateImpalaCodegen(RuntimeState* state, MemTracker* parent_mem_tracker,
+ static Status CreateImpalaCodegen(FragmentState* state, MemTracker* parent_mem_tracker,
const std::string& id, boost::scoped_ptr<LlvmCodeGen>* codegen);
~LlvmCodeGen();
@@ -610,7 +611,7 @@ class LlvmCodeGen {
friend class SubExprElimination;
/// Top level codegen object. 'module_id' is used for debugging when outputting the IR.
- LlvmCodeGen(RuntimeState* state, ObjectPool* pool, MemTracker* parent_mem_tracker,
+ LlvmCodeGen(FragmentState* state, ObjectPool* pool, MemTracker* parent_mem_tracker,
const std::string& module_id);
/// Initializes the jitter and execution engine with the given module.
@@ -620,7 +621,7 @@ class LlvmCodeGen {
/// 'codegen' will contain the created object on success. The functions in the module
/// are materialized lazily. Getting a reference to a function via GetFunction() will
/// materialize the function and its callees recursively.
- static Status CreateFromFile(RuntimeState* state, ObjectPool* pool,
+ static Status CreateFromFile(FragmentState* state, ObjectPool* pool,
MemTracker* parent_mem_tracker, const std::string& file,
const std::string& id, boost::scoped_ptr<LlvmCodeGen>* codegen);
@@ -628,7 +629,7 @@ class LlvmCodeGen {
/// 'codegen' will contain the created object on success. The functions in the module
/// are materialized lazily. Getting a reference to a function via GetFunction() will
/// materialize the function and its callees recursively.
- static Status CreateFromMemory(RuntimeState* state, ObjectPool* pool,
+ static Status CreateFromMemory(FragmentState* state, ObjectPool* pool,
MemTracker* parent_mem_tracker, const std::string& id,
boost::scoped_ptr<LlvmCodeGen>* codegen);
@@ -755,9 +756,9 @@ class LlvmCodeGen {
/// Used for determining dependencies when materializing IR functions.
static CodegenCallGraph shared_call_graph_;
- /// Pointer to the RuntimeState which owns this codegen object. Needed in
+ /// Pointer to the FragmentState which owns this codegen object. Needed in
/// InlineConstFnAttr() to access the query options.
- const RuntimeState* state_;
+ const FragmentState* state_;
/// ID used for debugging (can be e.g. the fragment instance ID)
std::string id_;
diff --git a/be/src/exec/aggregation-node-base.cc b/be/src/exec/aggregation-node-base.cc
index 2a12a3e..0cad9a8 100644
--- a/be/src/exec/aggregation-node-base.cc
+++ b/be/src/exec/aggregation-node-base.cc
@@ -21,6 +21,7 @@
#include "exec/grouping-aggregator.h"
#include "exec/non-grouping-aggregator.h"
#include "exec/streaming-aggregation-node.h"
+#include "runtime/fragment-state.h"
#include "runtime/runtime-state.h"
#include "util/runtime-profile-counters.h"
@@ -28,7 +29,7 @@
namespace impala {
-Status AggregationPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status AggregationPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
RETURN_IF_ERROR(PlanNode::Init(tnode, state));
int num_ags = tnode_->agg_node.aggregators.size();
for (int i = 0; i < num_ags; ++i) {
@@ -44,6 +45,7 @@ Status AggregationPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(aggs_[i]->Init(agg, state, this));
}
DCHECK(aggs_.size() > 0);
+ state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
return Status::OK();
}
@@ -94,15 +96,16 @@ Status AggregationNodeBase::Prepare(RuntimeState* state) {
agg->SetDebugOptions(debug_options_);
RETURN_IF_ERROR(agg->Prepare(state));
}
- state->CheckAndAddCodegenDisabledMessage(runtime_profile());
return Status::OK();
}
-void AggregationNodeBase::Codegen(RuntimeState* state) {
+void AggregationPlanNode::Codegen(FragmentState* state) {
DCHECK(state->ShouldCodegen());
- ExecNode::Codegen(state);
+ PlanNode::Codegen(state);
if (IsNodeCodegenDisabled()) return;
- for (auto& agg : aggs_) agg->Codegen(state);
+ for (auto& agg : aggs_) {
+ agg->Codegen(state);
+ }
}
Status AggregationNodeBase::Reset(RuntimeState* state, RowBatch* row_batch) {
diff --git a/be/src/exec/aggregation-node-base.h b/be/src/exec/aggregation-node-base.h
index efdc964..6d148b9 100644
--- a/be/src/exec/aggregation-node-base.h
+++ b/be/src/exec/aggregation-node-base.h
@@ -27,7 +27,8 @@ namespace impala {
class AggregationPlanNode : public PlanNode {
public:
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+ virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
+ virtual void Codegen(FragmentState* state) override;
virtual void Close() override;
virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
~AggregationPlanNode() {}
@@ -45,7 +46,6 @@ class AggregationNodeBase : public ExecNode {
ObjectPool* pool, const AggregationPlanNode& pnode, const DescriptorTbl& descs);
virtual Status Prepare(RuntimeState* state) override;
- virtual void Codegen(RuntimeState* state) override;
virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
protected:
diff --git a/be/src/exec/aggregator.cc b/be/src/exec/aggregator.cc
index 705102b..d6eb986 100644
--- a/be/src/exec/aggregator.cc
+++ b/be/src/exec/aggregator.cc
@@ -28,6 +28,7 @@
#include "exprs/scalar-expr-evaluator.h"
#include "gutil/strings/substitute.h"
#include "runtime/descriptors.h"
+#include "runtime/fragment-state.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/raw-value.h"
@@ -43,7 +44,7 @@
namespace impala {
AggregatorConfig::AggregatorConfig(
- const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx)
+ const TAggregator& taggregator, FragmentState* state, PlanNode* pnode, int agg_idx)
: agg_idx_(agg_idx),
intermediate_tuple_id_(taggregator.intermediate_tuple_id),
intermediate_tuple_desc_(
@@ -55,7 +56,7 @@ AggregatorConfig::AggregatorConfig(
needs_finalize_(taggregator.need_finalize) {}
Status AggregatorConfig::Init(
- const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode) {
+ const TAggregator& taggregator, FragmentState* state, PlanNode* pnode) {
DCHECK(intermediate_tuple_desc_ != nullptr);
DCHECK(output_tuple_desc_ != nullptr);
DCHECK_EQ(intermediate_tuple_desc_->slots().size(), output_tuple_desc_->slots().size());
@@ -85,6 +86,7 @@ Aggregator::Aggregator(ExecNode* exec_node, ObjectPool* pool,
const AggregatorConfig& config, const std::string& name)
: id_(exec_node->id()),
exec_node_(exec_node),
+ config_(config),
agg_idx_(config.agg_idx_),
pool_(pool),
intermediate_tuple_id_(config.intermediate_tuple_id_),
@@ -120,6 +122,7 @@ Status Aggregator::Prepare(RuntimeState* state) {
}
Status Aggregator::Open(RuntimeState* state) {
+ runtime_profile_->AppendExecOption(config_.codegen_status_msg_);
RETURN_IF_ERROR(AggFnEvaluator::Open(agg_fn_evals_, state));
RETURN_IF_ERROR(ScalarExprEvaluator::Open(conjunct_evals_, state));
return Status::OK();
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
index 3293c9d..693716e 100644
--- a/be/src/exec/aggregator.h
+++ b/be/src/exec/aggregator.h
@@ -38,6 +38,7 @@ class PlanNode;
class CodegenAnyVal;
class DescriptorTbl;
class ExecNode;
+class FragmentState;
class LlvmBuilder;
class LlvmCodeGen;
class MemPool;
@@ -60,12 +61,12 @@ class AggregatorConfig {
public:
/// 'agg_idx' is the index of 'TAggregator' in the parent TAggregationNode.
AggregatorConfig(
- const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx);
+ const TAggregator& taggregator, FragmentState* state, PlanNode* pnode, int agg_idx);
virtual Status Init(
- const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode);
+ const TAggregator& taggregator, FragmentState* state, PlanNode* pnode);
/// Closes the expressions created in Init();
virtual void Close();
- virtual Status Codegen(RuntimeState* state) = 0;
+ virtual void Codegen(FragmentState* state) = 0;
virtual ~AggregatorConfig() {}
/// The index of this Aggregator within the AggregationNode which is also equivalent to
@@ -98,6 +99,11 @@ class AggregatorConfig {
/// The list of all aggregate operations for this aggregator.
std::vector<AggFn*> aggregate_functions_;
+ /// A message that will eventually be added to the aggregator's (spawned using this
+ /// config) runtime profile to convey codegen related information. Populated in
+ /// Codegen().
+ std::string codegen_status_msg_;
+
virtual int GetNumGroupingExprs() const = 0;
protected:
@@ -134,7 +140,6 @@ class Aggregator {
/// Aggregators follow the same lifecycle as ExecNodes, except that after Open() and
/// before GetNext() rows should be added with AddBatch(), followed by InputDone()[
virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
- virtual void Codegen(RuntimeState* state) = 0;
virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
virtual Status GetNext(
RuntimeState* state, RowBatch* row_batch, bool* eos) WARN_UNUSED_RESULT = 0;
@@ -172,6 +177,9 @@ class Aggregator {
const int id_;
ExecNode* exec_node_;
+ /// Reference to the config object used to generate this instance.
+ const AggregatorConfig& config_;
+
/// The index of this Aggregator within the AggregationNode. When returning output, this
/// Aggregator should only write tuples at 'agg_idx_' within the row.
const int agg_idx_;
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index 37d1d31..c3287e0 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -26,8 +26,8 @@
#include "exprs/scalar-expr.h"
#include "runtime/buffered-tuple-stream.inline.h"
#include "runtime/descriptors.h"
+#include "runtime/fragment-state.h"
#include "runtime/mem-tracker.h"
-#include "runtime/query-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "udf/udf-internal.h"
@@ -42,7 +42,7 @@ using namespace strings;
namespace impala {
-Status AnalyticEvalPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status AnalyticEvalPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
RETURN_IF_ERROR(PlanNode::Init(tnode, state));
const TAnalyticNode& analytic_node = tnode.analytic_node;
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index d55f5f9..fc6967b 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -34,7 +34,7 @@ class ScalarExprEvaluator;
class AnalyticEvalPlanNode : public PlanNode {
public:
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+ virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
virtual void Close() override;
virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index 2912e63..97c6fe6 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -23,9 +23,9 @@
#include "exec/join-builder.h"
#include "exprs/scalar-expr.h"
#include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
#include "runtime/fragment-instance-state.h"
#include "runtime/mem-tracker.h"
-#include "runtime/query-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/thread-resource-mgr.h"
@@ -43,7 +43,7 @@ using namespace impala;
const char* BlockingJoinNode::LLVM_CLASS_NAME = "class.impala::BlockingJoinNode";
-Status BlockingJoinPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status BlockingJoinPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
DCHECK(tnode.__isset.join_node);
RETURN_IF_ERROR(PlanNode::Init(tnode, state));
join_op_ = tnode_->join_node.join_op;
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index 18c6cf2..39211de 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -37,7 +37,7 @@ class BlockingJoinPlanNode : public PlanNode {
public:
/// Subclasses should call BlockingJoinNode::Init() and then perform any other Init()
/// work, e.g. creating expr trees.
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+ virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override = 0;
/// Returns true if this join node will use a separate builder that is the root sink
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index a49df19..30e1201 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -36,6 +36,7 @@
#include "gen-cpp/ImpalaInternalService_constants.h"
#include "gen-cpp/ImpalaInternalService_types.h"
#include "gutil/strings/substitute.h"
+#include "runtime/fragment-state.h"
#include "runtime/krpc-data-stream-sender.h"
#include "runtime/mem-tracker.h"
#include "util/container-util.h"
@@ -46,19 +47,30 @@ using strings::Substitute;
namespace impala {
+void DataSinkConfig::Codegen(FragmentState* state) {
+ return;
+}
+
void DataSinkConfig::Close() {
ScalarExpr::Close(output_exprs_);
}
Status DataSinkConfig::Init(
- const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+ const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
tsink_ = &tsink;
input_row_desc_ = input_row_desc;
return ScalarExpr::Create(tsink.output_exprs, *input_row_desc_, state, &output_exprs_);
}
+void DataSinkConfig::AddCodegenStatus(
+ const Status& codegen_status, const std::string& extra_label) {
+ codegen_status_msgs_.emplace_back(FragmentState::GenerateCodegenMsg(
+ codegen_status.ok(), codegen_status, extra_label));
+}
+
Status DataSinkConfig::CreateConfig(const TDataSink& thrift_sink,
- const RowDescriptor* row_desc, RuntimeState* state, DataSinkConfig** data_sink) {
+ const RowDescriptor* row_desc, FragmentState* state,
+ DataSinkConfig** data_sink) {
ObjectPool* pool = state->obj_pool();
*data_sink = nullptr;
switch (thrift_sink.type) {
@@ -151,12 +163,11 @@ Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
return Status::OK();
}
-void DataSink::Codegen(RuntimeState* state) {
- return;
-}
-
Status DataSink::Open(RuntimeState* state) {
DCHECK_EQ(output_exprs_.size(), output_expr_evals_.size());
+ for (const string& codegen_msg : sink_config_.codegen_status_msgs_) {
+ profile_->AppendExecOption(codegen_msg);
+ }
return ScalarExprEvaluator::Open(output_expr_evals_, state);
}
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index 9445ff2..3dcb268 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -29,12 +29,11 @@
namespace impala {
class DataSink;
+class FragmentState;
class MemPool;
class MemTracker;
class ObjectPool;
class RowBatch;
-class RuntimeProfile;
-class RuntimeState;
class RowDescriptor;
class ScalarExpr;
class ScalarExprEvaluator;
@@ -57,11 +56,15 @@ class DataSinkConfig {
virtual DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const = 0;
+ /// Codegen expressions in the sink. Overridden by sink types that support codegen.
+ /// No-op by default.
+ virtual void Codegen(FragmentState* state);
+
/// Close() releases all resources that were allocated during creation.
virtual void Close();
/// Pointer to the thrift data sink struct associated with this sink. Set in Init() and
- /// owned by QueryState.
+ /// owned by FragmentState.
const TDataSink* tsink_ = nullptr;
/// The row descriptor for the rows consumed by the sink. Owned by root plan node of
@@ -72,16 +75,24 @@ class DataSinkConfig {
/// Not used in some sub-classes.
std::vector<ScalarExpr*> output_exprs_;
+ /// A list of messages that will eventually be added to the data sink's runtime
+ /// profile to convey codegen related information. Populated in Codegen().
+ std::vector<std::string> codegen_status_msgs_;
+
/// Creates a new data sink config, allocated in state->obj_pool() and returned through
/// *sink, from the thrift sink object in fragment_ctx.
static Status CreateConfig(const TDataSink& thrift_sink, const RowDescriptor* row_desc,
- RuntimeState* state, DataSinkConfig** sink);
+ FragmentState* state, DataSinkConfig** data_sink);
protected:
/// Sets reference to TDataSink and initializes the expressions. Returns error status on
/// failure. If overridden in subclass, must first call superclass's Init().
- virtual Status Init(
- const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state);
+ virtual Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+ FragmentState* state);
+
+ /// Helper method to add codegen messages from status objects.
+ void AddCodegenStatus(
+ const Status& codegen_status, const std::string& extra_label = "");
private:
DISALLOW_COPY_AND_ASSIGN(DataSinkConfig);
@@ -111,11 +122,8 @@ class DataSink {
/// initializes their evaluators. Subclasses must call DataSink::Prepare().
virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
- /// Codegen expressions in the sink. Overridden by sink type which supports codegen.
- /// No-op by default.
- virtual void Codegen(RuntimeState* state);
-
/// Call before Send() to open the sink and initialize output expression evaluators.
+ /// Subclasses must call DataSink::Open().
virtual Status Open(RuntimeState* state);
/// Send a row batch into this sink. Generally, Send() should not retain any references
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index ec640eb..9a2894d 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -22,6 +22,7 @@
#include "exec/exec-node-util.h"
#include "exprs/scalar-expr.h"
#include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
#include "runtime/krpc-data-stream-mgr.h"
#include "runtime/krpc-data-stream-recvr.h"
#include "runtime/row-batch.h"
@@ -42,7 +43,7 @@ using namespace impala;
DEFINE_int64(exchg_node_buffer_size_bytes, 1024 * 1024 * 10,
"(Advanced) Maximum size of per-query receive-side buffer");
-Status ExchangePlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status ExchangePlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
RETURN_IF_ERROR(PlanNode::Init(tnode, state));
bool is_merging = tnode_->exchange_node.__isset.sort_info;
if (!is_merging) return Status::OK();
@@ -52,6 +53,7 @@ Status ExchangePlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
sort_info.ordering_exprs, *row_descriptor_, state, &ordering_exprs_));
row_comparator_config_ =
state->obj_pool()->Add(new TupleRowComparatorConfig(sort_info, ordering_exprs_));
+ state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
return Status::OK();
}
@@ -110,28 +112,19 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
FLAGS_exchg_node_buffer_size_bytes, is_merging_, runtime_profile(), mem_tracker(),
&recvr_buffer_pool_client_);
- if (is_merging_) state->CheckAndAddCodegenDisabledMessage(runtime_profile());
return Status::OK();
}
-void ExchangeNode::Codegen(RuntimeState* state) {
+/// Codegens the children first, then (unless the planner disabled codegen for this
+/// node) the row comparator used by merging exchanges.
+void ExchangePlanNode::Codegen(FragmentState* state) {
DCHECK(state->ShouldCodegen());
- ExecNode::Codegen(state);
+ PlanNode::Codegen(state);
if (IsNodeCodegenDisabled()) return;
- if (is_merging_) {
- const ExchangePlanNode& exch_pnode = static_cast<const ExchangePlanNode&>(plan_node_);
- ExchangePlanNode& non_const_pnode = const_cast<ExchangePlanNode&>(exch_pnode);
- non_const_pnode.Codegen(state, runtime_profile());
+ // Init() only creates row_comparator_config_ for merging exchanges (it returns early
+ // when !is_merging). NOTE(review): assumes the member defaults to nullptr in the
+ // header; confirm.
+ if (row_comparator_config_ != nullptr) {
+ AddCodegenStatus(row_comparator_config_->Codegen(state));
}
}
-void ExchangePlanNode::Codegen(RuntimeState* state, RuntimeProfile* runtime_profile) {
- DCHECK(row_comparator_config_ != nullptr);
- Status codegen_status = row_comparator_config_->Codegen(state);
- runtime_profile->AddCodegenMsg(codegen_status.ok(), codegen_status);
-}
-
Status ExchangeNode::Open(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
ScopedOpenEventAdder ea(this);
diff --git a/be/src/exec/exchange-node.h b/be/src/exec/exchange-node.h
index 2342058..ab5c3c5 100644
--- a/be/src/exec/exchange-node.h
+++ b/be/src/exec/exchange-node.h
@@ -34,10 +34,10 @@ class TupleRowComparatorConfig;
class ExchangePlanNode : public PlanNode {
public:
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+ virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
virtual void Close() override;
virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
- void Codegen(RuntimeState* state, RuntimeProfile* runtime_profile);
+ virtual void Codegen(FragmentState* state) override;
~ExchangePlanNode(){}
@@ -66,7 +66,6 @@ class ExchangeNode : public ExecNode {
ObjectPool* pool, const ExchangePlanNode& pnode, const DescriptorTbl& descs);
virtual Status Prepare(RuntimeState* state);
- virtual void Codegen(RuntimeState* state);
/// Blocks until the first batch is available for consumption via GetNext().
virtual Status Open(RuntimeState* state);
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 55589e5..8d5d8f3 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -57,10 +57,10 @@
#include "gutil/strings/substitute.h"
#include "runtime/descriptors.h"
#include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
#include "runtime/initial-reservations.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
-#include "runtime/query-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "util/debug-util.h"
@@ -72,7 +72,7 @@
using strings::Substitute;
namespace impala {
-Status PlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status PlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
tnode_ = &tnode;
row_descriptor_ = state->obj_pool()->Add(
new RowDescriptor(state->desc_tbl(), tnode_->row_tuples, tnode_->nullable_tuples));
@@ -93,14 +93,14 @@ void PlanNode::Close() {
}
}
-Status PlanNode::CreateTree(
- RuntimeState* state, const TPlan& plan, PlanNode** root) {
+Status PlanNode::CreateTree(FragmentState* state, const TPlan& plan, PlanNode** root) {
if (plan.nodes.size() == 0) {
*root = NULL;
return Status::OK();
}
int node_idx = 0;
- Status status = CreateTreeHelper(state, plan.nodes, NULL, &node_idx, root);
+ Status status =
+ CreateTreeHelper(state, plan.nodes, NULL, &node_idx, root);
if (status.ok() && node_idx + 1 != plan.nodes.size()) {
status = Status(
"Plan tree only partially reconstructed. Not all thrift nodes were used.");
@@ -112,9 +112,9 @@ Status PlanNode::CreateTree(
return status;
}
-Status PlanNode::CreateTreeHelper(RuntimeState* state,
- const std::vector<TPlanNode>& tnodes, PlanNode* parent, int* node_idx,
- PlanNode** root) {
+Status PlanNode::CreateTreeHelper(FragmentState* state,
+ const std::vector<TPlanNode>& tnodes, PlanNode* parent, int* node_idx,
+ PlanNode** root) {
// propagate error case
if (*node_idx >= tnodes.size()) {
return Status("Failed to reconstruct plan tree from thrift.");
@@ -123,7 +123,7 @@ Status PlanNode::CreateTreeHelper(RuntimeState* state,
int num_children = tnode.num_children;
PlanNode* node = NULL;
- RETURN_IF_ERROR(CreatePlanNode(state->obj_pool(), tnode, &node, state));
+ RETURN_IF_ERROR(CreatePlanNode(state->obj_pool(), tnode, &node));
if (parent != NULL) {
parent->children_.push_back(node);
} else {
@@ -131,7 +131,8 @@ Status PlanNode::CreateTreeHelper(RuntimeState* state,
}
for (int i = 0; i < num_children; ++i) {
++*node_idx;
- RETURN_IF_ERROR(CreateTreeHelper(state, tnodes, node, node_idx, NULL));
+ RETURN_IF_ERROR(
+ CreateTreeHelper(state, tnodes, node, node_idx, nullptr));
// we are expecting a child, but have used all nodes
// this means we have been given a bad tree and must fail
if (*node_idx >= tnodes.size()) {
@@ -144,8 +145,14 @@ Status PlanNode::CreateTreeHelper(RuntimeState* state,
return Status::OK();
}
-Status PlanNode::CreatePlanNode(ObjectPool* pool, const TPlanNode& tnode, PlanNode** node,
- RuntimeState* state) {
+/// Turns 'codegen_status' (plus the optional 'extra_label') into a profile message via
+/// FragmentState::GenerateCodegenMsg() and queues it in codegen_status_msgs_, which
+/// ExecNode::Open() later copies into the exec node's profile.
+void PlanNode::AddCodegenStatus(
+ const Status& codegen_status, const std::string& extra_label) {
+ const bool codegen_succeeded = codegen_status.ok();
+ codegen_status_msgs_.emplace_back(FragmentState::GenerateCodegenMsg(
+ codegen_succeeded, codegen_status, extra_label));
+}
+
+Status PlanNode::CreatePlanNode(
+ ObjectPool* pool, const TPlanNode& tnode, PlanNode** node) {
switch (tnode.node_type) {
case TPlanNodeType::HDFS_SCAN_NODE:
*node = pool->Add(new HdfsScanPlanNode());
@@ -215,6 +222,14 @@ Status PlanNode::CreatePlanNode(ObjectPool* pool, const TPlanNode& tnode, PlanNo
return Status::OK();
}
+/// Base implementation: generates nothing itself and only recurses into every child.
+/// Overrides (e.g. ExchangePlanNode, HdfsScanPlanNode) call this to reach children.
+void PlanNode::Codegen(FragmentState* state) {
+ DCHECK(state->ShouldCodegen());
+ DCHECK(state->codegen() != nullptr);
+ for (PlanNode* child_node : children_) child_node->Codegen(state);
+}
+
const string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsReturnedRate";
ExecNode::ExecNode(ObjectPool* pool, const PlanNode& pnode, const DescriptorTbl& descs)
@@ -231,7 +246,6 @@ ExecNode::ExecNode(ObjectPool* pool, const PlanNode& pnode, const DescriptorTbl&
rows_returned_counter_(nullptr),
rows_returned_rate_(nullptr),
containing_subplan_(nullptr),
- disable_codegen_(pnode.tnode_->disable_codegen),
num_rows_returned_(0),
is_closed_(false) {
runtime_profile_->SetPlanNodeId(id_);
@@ -271,17 +285,12 @@ Status ExecNode::Prepare(RuntimeState* state) {
return Status::OK();
}
-void ExecNode::Codegen(RuntimeState* state) {
- DCHECK(state->ShouldCodegen());
- DCHECK(state->codegen() != NULL);
- for (int i = 0; i < children_.size(); ++i) {
- children_[i]->Codegen(state);
- }
-}
-
Status ExecNode::Open(RuntimeState* state) {
RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::OPEN, state));
DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());
+ // Copy the codegen messages recorded on the plan node into this node's profile.
+ // NOTE(review): Open() may run several times (after Reset(), e.g. inside a subplan),
+ // so these exec options would be appended once per Open(). Confirm
+ // AppendExecOption() tolerates that, or move this to Prepare().
+ for (const string& codegen_msg : plan_node_.codegen_status_msgs_) {
+ runtime_profile_->AppendExecOption(codegen_msg);
+ }
return ScalarExprEvaluator::Open(conjunct_evals_, state);
}
@@ -470,10 +479,6 @@ Status ExecNode::QueryMaintenance(RuntimeState* state) {
return state->CheckQueryState();
}
-bool ExecNode::IsNodeCodegenDisabled() const {
- return disable_codegen_;
-}
-
// Codegen for EvalConjuncts. The generated signature is the same as EvalConjuncts().
//
// For a node with two conjunct predicates:
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index ec91276..998e19f 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -38,6 +38,7 @@ class Function;
namespace impala {
class DataSink;
+class FragmentState;
class MemPool;
class MemTracker;
class ObjectPool;
@@ -72,7 +73,7 @@ class PlanNode {
/// initialized here will be owned by state->obj_pool().
/// If overridden in subclass, must first call superclass's Init().
/// Should only be called after all children have been set and Init()-ed.
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
+ virtual Status Init(const TPlanNode& tnode, FragmentState* state);
/// Close() releases all resources that were allocated in Init().
virtual void Close();
@@ -83,13 +84,24 @@ class PlanNode {
/// Creates plan node tree from list of nodes contained in plan via depth-first
/// traversal. All nodes are placed in state->obj_pool() and have Init() called on them.
/// Returns error if 'plan' is corrupted, otherwise success.
- static Status CreateTree(
- RuntimeState* state, const TPlan& plan, PlanNode** root) WARN_UNUSED_RESULT;
+ /// The returned Status reports a corrupted plan and must be checked by callers.
+ static Status CreateTree(
+ FragmentState* state, const TPlan& plan, PlanNode** root) WARN_UNUSED_RESULT;
+
+ /// Recursively calls Codegen() on all children.
+ /// Expected to be overridden in subclass to generate LLVM IR functions and register
+ /// them with the LlvmCodeGen object. The function pointers of the compiled IR functions
+ /// will be set up when the LlvmCodeGen object is finalized. If overridden in subclass,
+ /// must also call superclass's Codegen() before or after the code generation for this
+ /// plan node. Will only be called once in the node's lifetime.
+ virtual void Codegen(FragmentState* state);
virtual ~PlanNode(){}
bool IsInSubplan() const { return containing_subplan_ != nullptr; }
+ /// Returns true if codegen was disabled by the planner for this plan node. Does not
+ /// check to see if codegen was enabled for the enclosing fragment.
+ bool IsNodeCodegenDisabled() const { return tnode_->disable_codegen; }
+
/// TODO: IMPALA-9216: Add accessor methods for these members instead of making
/// them public.
/// Reference to the thrift node that represents this PlanNode.
@@ -112,16 +124,24 @@ class PlanNode {
/// 'this' node. Not owned.
SubplanPlanNode* containing_subplan_ = nullptr;
+ /// A list of messages that will eventually be added to the exec node's runtime
+ /// profile to convey codegen related information. Populated in Codegen().
+ std::vector<std::string> codegen_status_msgs_;
+
+ protected:
+ /// Helper method to add codegen messages from status objects.
+ void AddCodegenStatus(
+ const Status& codegen_status, const std::string& extra_label = "");
+
private:
DISALLOW_COPY_AND_ASSIGN(PlanNode);
/// Create a single exec node derived from thrift node; place exec node in 'pool'.
- static Status CreatePlanNode(ObjectPool* pool, const TPlanNode& tnode, PlanNode** node,
- RuntimeState* state) WARN_UNUSED_RESULT;
+ static Status CreatePlanNode(ObjectPool* pool, const TPlanNode& tnode, PlanNode** node)
+ WARN_UNUSED_RESULT;
- static Status CreateTreeHelper(RuntimeState* state,
+ /// Recursive helper for CreateTree(): builds the node at tnodes[*node_idx], appends it
+ /// to 'parent' when 'parent' is non-null, and advances *node_idx past every node
+ /// consumed by its subtree.
+ static Status CreateTreeHelper(FragmentState* state,
const std::vector<TPlanNode>& tnodes, PlanNode* parent, int* node_idx,
PlanNode** root) WARN_UNUSED_RESULT;
};
class ExecNode {
@@ -139,14 +159,6 @@ class ExecNode {
/// If overridden in subclass, must first call superclass's Prepare().
virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
- /// Recursively calls Codegen() on all children.
- /// Expected to be overriden in subclass to generate LLVM IR functions and register
- /// them with the LlvmCodeGen object. The function pointers of the compiled IR functions
- /// will be set up in PlanFragmentExecutor::Open(). If overridden in subclass, must also
- /// call superclass's Codegen() before or after the code generation for this exec node.
- /// Will only be called once in the node's lifetime.
- virtual void Codegen(RuntimeState* state);
-
/// Performs any preparatory work prior to calling GetNext().
/// If overridden in subclass, must first call superclass's Open().
/// Open() is called after Prepare() or Reset(), i.e., possibly multiple times
@@ -321,10 +333,6 @@ class ExecNode {
const TBackendResourceProfile& resource_profile() { return resource_profile_; }
bool is_closed() const { return is_closed_; }
- /// Return true if codegen was disabled by the planner for this ExecNode. Does not
- /// check to see if codegen was enabled for the enclosing fragment.
- bool IsNodeCodegenDisabled() const;
-
/// Returns true if this node is inside the right-hand side plan tree of a SubplanNode.
bool IsInSubplan() const { return plan_node_.IsInSubplan(); }
@@ -428,9 +436,6 @@ class ExecNode {
/// Set by SubplanNode::Prepare() before Prepare() is called on 'this' node. Not owned.
SubplanNode* containing_subplan_;
- /// If true, codegen should be disabled for this exec node.
- const bool disable_codegen_;
-
virtual bool IsScanNode() const { return false; }
/// Executes 'debug_action_' if 'phase' matches 'debug_phase_'.
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index bed5c94..a66ed62 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -29,6 +29,7 @@
#include "runtime/buffered-tuple-stream.inline.h"
#include "runtime/descriptors.h"
#include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
@@ -84,14 +85,14 @@ static const StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
};
+/// Copies the streaming-preaggregation flag and resource profile out of 'taggregator'.
+/// Expressions (e.g. the grouping exprs) are created later in Init().
GroupingAggregatorConfig::GroupingAggregatorConfig(
- const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx)
+ const TAggregator& taggregator, FragmentState* state, PlanNode* pnode, int agg_idx)
: AggregatorConfig(taggregator, state, pnode, agg_idx),
intermediate_row_desc_(intermediate_tuple_desc_, false),
is_streaming_preagg_(taggregator.use_streaming_preaggregation),
resource_profile_(taggregator.resource_profile){};
Status GroupingAggregatorConfig::Init(
- const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode) {
+ const TAggregator& taggregator, FragmentState* state, PlanNode* pnode) {
RETURN_IF_ERROR(ScalarExpr::Create(
taggregator.grouping_exprs, input_row_desc_, state, &grouping_exprs_));
@@ -133,7 +134,6 @@ GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality)
: Aggregator(
exec_node, pool, config, Substitute("GroupingAggregator $0", config.agg_idx_)),
- agg_config_(config),
hash_table_config_(*config.hash_table_config_),
intermediate_row_desc_(config.intermediate_row_desc_),
is_streaming_preagg_(config.is_streaming_preagg_),
@@ -195,19 +195,14 @@ Status GroupingAggregator::Prepare(RuntimeState* state) {
return Status::OK();
}
-Status GroupingAggregatorConfig::Codegen(RuntimeState* state) {
+/// Codegens the AddBatch variant that matches this aggregator (streaming
+/// pre-aggregation or regular) and stores the outcome in codegen_status_msg_.
+void GroupingAggregatorConfig::Codegen(FragmentState* state) {
LlvmCodeGen* codegen = state->codegen();
DCHECK(codegen != nullptr);
TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
- return is_streaming_preagg_ ? CodegenAddBatchStreamingImpl(codegen, prefetch_mode) :
- CodegenAddBatchImpl(codegen, prefetch_mode);
-}
-
-void GroupingAggregator::Codegen(RuntimeState* state) {
- // TODO: This const cast will be removed once codegen call is moved before FIS creation
- Status codegen_status =
- const_cast<GroupingAggregatorConfig&>(agg_config_).Codegen(state);
- runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
+ Status codegen_status;
+ if (is_streaming_preagg_) {
+ codegen_status = CodegenAddBatchStreamingImpl(codegen, prefetch_mode);
+ } else {
+ codegen_status = CodegenAddBatchImpl(codegen, prefetch_mode);
+ }
+ codegen_status_msg_ =
+ FragmentState::GenerateCodegenMsg(codegen_status.ok(), codegen_status);
}
Status GroupingAggregator::Open(RuntimeState* state) {
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index b4509bb..ffef305 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -36,6 +36,7 @@ class AggFnEvaluator;
class GroupingAggregator;
class PlanNode;
class LlvmCodeGen;
+class QueryState;
class RowBatch;
class RuntimeState;
struct ScalarExprsResultsRowLayout;
@@ -119,11 +120,11 @@ class Tuple;
class GroupingAggregatorConfig : public AggregatorConfig {
public:
GroupingAggregatorConfig(
- const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx);
+ const TAggregator& taggregator, FragmentState* state, PlanNode* pnode, int agg_idx);
Status Init(
- const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode) override;
+ const TAggregator& taggregator, FragmentState* state, PlanNode* pnode) override;
void Close() override;
- Status Codegen(RuntimeState* state) override;
+ void Codegen(FragmentState* state) override;
~GroupingAggregatorConfig() override {}
/// Row with the intermediate tuple as its only tuple.
@@ -193,7 +194,6 @@ class GroupingAggregator : public Aggregator {
const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality);
virtual Status Prepare(RuntimeState* state) override;
- virtual void Codegen(RuntimeState* state) override;
virtual Status Open(RuntimeState* state) override;
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
@@ -214,10 +214,6 @@ class GroupingAggregator : public Aggregator {
private:
struct Partition;
- /// TODO: Remove reference once codegen is performed before FIS creation.
- /// Reference to the config object and only used to call Codegen().
- const GroupingAggregatorConfig& agg_config_;
-
/// Reference to the hash table config which is a part of the GroupingAggregatorConfig
/// that was used to create this object. Its used to create an instance of the
/// HashTableCtx in Prepare(). Not Owned.
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index 20fee8e..6e038a6 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -26,6 +26,7 @@
#include "exec/hdfs-scan-node.h"
#include "exec/read-write-util.h"
#include "exec/scanner-context.inline.h"
+#include "runtime/fragment-state.h"
#include "runtime/raw-value.h"
#include "runtime/runtime-state.h"
#include "util/codec.h"
@@ -79,7 +80,7 @@ Status HdfsAvroScanner::Open(ScannerContext* context) {
}
Status HdfsAvroScanner::Codegen(HdfsScanPlanNode* node,
- RuntimeState* state, llvm::Function** decode_avro_data_fn) {
+ FragmentState* state, llvm::Function** decode_avro_data_fn) {
*decode_avro_data_fn = nullptr;
DCHECK(state->ShouldCodegen());
DCHECK(state->codegen() != nullptr);
@@ -1098,7 +1099,7 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element,
}
Status HdfsAvroScanner::CodegenDecodeAvroData(const HdfsScanPlanNode* node,
- RuntimeState* state, llvm::Function** decode_avro_data_fn) {
+ FragmentState* state, llvm::Function** decode_avro_data_fn) {
const vector<ScalarExpr*>& conjuncts = node->conjuncts_;
LlvmCodeGen* codegen = state->codegen();
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index b3bd355..40b52d5 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -96,7 +96,7 @@ class HdfsAvroScanner : public BaseSequenceScanner {
/// Codegen DecodeAvroData(). Stores the resulting function in 'decode_avro_data_fn' if
/// codegen was successful or nullptr otherwise.
static Status Codegen(
- HdfsScanPlanNode* node, RuntimeState* state, llvm::Function** decode_avro_data_fn);
+ HdfsScanPlanNode* node, FragmentState* state, llvm::Function** decode_avro_data_fn);
static const char* LLVM_CLASS_NAME;
@@ -208,7 +208,7 @@ class HdfsAvroScanner : public BaseSequenceScanner {
/// Produces a version of DecodeAvroData that uses codegen'd instead of interpreted
/// functions. Stores the resulting function in 'decode_avro_data_fn' if codegen was
/// successful or returns an error.
- static Status CodegenDecodeAvroData(const HdfsScanPlanNode* node, RuntimeState* state,
+ static Status CodegenDecodeAvroData(const HdfsScanPlanNode* node, FragmentState* state,
llvm::Function** decode_avro_data_fn);
/// Codegens a version of MaterializeTuple() that reads records based on the table
diff --git a/be/src/exec/hdfs-columnar-scanner.cc b/be/src/exec/hdfs-columnar-scanner.cc
index b7a33fa..c84c2d8 100644
--- a/be/src/exec/hdfs-columnar-scanner.cc
+++ b/be/src/exec/hdfs-columnar-scanner.cc
@@ -22,6 +22,7 @@
#include "codegen/llvm-codegen.h"
#include "exec/hdfs-scan-node-base.h"
#include "exec/scratch-tuple-batch.h"
+#include "runtime/fragment-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
@@ -72,7 +73,7 @@ int HdfsColumnarScanner::TransferScratchTuples(RowBatch* dst_batch) {
return num_rows_to_commit;
}
-Status HdfsColumnarScanner::Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+Status HdfsColumnarScanner::Codegen(HdfsScanPlanNode* node, FragmentState* state,
llvm::Function** process_scratch_batch_fn) {
DCHECK(state->ShouldCodegen());
*process_scratch_batch_fn = nullptr;
diff --git a/be/src/exec/hdfs-columnar-scanner.h b/be/src/exec/hdfs-columnar-scanner.h
index a5c747c..a510f4e 100644
--- a/be/src/exec/hdfs-columnar-scanner.h
+++ b/be/src/exec/hdfs-columnar-scanner.h
@@ -38,7 +38,7 @@ class HdfsColumnarScanner : public HdfsScanner {
/// Codegen ProcessScratchBatch(). Stores the resulting function in
/// 'process_scratch_batch_fn' if codegen was successful or NULL otherwise.
- static Status Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+ static Status Codegen(HdfsScanPlanNode* node, FragmentState* state,
llvm::Function** process_scratch_batch_fn);
/// Class name in LLVM IR.
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index e515ab2..5e10f6a 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -41,6 +41,7 @@
#include "exprs/scalar-expr.h"
#include "runtime/descriptors.h"
#include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/io/request-context.h"
@@ -150,7 +151,7 @@ const string HdfsScanNodeBase::HDFS_SPLIT_STATS_DESC =
// Determines how many unexpected remote bytes trigger an error in the runtime state
const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
-Status HdfsScanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status HdfsScanPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
RETURN_IF_ERROR(ScanPlanNode::Init(tnode, state));
tuple_id_ = tnode.hdfs_scan_node.tuple_id;
@@ -222,10 +223,10 @@ Status HdfsScanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
*min_max_row_desc, state, &min_max_conjuncts_));
}
- // TODO: Find formats to be read across all instances once codegen done per fragment.
- const TPlanFragmentInstanceCtx& instance_ctx = state->instance_ctx();
- auto ranges = instance_ctx.per_node_scan_ranges.find(tnode.node_id);
- if (ranges != instance_ctx.per_node_scan_ranges.end()) {
+ const vector<const TPlanFragmentInstanceCtx*>& instance_ctxs = state->instance_ctxs();
+ for (auto ctx : instance_ctxs) {
+ auto ranges = ctx->per_node_scan_ranges.find(tnode.node_id);
+ if (ranges == ctx->per_node_scan_ranges.end()) continue;
for (const TScanRangeParams& scan_range_param : ranges->second) {
const THdfsFileSplit& split = scan_range_param.scan_range.hdfs_file_split;
HdfsPartitionDescriptor* partition_desc =
@@ -233,6 +234,7 @@ Status HdfsScanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
scanned_file_formats_.insert(partition_desc->file_format());
}
}
+ state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
return Status::OK();
}
@@ -406,21 +408,13 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
PrintHdfsSplitStats(per_volume_stats, &str);
runtime_profile()->AddInfoString("Table Name", hdfs_table_->fully_qualified_name());
runtime_profile()->AddInfoString(HDFS_SPLIT_STATS_DESC, str.str());
- state->CheckAndAddCodegenDisabledMessage(runtime_profile());
return Status::OK();
}
-void HdfsScanNodeBase::Codegen(RuntimeState* state) {
+void HdfsScanPlanNode::Codegen(FragmentState* state) {
DCHECK(state->ShouldCodegen());
- ExecNode::Codegen(state);
+ PlanNode::Codegen(state);
if (IsNodeCodegenDisabled()) return;
- const HdfsScanPlanNode& const_plan_node =
- static_cast<const HdfsScanPlanNode&>(plan_node_);
- HdfsScanPlanNode& hdfs_plan_node = const_cast<HdfsScanPlanNode&>(const_plan_node);
- hdfs_plan_node.Codegen(state, state->runtime_profile());
-}
-
-void HdfsScanPlanNode::Codegen(RuntimeState* state, RuntimeProfile* profile) {
for (THdfsFileFormat::type format: scanned_file_formats_) {
llvm::Function* fn;
Status status;
@@ -451,7 +445,7 @@ void HdfsScanPlanNode::Codegen(RuntimeState* state, RuntimeProfile* profile) {
codegen->AddFunctionToJit(
fn, &codegend_fn_map_[static_cast<THdfsFileFormat::type>(format)]);
}
- profile->AddCodegenMsg(status.ok(), status, format_name);
+ AddCodegenStatus(status, format_name);
}
}
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 66a15e9..6466778 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -101,10 +101,10 @@ struct ScanRangeMetadata {
class HdfsScanPlanNode : public ScanPlanNode {
public:
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+ virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
virtual void Close() override;
virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
- void Codegen(RuntimeState* state, RuntimeProfile* profile);
+ virtual void Codegen(FragmentState* state) override;
/// Returns index into materialized_slots with 'path'. Returns SKIP_COLUMN if
/// that path is not materialized. Only valid to call after Init().
@@ -228,7 +228,6 @@ class HdfsScanNodeBase : public ScanNode {
~HdfsScanNodeBase();
virtual Status Prepare(RuntimeState* state) override WARN_UNUSED_RESULT;
- virtual void Codegen(RuntimeState* state) override;
virtual Status Open(RuntimeState* state) override WARN_UNUSED_RESULT;
virtual Status Reset(
RuntimeState* state, RowBatch* row_batch) override WARN_UNUSED_RESULT;
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 44ad4b8..aa37ffa 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -26,6 +26,7 @@
#include "exec/text-converter.inline.h"
#include "exprs/scalar-expr-evaluator.h"
#include "runtime/collection-value-builder.h"
+#include "runtime/fragment-state.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/runtime-filter.inline.h"
#include "runtime/tuple-row.h"
@@ -326,7 +327,7 @@ bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields,
// ret i1 false
// }
Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanPlanNode* node,
- RuntimeState* state, llvm::Function** write_complete_tuple_fn) {
+ FragmentState* state, llvm::Function** write_complete_tuple_fn) {
const vector<ScalarExpr*>& conjuncts = node->conjuncts_;
LlvmCodeGen* codegen = state->codegen();
*write_complete_tuple_fn = NULL;
@@ -339,7 +340,8 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanPlanNode* node,
SlotDescriptor* slot_desc = node->materialized_slots_[i];
RETURN_IF_ERROR(TextConverter::CodegenWriteSlot(codegen, tuple_desc, slot_desc, &fn,
node->hdfs_table_->null_column_value().data(),
- node->hdfs_table_->null_column_value().size(), true, state->strict_mode()));
+ node->hdfs_table_->null_column_value().size(), true,
+ state->query_options().strict_mode));
if (i >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) codegen->SetNoInline(fn);
slot_fns.push_back(fn);
}
@@ -489,14 +491,8 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanPlanNode* node,
// tuple against that conjunct and start a new parse_block for the next conjunct
parse_block = llvm::BasicBlock::Create(context, "parse", fn, eval_fail_block);
llvm::Function* conjunct_fn;
- Status status =
- conjuncts[conjunct_idx]->GetCodegendComputeFn(codegen, false, &conjunct_fn);
- if (!status.ok()) {
- stringstream ss;
- ss << "Failed to codegen conjunct: " << status.GetDetail();
- state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
- return status;
- }
+ RETURN_IF_ERROR(
+ conjuncts[conjunct_idx]->GetCodegendComputeFn(codegen, false, &conjunct_fn));
if (node->materialized_slots_.size() + conjunct_idx
>= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
codegen->SetNoInline(conjunct_fn);
@@ -532,7 +528,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(const HdfsScanPlanNode* node,
}
Status HdfsScanner::CodegenWriteAlignedTuples(const HdfsScanPlanNode* node,
- RuntimeState* state, llvm::Function* write_complete_tuple_fn,
+ FragmentState* state, llvm::Function* write_complete_tuple_fn,
llvm::Function** write_aligned_tuples_fn) {
LlvmCodeGen* codegen = state->codegen();
*write_aligned_tuples_fn = NULL;
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index d716b8d..18cd84b 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -489,13 +489,13 @@ class HdfsScanner {
/// to WriteCompleteTuple. Stores the resulting function in 'write_complete_tuple_fn'
/// if codegen was successful or NULL otherwise.
static Status CodegenWriteCompleteTuple(const HdfsScanPlanNode* node,
- RuntimeState* state, llvm::Function** write_complete_tuple_fn);
+ FragmentState* state, llvm::Function** write_complete_tuple_fn);
/// Codegen function to replace WriteAlignedTuples. WriteAlignedTuples is cross
/// compiled to IR. This function loads the precompiled IR function, modifies it,
/// and stores the resulting function in 'write_aligned_tuples_fn' if codegen was
/// successful or NULL otherwise.
- static Status CodegenWriteAlignedTuples(const HdfsScanPlanNode*, RuntimeState*,
+ static Status CodegenWriteAlignedTuples(const HdfsScanPlanNode*, FragmentState*,
llvm::Function* write_tuple_fn, llvm::Function** write_aligned_tuples_fn);
/// Codegen function to replace InitTuple() removing runtime constants like the tuple
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 9520dcd..fee5ebc 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -36,6 +36,7 @@
#include "exec/text-converter.h"
#include "gen-cpp/ErrorCodes_types.h"
#include "runtime/descriptors.h"
+#include "runtime/fragment-state.h"
#include "runtime/mem-pool.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
@@ -74,7 +75,7 @@ HdfsSequenceScanner::~HdfsSequenceScanner() {
}
// Codegen for materialized parsed data into tuples.
-Status HdfsSequenceScanner::Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+Status HdfsSequenceScanner::Codegen(HdfsScanPlanNode* node, FragmentState* state,
llvm::Function** write_aligned_tuples_fn) {
*write_aligned_tuples_fn = nullptr;
DCHECK(state->ShouldCodegen());
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index 0b79b78..3dc445e 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -171,7 +171,7 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
/// Codegen WriteAlignedTuples(). Stores the resulting function in
/// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise.
- static Status Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+ static Status Codegen(HdfsScanPlanNode* node, FragmentState* state,
llvm::Function** write_aligned_tuples_fn);
protected:
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 9246bda..c1cde0b 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -54,7 +54,7 @@ using namespace strings;
namespace impala {
Status HdfsTableSinkConfig::Init(
- const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+ const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
DCHECK(tsink_->__isset.table_sink);
DCHECK(tsink_->table_sink.__isset.hdfs_table_sink);
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index ccfbd54..2ce9d5c 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -108,7 +108,7 @@ class HdfsTableSinkConfig : public DataSinkConfig {
protected:
Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
- RuntimeState* state) override;
+ FragmentState* state) override;
};
/// The sink consumes all row batches of its child execution tree, and writes the
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index e996f0b..78355dd 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -41,6 +41,7 @@
#include "gen-cpp/ErrorCodes_types.h"
#include "gutil/strings/substitute.h"
#include "runtime/descriptors.h"
+#include "runtime/fragment-state.h"
#include "runtime/io/request-context.h"
#include "runtime/io/request-ranges.h"
#include "runtime/mem-pool.h"
@@ -790,7 +791,7 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
// Codegen for materializing parsed data into tuples. The function WriteCompleteTuple is
// handcrafted using the IRBuilder for the specific tuple description. This function
// is then injected into the cross-compiled driving function, WriteAlignedTuples().
-Status HdfsTextScanner::Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+Status HdfsTextScanner::Codegen(HdfsScanPlanNode* node, FragmentState* state,
llvm::Function** write_aligned_tuples_fn) {
*write_aligned_tuples_fn = nullptr;
DCHECK(state->ShouldCodegen());
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index cbfbce8..d74db9f 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -61,7 +61,7 @@ class HdfsTextScanner : public HdfsScanner {
/// Codegen WriteAlignedTuples(). Stores the resulting function in
/// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise.
- static Status Codegen(HdfsScanPlanNode* node, RuntimeState* state,
+ static Status Codegen(HdfsScanPlanNode* node, FragmentState* state,
llvm::Function** write_aligned_tuples_fn);
/// Return true if we have builtin support for scanning text files compressed with this
diff --git a/be/src/exec/join-builder.cc b/be/src/exec/join-builder.cc
index 504c7db..880dbd1 100644
--- a/be/src/exec/join-builder.cc
+++ b/be/src/exec/join-builder.cc
@@ -25,7 +25,7 @@
namespace impala {
Status JoinBuilderConfig::Init(
- const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+ const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
join_node_id_ = tsink.join_build_sink.dest_node_id;
join_op_ = tsink.join_build_sink.join_op;
diff --git a/be/src/exec/join-builder.h b/be/src/exec/join-builder.h
index 65e63f0..ef7b7a8 100644
--- a/be/src/exec/join-builder.h
+++ b/be/src/exec/join-builder.h
@@ -37,7 +37,7 @@ class JoinBuilderConfig : public DataSinkConfig {
friend class PhjBuilder;
Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
- RuntimeState* state) override;
+ FragmentState* state) override;
/// The ID of the join plan node this is associated with.
int join_node_id_;
diff --git a/be/src/exec/nested-loop-join-builder.cc b/be/src/exec/nested-loop-join-builder.cc
index efdaa0d..3b9cd11 100644
--- a/be/src/exec/nested-loop-join-builder.cc
+++ b/be/src/exec/nested-loop-join-builder.cc
@@ -37,7 +37,7 @@ DataSink* NljBuilderConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
}
Status NljBuilderConfig::Init(
- const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+ const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
RETURN_IF_ERROR(JoinBuilderConfig::Init(tsink, input_row_desc, state));
return Status::OK();
}
@@ -75,6 +75,7 @@ Status NljBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker)
}
Status NljBuilder::Open(RuntimeState* state) {
+ RETURN_IF_ERROR(DataSink::Open(state));
return Status::OK();
}
diff --git a/be/src/exec/nested-loop-join-builder.h b/be/src/exec/nested-loop-join-builder.h
index b66fd23..bcaecbd 100644
--- a/be/src/exec/nested-loop-join-builder.h
+++ b/be/src/exec/nested-loop-join-builder.h
@@ -37,7 +37,7 @@ class NljBuilderConfig : public JoinBuilderConfig {
protected:
Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
- RuntimeState* state) override;
+ FragmentState* state) override;
};
/// Builder for the NestedLoopJoinNode that accumulates the build-side rows for the join.
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index dc8cc60..185e241 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -37,7 +37,7 @@
using namespace impala;
using namespace strings;
-Status NestedLoopJoinPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status NestedLoopJoinPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
RETURN_IF_ERROR(BlockingJoinPlanNode::Init(tnode, state));
DCHECK(tnode.join_node.__isset.nested_loop_join_node);
// join_conjunct_evals_ are evaluated in the context of rows assembled from
diff --git a/be/src/exec/nested-loop-join-node.h b/be/src/exec/nested-loop-join-node.h
index 437efe2..65c5db4 100644
--- a/be/src/exec/nested-loop-join-node.h
+++ b/be/src/exec/nested-loop-join-node.h
@@ -38,7 +38,7 @@ class NestedLoopJoinPlanNode : public BlockingJoinPlanNode {
/// Join conjuncts.
std::vector<ScalarExpr*> join_conjuncts_;
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+ virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
virtual void Close() override;
virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
diff --git a/be/src/exec/non-grouping-aggregator.cc b/be/src/exec/non-grouping-aggregator.cc
index a2c4b0a..424143e 100644
--- a/be/src/exec/non-grouping-aggregator.cc
+++ b/be/src/exec/non-grouping-aggregator.cc
@@ -24,6 +24,7 @@
#include "exprs/agg-fn-evaluator.h"
#include "gutil/strings/substitute.h"
#include "runtime/descriptors.h"
+#include "runtime/fragment-state.h"
#include "runtime/mem-pool.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
@@ -36,21 +37,21 @@
namespace impala {
NonGroupingAggregatorConfig::NonGroupingAggregatorConfig(
- const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx)
+ const TAggregator& taggregator, FragmentState* state, PlanNode* pnode, int agg_idx)
: AggregatorConfig(taggregator, state, pnode, agg_idx) {}
-Status NonGroupingAggregatorConfig::Codegen(RuntimeState* state) {
+void NonGroupingAggregatorConfig::Codegen(FragmentState* state) {
LlvmCodeGen* codegen = state->codegen();
DCHECK(codegen != nullptr);
TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
- return CodegenAddBatchImpl(codegen, prefetch_mode);
+ Status status = CodegenAddBatchImpl(codegen, prefetch_mode);
+ codegen_status_msg_ = FragmentState::GenerateCodegenMsg(status.ok(), status);
}
NonGroupingAggregator::NonGroupingAggregator(
ExecNode* exec_node, ObjectPool* pool, const NonGroupingAggregatorConfig& config)
: Aggregator(
exec_node, pool, config, Substitute("NonGroupingAggregator $0", config.agg_idx_)),
- agg_config(config),
add_batch_impl_fn_(config.add_batch_impl_fn_) {}
Status NonGroupingAggregator::Prepare(RuntimeState* state) {
@@ -59,13 +60,6 @@ Status NonGroupingAggregator::Prepare(RuntimeState* state) {
return Status::OK();
}
-void NonGroupingAggregator::Codegen(RuntimeState* state) {
- // TODO: This const cast will be removed once codegen call is moved before FIS creation
- Status codegen_status =
- const_cast<NonGroupingAggregatorConfig&>(agg_config).Codegen(state);
- runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
-}
-
Status NonGroupingAggregator::Open(RuntimeState* state) {
RETURN_IF_ERROR(Aggregator::Open(state));
diff --git a/be/src/exec/non-grouping-aggregator.h b/be/src/exec/non-grouping-aggregator.h
index 33b0091..95f1695 100644
--- a/be/src/exec/non-grouping-aggregator.h
+++ b/be/src/exec/non-grouping-aggregator.h
@@ -30,6 +30,7 @@ class AggFnEvaluator;
class AggregationPlanNode;
class DescriptorTbl;
class ExecNode;
+class FragmentState;
class LlvmCodeGen;
class NonGroupingAggregator;
class ObjectPool;
@@ -40,9 +41,9 @@ class Tuple;
class NonGroupingAggregatorConfig : public AggregatorConfig {
public:
- NonGroupingAggregatorConfig(const TAggregator& taggregator, RuntimeState* state,
+ NonGroupingAggregatorConfig(const TAggregator& taggregator, FragmentState* state,
PlanNode* pnode, int agg_idx);
- Status Codegen(RuntimeState* state) override;
+ void Codegen(FragmentState* state) override;
~NonGroupingAggregatorConfig() override {}
typedef Status (*AddBatchImplFn)(NonGroupingAggregator*, RowBatch*);
@@ -70,7 +71,6 @@ class NonGroupingAggregator : public Aggregator {
ExecNode* exec_node, ObjectPool* pool, const NonGroupingAggregatorConfig& config);
virtual Status Prepare(RuntimeState* state) override;
- virtual void Codegen(RuntimeState* state) override;
virtual Status Open(RuntimeState* state) override;
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override {
@@ -97,10 +97,6 @@ class NonGroupingAggregator : public Aggregator {
virtual void DebugString(int indentation_level, std::stringstream* out) const override;
private:
- /// TODO: Remove reference once codegen is performed before FIS creation.
- /// Reference to the config object and only used to call Codegen().
- const NonGroupingAggregatorConfig& agg_config;
-
/// MemPool used to allocate memory for 'singleton_output_tuple_'. The ownership of the
/// pool's memory is transferred to the output batch on eos. The pool should not be
/// Reset() to allow amortizing memory allocation over a series of
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index acc17f0..62cfadb 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -18,6 +18,7 @@
#include "exec/partial-sort-node.h"
#include "exec/exec-node-util.h"
+#include "runtime/fragment-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/sorted-run-merger.h"
@@ -27,7 +28,7 @@
namespace impala {
-Status PartialSortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status PartialSortPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
DCHECK(!tnode.sort_node.__isset.offset || tnode.sort_node.offset == 0);
RETURN_IF_ERROR(PlanNode::Init(tnode, state));
const TSortInfo& tsort_info = tnode.sort_node.sort_info;
@@ -38,6 +39,7 @@ Status PartialSortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
*children_[0]->row_descriptor_, state, &sort_tuple_slot_exprs_));
row_comparator_config_ =
state->obj_pool()->Add(new TupleRowComparatorConfig(tsort_info, ordering_exprs_));
+ state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
return Status::OK();
}
@@ -78,26 +80,16 @@ Status PartialSortNode::Prepare(RuntimeState* state) {
runtime_profile(), state, label(), false));
RETURN_IF_ERROR(sorter_->Prepare(pool_));
DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
- state->CheckAndAddCodegenDisabledMessage(runtime_profile());
input_batch_.reset(
new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
return Status::OK();
}
-void PartialSortNode::Codegen(RuntimeState* state) {
+void PartialSortPlanNode::Codegen(FragmentState* state) {
DCHECK(state->ShouldCodegen());
- ExecNode::Codegen(state);
+ PlanNode::Codegen(state);
if (IsNodeCodegenDisabled()) return;
- const PartialSortPlanNode& partial_sort_pnode =
- static_cast<const PartialSortPlanNode&>(plan_node_);
- PartialSortPlanNode& non_const_pnode =
- const_cast<PartialSortPlanNode&>(partial_sort_pnode);
- non_const_pnode.Codegen(state, runtime_profile());
-}
-
-void PartialSortPlanNode::Codegen(RuntimeState* state, RuntimeProfile* runtime_profile) {
- Status codegen_status = row_comparator_config_->Codegen(state);
- runtime_profile->AddCodegenMsg(codegen_status.ok(), codegen_status);
+ AddCodegenStatus(row_comparator_config_->Codegen(state));
}
Status PartialSortNode::Open(RuntimeState* state) {
diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h
index a6a59eb..0a3fb92 100644
--- a/be/src/exec/partial-sort-node.h
+++ b/be/src/exec/partial-sort-node.h
@@ -25,10 +25,10 @@ namespace impala {
class PartialSortPlanNode : public PlanNode {
public:
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+ virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
virtual void Close() override;
virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
- void Codegen(RuntimeState* state, RuntimeProfile* runtime_profile);
+ virtual void Codegen(FragmentState* state) override;
~PartialSortPlanNode(){}
@@ -68,7 +68,6 @@ class PartialSortNode : public ExecNode {
~PartialSortNode();
virtual Status Prepare(RuntimeState* state);
- virtual void Codegen(RuntimeState* state);
virtual Status Open(RuntimeState* state);
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 1bee0e9..b719de4 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -28,6 +28,7 @@
#include "exprs/scalar-expr.h"
#include "runtime/buffered-tuple-stream.h"
#include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-state.h"
#include "runtime/row-batch.h"
@@ -71,7 +72,7 @@ PhjBuilder* PhjBuilderConfig::CreateSink(BufferPool::ClientHandle* buffer_pool_c
max_row_buffer_size, state));
}
-Status PhjBuilderConfig::CreateConfig(RuntimeState* state, int join_node_id,
+Status PhjBuilderConfig::CreateConfig(FragmentState* state, int join_node_id,
TJoinOp::type join_op, const RowDescriptor* build_row_desc,
const std::vector<TEqJoinCondition>& eq_join_conjuncts,
const std::vector<TRuntimeFilterDesc>& filters, uint32_t hash_seed,
@@ -91,7 +92,7 @@ void PhjBuilderConfig::Close() {
DataSinkConfig::Close();
}
-Status PhjBuilderConfig::InitExprsAndFilters(RuntimeState* state,
+Status PhjBuilderConfig::InitExprsAndFilters(FragmentState* state,
const vector<TEqJoinCondition>& eq_join_conjuncts,
const vector<TRuntimeFilterDesc>& filter_descs) {
for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) {
@@ -107,10 +108,13 @@ Status PhjBuilderConfig::InitExprsAndFilters(RuntimeState* state,
filter_desc.is_broadcast_join || state->query_options().num_nodes == 1);
DCHECK(!state->query_options().disable_row_runtime_filtering ||
filter_desc.applied_on_partition_columns);
- // Skip over filters that are not produced by this instance of the join, i.e.
+ // Skip over filters that are not produced by the instances of the builder, i.e.
// broadcast filters where this instance was not selected as a filter producer.
- const vector<TRuntimeFilterSource> filters_produced =
- state->instance_ctx().filters_produced;
+ const vector<const TPlanFragmentInstanceCtx*>& instance_ctxs = state->instance_ctxs();
+ // Any instance works: the filters produced should be the same for all of them.
+ DCHECK(!instance_ctxs.empty());
+ const vector<TRuntimeFilterSource>& filters_produced =
+ instance_ctxs[0]->filters_produced;
auto it = std::find_if(filters_produced.begin(), filters_produced.end(),
[this, &filter_desc](const TRuntimeFilterSource f) {
return f.src_node_id == join_node_id_ && f.filter_id == filter_desc.filter_id;
@@ -126,14 +130,14 @@ Status PhjBuilderConfig::InitExprsAndFilters(RuntimeState* state,
hash_table_config_ = state->obj_pool()->Add(new HashTableConfig(build_exprs_,
build_exprs_, PhjBuilder::HashTableStoresNulls(join_op_, is_not_distinct_from_),
is_not_distinct_from_));
+ state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
return Status::OK();
}
-Status PhjBuilderConfig::Init(RuntimeState* state, int join_node_id,
+Status PhjBuilderConfig::Init(FragmentState* state, int join_node_id,
TJoinOp::type join_op, const RowDescriptor* build_row_desc,
- const std::vector<TEqJoinCondition>& eq_join_conjuncts,
- const std::vector<TRuntimeFilterDesc>& filters, uint32_t hash_seed,
- TDataSink* tsink) {
+ const vector<TEqJoinCondition>& eq_join_conjuncts,
+ const vector<TRuntimeFilterDesc>& filters, uint32_t hash_seed, TDataSink* tsink) {
tsink->__isset.join_build_sink = true;
tsink->join_build_sink.__set_dest_node_id(join_node_id);
tsink->join_build_sink.__set_join_op(join_op);
@@ -142,8 +146,8 @@ Status PhjBuilderConfig::Init(RuntimeState* state, int join_node_id,
return InitExprsAndFilters(state, eq_join_conjuncts, filters);
}
-Status PhjBuilderConfig::Init(
- const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+Status PhjBuilderConfig::Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+ FragmentState* state) {
RETURN_IF_ERROR(JoinBuilderConfig::Init(tsink, input_row_desc, state));
const TJoinBuildSink& build_sink = tsink.join_build_sink;
hash_seed_ = build_sink.hash_seed;
@@ -263,12 +267,12 @@ Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker)
num_hash_table_builds_skipped_ =
ADD_COUNTER(profile(), "NumHashTableBuildsSkipped", TUnit::UNIT);
repartition_timer_ = ADD_TIMER(profile(), "RepartitionTime");
- state->CheckAndAddCodegenDisabledMessage(profile());
return Status::OK();
}
Status PhjBuilder::Open(RuntimeState* state) {
SCOPED_TIMER(profile()->total_time_counter());
+ RETURN_IF_ERROR(DataSink::Open(state));
if (!buffer_pool_client_->is_registered()) {
DCHECK(is_separate_build_) << "Client is registered by PhjNode if not separate";
DCHECK_GE(resource_profile_->min_reservation, MinReservation().second);
@@ -1226,13 +1230,7 @@ std::string PhjBuilder::Partition::DebugString() {
return ss.str();
}
-void PhjBuilder::Codegen(RuntimeState* state) {
- const PhjBuilderConfig& phj_config = static_cast<const PhjBuilderConfig&>(sink_config_);
- PhjBuilderConfig& non_const_config = const_cast<PhjBuilderConfig&>(phj_config);
- non_const_config.Codegen(state, profile());
-}
-
-void PhjBuilderConfig::Codegen(RuntimeState* state, RuntimeProfile* profile) {
+void PhjBuilderConfig::Codegen(FragmentState* state) {
LlvmCodeGen* codegen = state->codegen();
DCHECK(codegen != nullptr);
Status build_codegen_status;
@@ -1266,9 +1264,8 @@ void PhjBuilderConfig::Codegen(RuntimeState* state, RuntimeProfile* profile) {
build_codegen_status = codegen_status;
insert_codegen_status = codegen_status;
}
- profile->AddCodegenMsg(build_codegen_status.ok(), build_codegen_status, "Build Side");
- profile->AddCodegenMsg(
- insert_codegen_status.ok(), insert_codegen_status, "Hash Table Construction");
+ AddCodegenStatus(build_codegen_status, "Build Side");
+ AddCodegenStatus(insert_codegen_status, "Hash Table Construction");
}
string PhjBuilder::DebugString() const {
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index c009211..22a4127 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -63,15 +63,14 @@ class PhjBuilderConfig : public JoinBuilderConfig {
/// Creates a PhjBuilderConfig for embedded use within a PartitionedHashJoinNode.
/// Creates the object in the state's object pool. To be used only by
/// PartitionedHashJoinPlanNode.
- static Status CreateConfig(RuntimeState* state, int join_node_id, TJoinOp::type join_op,
- const RowDescriptor* build_row_desc,
+ static Status CreateConfig(FragmentState* state, int join_node_id,
+ TJoinOp::type join_op, const RowDescriptor* build_row_desc,
const std::vector<TEqJoinCondition>& eq_join_conjuncts,
const std::vector<TRuntimeFilterDesc>& filters, uint32_t hash_seed,
PhjBuilderConfig** sink);
void Close() override;
-
- void Codegen(RuntimeState* state, RuntimeProfile* profile);
+ void Codegen(FragmentState* state) override;
~PhjBuilderConfig() override {}
@@ -117,13 +116,13 @@ class PhjBuilderConfig : public JoinBuilderConfig {
protected:
/// Initialization for separate sink.
Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
- RuntimeState* state) override;
+ FragmentState* state) override;
private:
/// Helper method used by CreateConfig() to initialize embedded builder.
/// 'tsink' does not need to be initialized by the caller - all values to be used are
/// passed in as arguments and this function fills in required fields in 'tsink'.
- Status Init(RuntimeState* state, int join_node_id, TJoinOp::type join_op,
+ Status Init(FragmentState* state, int join_node_id, TJoinOp::type join_op,
const RowDescriptor* build_row_desc,
const std::vector<TEqJoinCondition>& eq_join_conjuncts,
const std::vector<TRuntimeFilterDesc>& filters, uint32_t hash_seed,
@@ -132,7 +131,7 @@ class PhjBuilderConfig : public JoinBuilderConfig {
/// Initializes the build and filter expressions, creates a copy of the filter
/// descriptors that will be generated by this sink and initializes the hash table
/// config object.
- Status InitExprsAndFilters(RuntimeState* state,
+ Status InitExprsAndFilters(FragmentState* state,
const std::vector<TEqJoinCondition>& eq_join_conjuncts,
const std::vector<TRuntimeFilterDesc>& filters);
@@ -150,8 +149,8 @@ class PhjBuilderConfig : public JoinBuilderConfig {
/// Codegen inserting rows into runtime filters. Identical signature to
/// InsertRuntimeFilters(). Returns non-OK if codegen was not possible.
- Status CodegenInsertRuntimeFilters(
- LlvmCodeGen* codegen, const vector<ScalarExpr*>& filter_exprs, llvm::Function** fn);
+ Status CodegenInsertRuntimeFilters(LlvmCodeGen* codegen,
+ const std::vector<ScalarExpr*>& filter_exprs, llvm::Function** fn);
};
/// See partitioned-hash-join-node.h for explanation of the top-level algorithm and how
@@ -272,11 +271,6 @@ class PhjBuilder : public JoinBuilder {
virtual Status FlushFinal(RuntimeState* state) override;
virtual void Close(RuntimeState* state) override;
- /// Does all codegen for the builder (if codegen is enabled).
- /// Updates the the builder's runtime profile with info about whether any errors
- /// occured during codegen.
- virtual void Codegen(RuntimeState* state) override;
-
/////////////////////////////////////////
// The following functions are used only by PartitionedHashJoinNode.
/////////////////////////////////////////
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 5f53f1d..bdc5a4e 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -29,6 +29,7 @@
#include "exprs/scalar-expr.h"
#include "exprs/slot-ref.h"
#include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/fragment-state.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
@@ -48,7 +49,8 @@ static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
using namespace impala;
using strings::Substitute;
-Status PartitionedHashJoinPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status PartitionedHashJoinPlanNode::Init(
+ const TPlanNode& tnode, FragmentState* state) {
RETURN_IF_ERROR(BlockingJoinPlanNode::Init(tnode, state));
DCHECK(tnode.__isset.join_node);
DCHECK(tnode.join_node.__isset.hash_join_node);
@@ -83,10 +85,11 @@ Status PartitionedHashJoinPlanNode::Init(const TPlanNode& tnode, RuntimeState* s
// Create the config always. It is only used if UseSeparateBuild() is true, but in
// Init(), IsInSubplan() isn't available yet.
// TODO: simplify this by ensuring that UseSeparateBuild() is accurate in Init().
- RETURN_IF_ERROR(PhjBuilderConfig::CreateConfig(state, tnode_->node_id,
- tnode_->join_node.join_op, &build_row_desc(), eq_join_conjuncts,
- tnode_->runtime_filters, tnode_->join_node.hash_join_node.hash_seed,
- &phj_builder_config));
+ RETURN_IF_ERROR(
+ PhjBuilderConfig::CreateConfig(state, tnode_->node_id, tnode_->join_node.join_op,
+ &build_row_desc(), eq_join_conjuncts, tnode_->runtime_filters,
+ tnode_->join_node.hash_join_node.hash_seed, &phj_builder_config_));
+ state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
return Status::OK();
}
@@ -94,7 +97,7 @@ void PartitionedHashJoinPlanNode::Close() {
ScalarExpr::Close(probe_exprs_);
ScalarExpr::Close(build_exprs_);
ScalarExpr::Close(other_join_conjuncts_);
- if (phj_builder_config != nullptr) phj_builder_config->Close();
+ if (phj_builder_config_ != nullptr) phj_builder_config_->Close();
PlanNode::Close();
}
@@ -129,7 +132,7 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
runtime_state_ = state;
if (!UseSeparateBuild(state->query_options())) {
const PhjBuilderConfig& builder_config =
- *static_cast<const PartitionedHashJoinPlanNode&>(plan_node_).phj_builder_config;
+ *static_cast<const PartitionedHashJoinPlanNode&>(plan_node_).phj_builder_config_;
builder_ = builder_config.CreateSink(buffer_pool_client(),
resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size,
state);
@@ -158,14 +161,13 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
num_probe_rows_partitioned_ =
ADD_COUNTER(runtime_profile(), "ProbeRowsPartitioned", TUnit::UNIT);
- state->CheckAndAddCodegenDisabledMessage(runtime_profile());
return Status::OK();
}
-void PartitionedHashJoinNode::Codegen(RuntimeState* state) {
+void PartitionedHashJoinPlanNode::Codegen(FragmentState* state) {
DCHECK(state->ShouldCodegen());
- // Codegen the children node;
- ExecNode::Codegen(state);
+ // Codegen the children nodes.
+ PlanNode::Codegen(state);
if (IsNodeCodegenDisabled()) return;
LlvmCodeGen* codegen = state->codegen();
@@ -173,27 +175,12 @@ void PartitionedHashJoinNode::Codegen(RuntimeState* state) {
// Codegen the build side (if integrated into this join node).
if (!UseSeparateBuild(state->query_options())) {
- // TODO: invoke codegen on the PhjBuilderConfig obj inside
- // PartitionedHashJoinPlanNode::Codegen() once codegen invocation is completely moved
- // to the plan nodes.
- DCHECK(builder_ != nullptr);
- builder_->Codegen(state);
+ DCHECK(phj_builder_config_ != nullptr);
+ phj_builder_config_->Codegen(state);
}
- // Codegen the probe side.
- const PartitionedHashJoinPlanNode& phj_pnode =
- static_cast<const PartitionedHashJoinPlanNode&>(plan_node_);
- PartitionedHashJoinPlanNode& non_const_pnode =
- const_cast<PartitionedHashJoinPlanNode&>(phj_pnode);
- non_const_pnode.Codegen(state, runtime_profile());
-}
-
-void PartitionedHashJoinPlanNode::Codegen(RuntimeState* state, RuntimeProfile* profile) {
- LlvmCodeGen* codegen = state->codegen();
- DCHECK(codegen != nullptr);
TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
- Status probe_codegen_status = CodegenProcessProbeBatch(codegen, prefetch_mode);
- profile->AddCodegenMsg(probe_codegen_status.ok(), probe_codegen_status, "Probe Side");
+ AddCodegenStatus(CodegenProcessProbeBatch(codegen, prefetch_mode), "Probe Side");
}
Status PartitionedHashJoinNode::Open(RuntimeState* state) {
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index b6bcca9..4be8868 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -40,10 +40,10 @@ class TupleRow;
class PartitionedHashJoinPlanNode : public BlockingJoinPlanNode {
public:
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+ virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
virtual void Close() override;
virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
- void Codegen(RuntimeState* state, RuntimeProfile* profile);
+ virtual void Codegen(FragmentState* state) override;
~PartitionedHashJoinPlanNode(){}
@@ -61,7 +61,7 @@ class PartitionedHashJoinPlanNode : public BlockingJoinPlanNode {
/// Data sink config object for creating a phj builder that will be eventually used by
/// the exec node.
- PhjBuilderConfig* phj_builder_config;
+ PhjBuilderConfig* phj_builder_config_;
/// Seed used for hashing rows.
uint32_t hash_seed_;
@@ -170,7 +170,6 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
virtual ~PartitionedHashJoinNode();
virtual Status Prepare(RuntimeState* state) override;
- virtual void Codegen(RuntimeState* state) override;
virtual Status Open(RuntimeState* state) override;
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 1615cad..6d9b9b2 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -27,8 +27,8 @@
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
#include "runtime/blocking-row-batch-queue.h"
+#include "runtime/fragment-state.h"
#include "runtime/io/disk-io-mgr.h"
-#include "runtime/query-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-filter.inline.h"
#include "runtime/runtime-state.h"
@@ -98,7 +98,7 @@ PROFILE_DEFINE_HIGH_WATER_MARK_COUNTER(PeakScannerThreadConcurrency, STABLE_LOW,
const string ScanNode::SCANNER_THREAD_COUNTERS_PREFIX = "ScannerThreads";
-Status ScanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status ScanPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
RETURN_IF_ERROR(PlanNode::Init(tnode, state));
const TQueryOptions& query_options = state->query_options();
for (const TRuntimeFilterDesc& filter_desc : tnode.runtime_filters) {
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 23aaa88..6978f07 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -31,7 +31,7 @@ class TScanRange;
class ScanPlanNode : public PlanNode {
public:
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+ virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
};
diff --git a/be/src/exec/select-node.cc b/be/src/exec/select-node.cc
index 9e96a69..50cd3cd 100644
--- a/be/src/exec/select-node.cc
+++ b/be/src/exec/select-node.cc
@@ -22,6 +22,7 @@
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/scalar-expr.h"
#include "gen-cpp/PlanNodes_types.h"
+#include "runtime/fragment-state.h"
#include "runtime/raw-value.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
@@ -51,21 +52,14 @@ Status SelectNode::Prepare(RuntimeState* state) {
return Status::OK();
}
-void SelectNode::Codegen(RuntimeState* state) {
+void SelectPlanNode::Codegen(FragmentState* state) {
DCHECK(state->ShouldCodegen());
- ExecNode::Codegen(state);
+ PlanNode::Codegen(state);
if (IsNodeCodegenDisabled()) return;
- const SelectPlanNode& select_pnode = static_cast<const SelectPlanNode&>(plan_node_);
- SelectPlanNode& non_const_pnode = const_cast<SelectPlanNode&>(select_pnode);
- non_const_pnode.Codegen(state, runtime_profile());
+ AddCodegenStatus(CodegenCopyRows(state));
}
-void SelectPlanNode::Codegen(RuntimeState* state, RuntimeProfile* runtime_profile) {
- Status codegen_status = CodegenCopyRows(state);
- runtime_profile->AddCodegenMsg(codegen_status.ok(), codegen_status);
-}
-
-Status SelectPlanNode::CodegenCopyRows(RuntimeState* state) {
+Status SelectPlanNode::CodegenCopyRows(FragmentState* state) {
LlvmCodeGen* codegen = state->codegen();
DCHECK(codegen != nullptr);
llvm::Function* copy_rows_fn =
diff --git a/be/src/exec/select-node.h b/be/src/exec/select-node.h
index 74818d9..4104318 100644
--- a/be/src/exec/select-node.h
+++ b/be/src/exec/select-node.h
@@ -34,7 +34,7 @@ class TupleRow;
class SelectPlanNode : public PlanNode {
public:
virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
- void Codegen(RuntimeState* state, RuntimeProfile* runtime_profile);
+ virtual void Codegen(FragmentState* state) override;
~SelectPlanNode(){}
@@ -44,7 +44,7 @@ class SelectPlanNode : public PlanNode {
private:
/// Codegen SelectNode::CopyRows().
- Status CodegenCopyRows(RuntimeState* state);
+ Status CodegenCopyRows(FragmentState* state);
};
/// Node that evaluates conjuncts and enforces a limit but otherwise passes along
@@ -55,7 +55,6 @@ class SelectNode : public ExecNode {
SelectNode(ObjectPool* pool, const SelectPlanNode& pnode, const DescriptorTbl& descs);
virtual Status Prepare(RuntimeState* state) override;
- virtual void Codegen(RuntimeState* state) override;
virtual Status Open(RuntimeState* state) override;
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 833010a..e44b72c 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -18,6 +18,7 @@
#include "exec/sort-node.h"
#include "exec/exec-node-util.h"
+#include "runtime/fragment-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/sorted-run-merger.h"
@@ -27,7 +28,7 @@
namespace impala {
-Status SortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status SortPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
RETURN_IF_ERROR(PlanNode::Init(tnode, state));
const TSortInfo& tsort_info = tnode.sort_node.sort_info;
RETURN_IF_ERROR(ScalarExpr::Create(
@@ -37,6 +38,7 @@ Status SortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
*children_[0]->row_descriptor_, state, &sort_tuple_slot_exprs_));
row_comparator_config_ =
state->obj_pool()->Add(new TupleRowComparatorConfig(tsort_info, ordering_exprs_));
+ state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
return Status::OK();
}
@@ -74,22 +76,14 @@ Status SortNode::Prepare(RuntimeState* state) {
resource_profile_.spillable_buffer_size, runtime_profile(), state, label(), true));
RETURN_IF_ERROR(sorter_->Prepare(pool_));
DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
- state->CheckAndAddCodegenDisabledMessage(runtime_profile());
return Status::OK();
}
-void SortNode::Codegen(RuntimeState* state) {
+void SortPlanNode::Codegen(FragmentState* state) {
DCHECK(state->ShouldCodegen());
- ExecNode::Codegen(state);
+ PlanNode::Codegen(state);
if (IsNodeCodegenDisabled()) return;
- const SortPlanNode& sort_pnode = static_cast<const SortPlanNode&>(plan_node_);
- SortPlanNode& non_const_pnode = const_cast<SortPlanNode&>(sort_pnode);
- non_const_pnode.Codegen(state, runtime_profile());
-}
-
-void SortPlanNode::Codegen(RuntimeState* state, RuntimeProfile* runtime_profile) {
- Status codegen_status = row_comparator_config_->Codegen(state);
- runtime_profile->AddCodegenMsg(codegen_status.ok(), codegen_status);
+ AddCodegenStatus(row_comparator_config_->Codegen(state));
}
Status SortNode::Open(RuntimeState* state) {
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index a5d4bee..b2d8cda 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -25,10 +25,10 @@ namespace impala {
class SortPlanNode : public PlanNode {
public:
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+ virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
virtual void Close() override;
virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
- void Codegen(RuntimeState* state, RuntimeProfile* runtime_profile);
+ virtual void Codegen(FragmentState* state) override;
~SortPlanNode(){}
@@ -59,7 +59,6 @@ class SortNode : public ExecNode {
~SortNode();
virtual Status Prepare(RuntimeState* state);
- virtual void Codegen(RuntimeState* state);
virtual Status Open(RuntimeState* state);
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
diff --git a/be/src/exec/subplan-node.cc b/be/src/exec/subplan-node.cc
index 85addfd..26b7b4c 100644
--- a/be/src/exec/subplan-node.cc
+++ b/be/src/exec/subplan-node.cc
@@ -27,7 +27,7 @@
namespace impala {
-Status SubplanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status SubplanPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
RETURN_IF_ERROR(PlanNode::Init(tnode, state));
DCHECK_EQ(children_.size(), 2);
RETURN_IF_ERROR(SetContainingSubplan(state, this, children_[1]));
@@ -35,7 +35,7 @@ Status SubplanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
}
Status SubplanPlanNode::SetContainingSubplan(
- RuntimeState* state, SubplanPlanNode* ancestor, PlanNode* node) {
+ FragmentState* state, SubplanPlanNode* ancestor, PlanNode* node) {
node->containing_subplan_ = ancestor;
if (node->tnode_->node_type == TPlanNodeType::SUBPLAN_NODE) {
// Only traverse the first child and not the second one, because the Subplan
diff --git a/be/src/exec/subplan-node.h b/be/src/exec/subplan-node.h
index 6517df8..c8201e9 100644
--- a/be/src/exec/subplan-node.h
+++ b/be/src/exec/subplan-node.h
@@ -27,7 +27,7 @@ class TupleRow;
class SubplanPlanNode : public PlanNode {
public:
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+ virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
/// Sets 'ancestor' as the containing Subplan in all plan nodes inside the plan-node
@@ -35,7 +35,7 @@ class SubplanPlanNode : public PlanNode {
/// setting the subplan. Doesn't traverse the second child of SubplanPlanNodes within
/// 'node'.
Status SetContainingSubplan(
- RuntimeState* state, SubplanPlanNode* ancestor, PlanNode* node);
+ FragmentState* state, SubplanPlanNode* ancestor, PlanNode* node);
~SubplanPlanNode(){}
};
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index 3108040..276e08d 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -24,6 +24,7 @@
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
#include "runtime/descriptors.h"
+#include "runtime/fragment-state.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
@@ -41,7 +42,7 @@
using std::priority_queue;
using namespace impala;
-Status TopNPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status TopNPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
const TSortInfo& tsort_info = tnode.sort_node.sort_info;
RETURN_IF_ERROR(PlanNode::Init(tnode, state));
RETURN_IF_ERROR(ScalarExpr::Create(
@@ -53,6 +54,7 @@ Status TopNPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
row_comparator_config_ =
state->obj_pool()->Add(new TupleRowComparatorConfig(tsort_info, ordering_exprs_));
DCHECK_EQ(conjuncts_.size(), 0) << "TopNNode should never have predicates to evaluate.";
+ state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
return Status::OK();
}
@@ -90,22 +92,15 @@ Status TopNNode::Prepare(RuntimeState* state) {
RETURN_IF_ERROR(ScalarExprEvaluator::Create(output_tuple_exprs_, state, pool_,
expr_perm_pool(), expr_results_pool(), &output_tuple_expr_evals_));
insert_batch_timer_ = ADD_TIMER(runtime_profile(), "InsertBatchTime");
- state->CheckAndAddCodegenDisabledMessage(runtime_profile());
tuple_pool_reclaim_counter_ = ADD_COUNTER(runtime_profile(), "TuplePoolReclamations",
TUnit::UNIT);
return Status::OK();
}
-void TopNNode::Codegen(RuntimeState* state) {
+void TopNPlanNode::Codegen(FragmentState* state) {
DCHECK(state->ShouldCodegen());
- ExecNode::Codegen(state);
+ PlanNode::Codegen(state);
if (IsNodeCodegenDisabled()) return;
- const TopNPlanNode& topn_pnode = static_cast<const TopNPlanNode&>(plan_node_);
- TopNPlanNode& non_const_pnode = const_cast<TopNPlanNode&>(topn_pnode);
- non_const_pnode.Codegen(state, runtime_profile());
-}
-
-void TopNPlanNode::Codegen(RuntimeState* state, RuntimeProfile* runtime_profile) {
LlvmCodeGen* codegen = state->codegen();
DCHECK(codegen != NULL);
@@ -146,7 +141,7 @@ void TopNPlanNode::Codegen(RuntimeState* state, RuntimeProfile* runtime_profile)
}
}
}
- runtime_profile->AddCodegenMsg(codegen_status.ok(), codegen_status);
+ AddCodegenStatus(codegen_status);
}
Status TopNNode::Open(RuntimeState* state) {
diff --git a/be/src/exec/topn-node.h b/be/src/exec/topn-node.h
index 24be9fa..57dde08 100644
--- a/be/src/exec/topn-node.h
+++ b/be/src/exec/topn-node.h
@@ -36,10 +36,10 @@ class Tuple;
class TopNPlanNode : public PlanNode {
public:
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+ virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
virtual void Close() override;
virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
- void Codegen(RuntimeState* state, RuntimeProfile* runtime_profile);
+ virtual void Codegen(FragmentState* state) override;
~TopNPlanNode(){}
@@ -71,7 +71,6 @@ class TopNNode : public ExecNode {
TopNNode(ObjectPool* pool, const TopNPlanNode& pnode, const DescriptorTbl& descs);
virtual Status Prepare(RuntimeState* state);
- virtual void Codegen(RuntimeState* state);
virtual Status Open(RuntimeState* state);
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index a929748..88b6351 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -22,6 +22,7 @@
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/scalar-expr.h"
#include "gen-cpp/PlanNodes_types.h"
+#include "runtime/fragment-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/tuple-row.h"
@@ -32,7 +33,7 @@
using namespace impala;
-Status UnionPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status UnionPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
RETURN_IF_ERROR(PlanNode::Init(tnode, state));
DCHECK(tnode_->__isset.union_node);
DCHECK_EQ(conjuncts_.size(), 0);
@@ -116,16 +117,10 @@ Status UnionNode::Prepare(RuntimeState* state) {
return Status::OK();
}
-void UnionNode::Codegen(RuntimeState* state) {
+void UnionPlanNode::Codegen(FragmentState* state){
DCHECK(state->ShouldCodegen());
- ExecNode::Codegen(state);
+ PlanNode::Codegen(state);
if (IsNodeCodegenDisabled()) return;
- const UnionPlanNode& union_pnode = static_cast<const UnionPlanNode&>(plan_node_);
- UnionPlanNode& non_const_pnode = const_cast<UnionPlanNode&>(union_pnode);
- non_const_pnode.Codegen(state, runtime_profile());
-}
-
-void UnionPlanNode::Codegen(RuntimeState* state, RuntimeProfile* runtime_profile){
LlvmCodeGen* codegen = state->codegen();
DCHECK(codegen != nullptr);
std::stringstream codegen_message;
@@ -161,8 +156,7 @@ void UnionPlanNode::Codegen(RuntimeState* state, RuntimeProfile* runtime_profile
codegen->AddFunctionToJit(union_materialize_batch_fn,
reinterpret_cast<void**>(&(codegend_union_materialize_batch_fns_.data()[i])));
}
- runtime_profile->AddCodegenMsg(
- codegen_status.ok(), codegen_status, codegen_message.str());
+ AddCodegenStatus(codegen_status, codegen_message.str());
}
Status UnionNode::Open(RuntimeState* state) {
diff --git a/be/src/exec/union-node.h b/be/src/exec/union-node.h
index 04bdcf4..96aca8b 100644
--- a/be/src/exec/union-node.h
+++ b/be/src/exec/union-node.h
@@ -39,10 +39,10 @@ class UnionNode;
class UnionPlanNode : public PlanNode {
public:
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+ virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
virtual void Close() override;
virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
- void Codegen(RuntimeState* state, RuntimeProfile* runtime_profile);
+ virtual void Codegen(FragmentState* state) override;
~UnionPlanNode(){}
@@ -89,7 +89,6 @@ class UnionNode : public ExecNode {
UnionNode(ObjectPool* pool, const UnionPlanNode& pnode, const DescriptorTbl& descs);
virtual Status Prepare(RuntimeState* state);
- virtual void Codegen(RuntimeState* state);
virtual Status Open(RuntimeState* state);
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
diff --git a/be/src/exec/unnest-node.cc b/be/src/exec/unnest-node.cc
index a20be34..82907da 100644
--- a/be/src/exec/unnest-node.cc
+++ b/be/src/exec/unnest-node.cc
@@ -22,6 +22,7 @@
#include "exec/subplan-node.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/slot-ref.h"
+#include "runtime/fragment-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/tuple-row.h"
@@ -31,7 +32,7 @@ namespace impala {
const CollectionValue UnnestNode::EMPTY_COLLECTION_VALUE;
-Status UnnestPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status UnnestPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
DCHECK(tnode.__isset.unnest_node);
RETURN_IF_ERROR(PlanNode::Init(tnode, state));
return Status::OK();
@@ -42,7 +43,7 @@ void UnnestPlanNode::Close() {
PlanNode::Close();
}
-Status UnnestPlanNode::InitCollExpr(RuntimeState* state) {
+Status UnnestPlanNode::InitCollExpr(FragmentState* state) {
DCHECK(containing_subplan_ != nullptr)
<< "set_containing_subplan() must have been called";
const RowDescriptor& row_desc = *containing_subplan_->children_[0]->row_descriptor_;
diff --git a/be/src/exec/unnest-node.h b/be/src/exec/unnest-node.h
index 0b8bf6d..8045fab 100644
--- a/be/src/exec/unnest-node.h
+++ b/be/src/exec/unnest-node.h
@@ -28,12 +28,12 @@ class TupleDescriptor;
class UnnestPlanNode : public PlanNode {
public:
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+ virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
virtual void Close() override;
virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
/// Initializes the expression which produces the collection to be unnested.
/// Called by the containing subplan plan-node.
- Status InitCollExpr(RuntimeState* state);
+ Status InitCollExpr(FragmentState* state);
~UnnestPlanNode(){}
diff --git a/be/src/exprs/agg-fn.cc b/be/src/exprs/agg-fn.cc
index 2c619f9..8eaacdd 100644
--- a/be/src/exprs/agg-fn.cc
+++ b/be/src/exprs/agg-fn.cc
@@ -21,6 +21,7 @@
#include "exprs/anyval-util.h"
#include "exprs/scalar-expr.h"
#include "runtime/descriptors.h"
+#include "runtime/fragment-state.h"
#include "runtime/lib-cache.h"
#include "common/names.h"
@@ -60,7 +61,7 @@ AggFn::AggFn(const TExprNode& tnode, const SlotDescriptor& intermediate_slot_des
}
}
-Status AggFn::Init(const RowDescriptor& row_desc, RuntimeState* state) {
+Status AggFn::Init(const RowDescriptor& row_desc, FragmentState* state) {
// Initialize all children (i.e. input exprs to this aggregate expr).
for (ScalarExpr* input_expr : children()) {
RETURN_IF_ERROR(input_expr->Init(row_desc, /*is_entry_point*/ false, state));
@@ -116,7 +117,7 @@ Status AggFn::Init(const RowDescriptor& row_desc, RuntimeState* state) {
Status AggFn::Create(const TExpr& texpr, const RowDescriptor& row_desc,
const SlotDescriptor& intermediate_slot_desc, const SlotDescriptor& output_slot_desc,
- RuntimeState* state, AggFn** agg_fn) {
+ FragmentState* state, AggFn** agg_fn) {
*agg_fn = nullptr;
ObjectPool* pool = state->obj_pool();
const TExprNode& texpr_node = texpr.nodes[0];
diff --git a/be/src/exprs/agg-fn.h b/be/src/exprs/agg-fn.h
index c97effe..d9f6ef2 100644
--- a/be/src/exprs/agg-fn.h
+++ b/be/src/exprs/agg-fn.h
@@ -31,6 +31,7 @@ namespace impala {
using impala_udf::FunctionContext;
+class FragmentState;
class LlvmCodeGen;
class MemPool;
class MemTracker;
@@ -103,8 +104,8 @@ class AggFn : public Expr {
/// of the output value. On failure, returns error status and sets 'agg_fn' to NULL.
static Status Create(const TExpr& texpr, const RowDescriptor& row_desc,
const SlotDescriptor& intermediate_slot_desc,
- const SlotDescriptor& output_slot_desc, RuntimeState* state, AggFn** agg_fn)
- WARN_UNUSED_RESULT;
+ const SlotDescriptor& output_slot_desc, FragmentState* state,
+ AggFn** agg_fn) WARN_UNUSED_RESULT;
bool is_merge() const { return is_merge_; }
AggregationOp agg_op() const { return agg_op_; }
@@ -176,7 +177,7 @@ class AggFn : public Expr {
/// Initializes the AggFn and its input expressions. May load the UDAF from LibCache
/// if necessary.
- virtual Status Init(const RowDescriptor& desc, RuntimeState* state) WARN_UNUSED_RESULT;
+ virtual Status Init(const RowDescriptor& desc, FragmentState* state) WARN_UNUSED_RESULT;
};
}
diff --git a/be/src/exprs/expr-codegen-test.cc b/be/src/exprs/expr-codegen-test.cc
index e1c5c50..c5d95a8 100644
--- a/be/src/exprs/expr-codegen-test.cc
+++ b/be/src/exprs/expr-codegen-test.cc
@@ -18,6 +18,8 @@
// The following is cross-compiled to native code and IR, and used in the test below
#include "exprs/decimal-operators.h"
#include "exprs/scalar-expr.h"
+#include "runtime/fragment-state.h"
+#include "runtime/query-state.h"
#include "udf/udf.h"
#ifdef IR_COMPILE
@@ -88,6 +90,7 @@ class ExprCodegenTest : public ::testing::Test {
protected:
scoped_ptr<TestEnv> test_env_;
RuntimeState* runtime_state_;
+ FragmentState* fragment_state_;
FunctionContext* fn_ctx_;
FnAttr fn_type_attr_;
@@ -101,8 +104,8 @@ class ExprCodegenTest : public ::testing::Test {
}
Status CreateFromFile(const string& filename, scoped_ptr<LlvmCodeGen>* codegen) {
- RETURN_IF_ERROR(LlvmCodeGen::CreateFromFile(runtime_state_,
- runtime_state_->obj_pool(), NULL, filename, "test", codegen));
+ RETURN_IF_ERROR(LlvmCodeGen::CreateFromFile(fragment_state_,
+ fragment_state_->obj_pool(), NULL, filename, "test", codegen));
return (*codegen)->MaterializeModule();
}
@@ -113,6 +116,9 @@ class ExprCodegenTest : public ::testing::Test {
test_env_.reset(new TestEnv());
ASSERT_OK(test_env_->Init());
ASSERT_OK(test_env_->CreateQueryState(0, &query_options, &runtime_state_));
+ QueryState* qs = runtime_state_->query_state();
+ TPlanFragmentCtx* fragment_ctx = qs->obj_pool()->Add(new TPlanFragmentCtx());
+ fragment_state_ = qs->obj_pool()->Add(new FragmentState(qs, *fragment_ctx));
FunctionContext::TypeDesc return_type;
return_type.type = FunctionContext::TYPE_DECIMAL;
@@ -145,7 +151,9 @@ class ExprCodegenTest : public ::testing::Test {
virtual void TearDown() {
fn_ctx_->impl()->Close();
delete fn_ctx_;
- runtime_state_ = NULL;
+ fragment_state_->ReleaseResources();
+ fragment_state_ = nullptr;
+ runtime_state_ = nullptr;
test_env_.reset();
}
@@ -308,7 +316,7 @@ TEST_F(ExprCodegenTest, TestInlineConstFnAttrs) {
// Create Expr
MemTracker tracker;
ScalarExpr* expr;
- ASSERT_OK(ScalarExpr::Create(texpr, RowDescriptor(), runtime_state_, &expr));
+ ASSERT_OK(ScalarExpr::Create(texpr, RowDescriptor(), fragment_state_, &expr));
// Get TestGetFnAttrs() IR function
stringstream test_udf_file;
diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc
index f8a851f..8f8399c 100644
--- a/be/src/exprs/hive-udf-call.cc
+++ b/be/src/exprs/hive-udf-call.cc
@@ -175,7 +175,7 @@ Status HiveUdfCall::InitEnv() {
}
Status HiveUdfCall::Init(
- const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) {
+ const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) {
// Initialize children first.
RETURN_IF_ERROR(ScalarExpr::Init(row_desc, is_entry_point, state));
diff --git a/be/src/exprs/hive-udf-call.h b/be/src/exprs/hive-udf-call.h
index 7cf9ae6..19109b9 100644
--- a/be/src/exprs/hive-udf-call.h
+++ b/be/src/exprs/hive-udf-call.h
@@ -91,7 +91,7 @@ class HiveUdfCall : public ScalarExpr {
HiveUdfCall(const TExprNode& node);
virtual Status Init(const RowDescriptor& row_desc, bool is_entry_point,
- RuntimeState* state) override WARN_UNUSED_RESULT;
+ FragmentState* state) override WARN_UNUSED_RESULT;
virtual Status OpenEvaluator(FunctionContext::FunctionStateScope scope,
RuntimeState* state, ScalarExprEvaluator* eval) const override WARN_UNUSED_RESULT;
virtual void CloseEvaluator(FunctionContext::FunctionStateScope scope,
diff --git a/be/src/exprs/is-not-empty-predicate.cc b/be/src/exprs/is-not-empty-predicate.cc
index a51ad36..7627fdd 100644
--- a/be/src/exprs/is-not-empty-predicate.cc
+++ b/be/src/exprs/is-not-empty-predicate.cc
@@ -43,7 +43,7 @@ BooleanVal IsNotEmptyPredicate::GetBooleanValInterpreted(
}
Status IsNotEmptyPredicate::Init(
- const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) {
+ const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) {
RETURN_IF_ERROR(ScalarExpr::Init(row_desc, is_entry_point, state));
DCHECK_EQ(children_.size(), 1);
return Status::OK();
diff --git a/be/src/exprs/is-not-empty-predicate.h b/be/src/exprs/is-not-empty-predicate.h
index 313252c..b6a41fb 100644
--- a/be/src/exprs/is-not-empty-predicate.h
+++ b/be/src/exprs/is-not-empty-predicate.h
@@ -41,7 +41,7 @@ class IsNotEmptyPredicate : public Predicate {
friend class ScalarExpr;
virtual Status Init(
- const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) override;
+ const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) override;
explicit IsNotEmptyPredicate(const TExprNode& node);
};
diff --git a/be/src/exprs/kudu-partition-expr.cc b/be/src/exprs/kudu-partition-expr.cc
index 3fa69ef..6b351e6 100644
--- a/be/src/exprs/kudu-partition-expr.cc
+++ b/be/src/exprs/kudu-partition-expr.cc
@@ -22,7 +22,7 @@
#include "exec/kudu-util.h"
#include "exprs/scalar-expr-evaluator.h"
#include "runtime/exec-env.h"
-#include "runtime/query-state.h"
+#include "runtime/fragment-state.h"
#include "runtime/runtime-state.h"
#include "runtime/tuple-row.h"
#include "runtime/tuple.h"
@@ -34,7 +34,7 @@ KuduPartitionExpr::KuduPartitionExpr(const TExprNode& node)
: ScalarExpr(node), tkudu_partition_expr_(node.kudu_partition_expr) {}
Status KuduPartitionExpr::Init(
- const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) {
+ const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) {
RETURN_IF_ERROR(ScalarExpr::Init(row_desc, is_entry_point, state));
DCHECK_EQ(tkudu_partition_expr_.referenced_columns.size(), children_.size());
diff --git a/be/src/exprs/kudu-partition-expr.h b/be/src/exprs/kudu-partition-expr.h
index 35722cc..9f90e53 100644
--- a/be/src/exprs/kudu-partition-expr.h
+++ b/be/src/exprs/kudu-partition-expr.h
@@ -41,7 +41,7 @@ class KuduPartitionExpr : public ScalarExpr {
KuduPartitionExpr(const TExprNode& node);
virtual Status Init(const RowDescriptor& row_desc, bool is_entry_point,
- RuntimeState* state) override WARN_UNUSED_RESULT;
+ FragmentState* state) override WARN_UNUSED_RESULT;
virtual IntVal GetIntValInterpreted(
ScalarExprEvaluator* eval, const TupleRow* row) const override;
diff --git a/be/src/exprs/scalar-expr.cc b/be/src/exprs/scalar-expr.cc
index b634f9e..ad5d4b3 100644
--- a/be/src/exprs/scalar-expr.cc
+++ b/be/src/exprs/scalar-expr.cc
@@ -46,6 +46,7 @@
#include "exprs/udf-builtins.h"
#include "exprs/utility-functions.h"
#include "exprs/valid-tuple-id.h"
+#include "runtime/fragment-state.h"
#include "runtime/runtime-state.h"
#include "runtime/tuple-row.h"
#include "runtime/tuple.h"
@@ -76,7 +77,7 @@ ScalarExpr::ScalarExpr(const TExprNode& node)
}
Status ScalarExpr::Create(const TExpr& texpr, const RowDescriptor& row_desc,
- RuntimeState* state, ObjectPool* pool, ScalarExpr** scalar_expr) {
+ FragmentState* state, ObjectPool* pool, ScalarExpr** scalar_expr) {
*scalar_expr = nullptr;
ScalarExpr* root;
RETURN_IF_ERROR(CreateNode(texpr.nodes[0], pool, &root));
@@ -98,7 +99,7 @@ Status ScalarExpr::Create(const TExpr& texpr, const RowDescriptor& row_desc,
}
Status ScalarExpr::Create(const vector<TExpr>& texprs, const RowDescriptor& row_desc,
- RuntimeState* state, ObjectPool* pool, vector<ScalarExpr*>* exprs) {
+ FragmentState* state, ObjectPool* pool, vector<ScalarExpr*>* exprs) {
exprs->clear();
for (const TExpr& texpr: texprs) {
ScalarExpr* expr;
@@ -110,12 +111,12 @@ Status ScalarExpr::Create(const vector<TExpr>& texprs, const RowDescriptor& row_
}
Status ScalarExpr::Create(const TExpr& texpr, const RowDescriptor& row_desc,
- RuntimeState* state, ScalarExpr** scalar_expr) {
+ FragmentState* state, ScalarExpr** scalar_expr) {
return ScalarExpr::Create(texpr, row_desc, state, state->obj_pool(), scalar_expr);
}
Status ScalarExpr::Create(const vector<TExpr>& texprs, const RowDescriptor& row_desc,
- RuntimeState* state, vector<ScalarExpr*>* exprs) {
+ FragmentState* state, vector<ScalarExpr*>* exprs) {
return ScalarExpr::Create(texprs, row_desc, state, state->obj_pool(), exprs);
}
@@ -283,7 +284,7 @@ ScalarExprsResultsRowLayout::ScalarExprsResultsRowLayout(
}
Status ScalarExpr::Init(
- const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) {
+ const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) {
DCHECK(type_.type != INVALID_TYPE);
for (int i = 0; i < children_.size(); ++i) {
RETURN_IF_ERROR(children_[i]->Init(row_desc, false, state));
@@ -318,7 +319,7 @@ string ScalarExpr::DebugString(const vector<ScalarExpr*>& exprs) {
return out.str();
}
-bool ScalarExpr::ShouldCodegen(const RuntimeState* state) const {
+bool ScalarExpr::ShouldCodegen(const FragmentState* state) const {
// Use the interpreted path and call the builtin without codegen if any of the
// followings is true:
// 1. The expression does not have an associated RuntimeState, e.g. is a partition
diff --git a/be/src/exprs/scalar-expr.h b/be/src/exprs/scalar-expr.h
index fceb83d..86cb6a4 100644
--- a/be/src/exprs/scalar-expr.h
+++ b/be/src/exprs/scalar-expr.h
@@ -56,6 +56,7 @@ using impala_udf::DecimalVal;
using impala_udf::DateVal;
using impala_udf::CollectionVal;
+class FragmentState;
struct LibCacheEntry;
class LlvmCodeGen;
class MemTracker;
@@ -146,22 +147,22 @@ class ScalarExpr : public Expr {
/// tuple row descriptor of the input tuple row. On failure, 'expr' is set to NULL and
/// the expr tree (if created) will be closed. Error status will be returned too.
static Status Create(const TExpr& texpr, const RowDescriptor& row_desc,
- RuntimeState* state, ObjectPool* pool, ScalarExpr** expr) WARN_UNUSED_RESULT;
+ FragmentState* state, ObjectPool* pool, ScalarExpr** expr) WARN_UNUSED_RESULT;
/// Create a new ScalarExpr based on thrift Expr 'texpr'. The newly created ScalarExpr
/// is stored in ObjectPool 'state->obj_pool()' and returned in 'expr'. 'row_desc' is
/// the tuple row descriptor of the input tuple row. Returns error status on failure.
static Status Create(const TExpr& texpr, const RowDescriptor& row_desc,
- RuntimeState* state, ScalarExpr** expr) WARN_UNUSED_RESULT;
+ FragmentState* state, ScalarExpr** expr) WARN_UNUSED_RESULT;
/// Convenience functions creating multiple ScalarExpr.
static Status Create(const std::vector<TExpr>& texprs, const RowDescriptor& row_desc,
- RuntimeState* state, ObjectPool* pool, std::vector<ScalarExpr*>* exprs)
- WARN_UNUSED_RESULT;
+ FragmentState* state, ObjectPool* pool,
+ std::vector<ScalarExpr*>* exprs) WARN_UNUSED_RESULT;
/// Convenience functions creating multiple ScalarExpr.
static Status Create(const std::vector<TExpr>& texprs, const RowDescriptor& row_desc,
- RuntimeState* state, std::vector<ScalarExpr*>* exprs) WARN_UNUSED_RESULT;
+ FragmentState* state, std::vector<ScalarExpr*>* exprs) WARN_UNUSED_RESULT;
/// Returns true if this expression is a SlotRef. Overridden by SlotRef.
virtual bool IsSlotRef() const { return false; }
@@ -308,7 +309,7 @@ class ScalarExpr : public Expr {
/// point into the codegen'd code. Currently we assume all roots of ScalarExpr subtrees
/// exprs are potential entry points.
virtual Status Init(const RowDescriptor& row_desc, bool is_entry_point,
- RuntimeState* state) WARN_UNUSED_RESULT;
+ FragmentState* state) WARN_UNUSED_RESULT;
/// Initializes 'eval' for execution. If scope if FRAGMENT_LOCAL, both
/// fragment-local and thread-local states should be initialized. If scope is
@@ -364,7 +365,7 @@ class ScalarExpr : public Expr {
protected:
/// Return true if we should codegen this expression node, based on query options
/// and the properties of this ScalarExpr node.
- bool ShouldCodegen(const RuntimeState* state) const;
+ bool ShouldCodegen(const FragmentState* state) const;
/// Return true if it is possible to evaluate this expression node without codegen.
/// The vast majority of exprs support interpretation, so default to true. Scalars
diff --git a/be/src/exprs/scalar-fn-call.cc b/be/src/exprs/scalar-fn-call.cc
index ca283ba..51d11a9 100644
--- a/be/src/exprs/scalar-fn-call.cc
+++ b/be/src/exprs/scalar-fn-call.cc
@@ -31,6 +31,7 @@
#include "codegen/llvm-codegen.h"
#include "exprs/anyval-util.h"
#include "exprs/scalar-expr-evaluator.h"
+#include "runtime/fragment-state.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/lib-cache.h"
#include "runtime/runtime-state.h"
@@ -71,7 +72,7 @@ Status ScalarFnCall::LoadPrepareAndCloseFn(LlvmCodeGen* codegen) {
}
Status ScalarFnCall::Init(
- const RowDescriptor& desc, bool is_entry_point, RuntimeState* state) {
+ const RowDescriptor& desc, bool is_entry_point, FragmentState* state) {
// Initialize children first.
RETURN_IF_ERROR(ScalarExpr::Init(desc, is_entry_point, state));
diff --git a/be/src/exprs/scalar-fn-call.h b/be/src/exprs/scalar-fn-call.h
index 82f687b..4190feb 100644
--- a/be/src/exprs/scalar-fn-call.h
+++ b/be/src/exprs/scalar-fn-call.h
@@ -79,7 +79,7 @@ class ScalarFnCall : public ScalarExpr {
ScalarFnCall(const TExprNode& node);
virtual Status Init(const RowDescriptor& row_desc, bool is_entry_point,
- RuntimeState* state) override WARN_UNUSED_RESULT;
+ FragmentState* state) override WARN_UNUSED_RESULT;
virtual Status OpenEvaluator(FunctionContext::FunctionStateScope scope,
RuntimeState* state, ScalarExprEvaluator* eval) const override WARN_UNUSED_RESULT;
virtual void CloseEvaluator(FunctionContext::FunctionStateScope scope,
diff --git a/be/src/exprs/slot-ref.cc b/be/src/exprs/slot-ref.cc
index b5f758f..634a989 100644
--- a/be/src/exprs/slot-ref.cc
+++ b/be/src/exprs/slot-ref.cc
@@ -26,6 +26,7 @@
#include "gen-cpp/Exprs_types.h"
#include "runtime/collection-value.h"
#include "runtime/decimal-value.h"
+#include "runtime/fragment-state.h"
#include "runtime/multi-precision.h"
#include "runtime/runtime-state.h"
#include "runtime/string-value.inline.h"
@@ -72,7 +73,7 @@ SlotRef::SlotRef(const ColumnType& type, int offset, const bool nullable /* = fa
slot_id_(-1) {}
Status SlotRef::Init(
- const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) {
+ const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) {
DCHECK_EQ(children_.size(), 0);
RETURN_IF_ERROR(ScalarExpr::Init(row_desc, is_entry_point, state));
if (slot_id_ != -1) {
diff --git a/be/src/exprs/slot-ref.h b/be/src/exprs/slot-ref.h
index e9bd4e8..0449bfb 100644
--- a/be/src/exprs/slot-ref.h
+++ b/be/src/exprs/slot-ref.h
@@ -50,7 +50,7 @@ class SlotRef : public ScalarExpr {
/// Exposed as public so AGG node can initialize its build expressions.
virtual Status Init(const RowDescriptor& row_desc, bool is_entry_point,
- RuntimeState* state) override WARN_UNUSED_RESULT;
+ FragmentState* state) override WARN_UNUSED_RESULT;
virtual std::string DebugString() const override;
virtual Status GetCodegendComputeFnImpl(
LlvmCodeGen* codegen, llvm::Function** fn) override WARN_UNUSED_RESULT;
diff --git a/be/src/exprs/tuple-is-null-predicate.cc b/be/src/exprs/tuple-is-null-predicate.cc
index 769ca33..4225863 100644
--- a/be/src/exprs/tuple-is-null-predicate.cc
+++ b/be/src/exprs/tuple-is-null-predicate.cc
@@ -44,7 +44,7 @@ TupleIsNullPredicate::TupleIsNullPredicate(const TExprNode& node)
node.tuple_is_null_pred.tuple_ids.end()) {}
Status TupleIsNullPredicate::Init(
- const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) {
+ const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) {
RETURN_IF_ERROR(ScalarExpr::Init(row_desc, is_entry_point, state));
DCHECK_EQ(0, children_.size());
// Resolve tuple ids to tuple indexes.
diff --git a/be/src/exprs/tuple-is-null-predicate.h b/be/src/exprs/tuple-is-null-predicate.h
index 552b325..e028ca3 100644
--- a/be/src/exprs/tuple-is-null-predicate.h
+++ b/be/src/exprs/tuple-is-null-predicate.h
@@ -40,7 +40,7 @@ class TupleIsNullPredicate: public Predicate {
TupleIsNullPredicate(const TExprNode& node);
virtual Status Init(
- const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) override;
+ const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) override;
virtual Status GetCodegendComputeFnImpl(
LlvmCodeGen* codegen, llvm::Function** fn) override WARN_UNUSED_RESULT;
virtual std::string DebugString() const override;
diff --git a/be/src/exprs/valid-tuple-id.cc b/be/src/exprs/valid-tuple-id.cc
index 2884dcb..e6fe640 100644
--- a/be/src/exprs/valid-tuple-id.cc
+++ b/be/src/exprs/valid-tuple-id.cc
@@ -32,7 +32,7 @@ const char* ValidTupleIdExpr::LLVM_CLASS_NAME = "class.impala::ValidTupleIdExpr"
ValidTupleIdExpr::ValidTupleIdExpr(const TExprNode& node) : ScalarExpr(node) {}
Status ValidTupleIdExpr::Init(
- const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) {
+ const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) {
RETURN_IF_ERROR(ScalarExpr::Init(row_desc, is_entry_point, state));
DCHECK_EQ(0, children_.size());
tuple_ids_.reserve(row_desc.tuple_descriptors().size());
diff --git a/be/src/exprs/valid-tuple-id.h b/be/src/exprs/valid-tuple-id.h
index 342baf8..6422bb1 100644
--- a/be/src/exprs/valid-tuple-id.h
+++ b/be/src/exprs/valid-tuple-id.h
@@ -36,7 +36,7 @@ class ValidTupleIdExpr : public ScalarExpr {
explicit ValidTupleIdExpr(const TExprNode& node);
virtual Status Init(
- const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) override;
+ const RowDescriptor& row_desc, bool is_entry_point, FragmentState* state) override;
virtual Status GetCodegendComputeFnImpl(
LlvmCodeGen* codegen, llvm::Function** fn) override WARN_UNUSED_RESULT;
virtual std::string DebugString() const override;
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index b6550e2..6990421 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -45,6 +45,7 @@ add_library(Runtime
descriptors.cc
dml-exec-state.cc
exec-env.cc
+ fragment-state.cc
fragment-instance-state.cc
hbase-table.cc
hbase-table-factory.cc
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 3ac64bd..ddcd84d 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -28,6 +28,8 @@
#include "rpc/auth-provider.h"
#include "rpc/thrift-server.h"
#include "rpc/rpc-mgr.h"
+#include "runtime/fragment-state.h"
+#include "runtime/query-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/exec-env.h"
@@ -152,6 +154,10 @@ class DataStreamTest : public testing::Test {
ABORT_IF_ERROR(exec_env_->InitForFeTests());
exec_env_->InitBufferPool(32 * 1024, 1024 * 1024 * 1024, 32 * 1024);
runtime_state_.reset(new RuntimeState(TQueryCtx(), exec_env_.get()));
+ TPlanFragmentCtx* fragment_ctx =
+ runtime_state_->obj_pool()->Add(new TPlanFragmentCtx());
+ fragment_state_ = runtime_state_->obj_pool()->Add(
+ new FragmentState(runtime_state_->query_state(), *fragment_ctx));
mem_pool_.reset(new MemPool(&tracker_));
// Register a BufferPool client for allocating buffers for row batches.
@@ -163,7 +169,7 @@ class DataStreamTest : public testing::Test {
CreateRowDesc();
SlotRef* lhs_slot = obj_pool_.Add(new SlotRef(TYPE_BIGINT, 0));
- ASSERT_OK(lhs_slot->Init(RowDescriptor(), true, runtime_state_.get()));
+ ASSERT_OK(lhs_slot->Init(RowDescriptor(), true, fragment_state_));
ordering_exprs_.push_back(lhs_slot);
tsort_info_.sorting_order = TSortingOrder::LEXICAL;
@@ -227,6 +233,8 @@ class DataStreamTest : public testing::Test {
less_than_comparator->Close(runtime_state_.get());
}
ScalarExpr::Close(ordering_exprs_);
+ fragment_state_->ReleaseResources();
+ fragment_state_ = nullptr;
mem_pool_->FreeAll();
StopKrpcBackend();
exec_env_->buffer_pool()->DeregisterClient(&buffer_pool_client_);
@@ -247,6 +255,7 @@ class DataStreamTest : public testing::Test {
vector<TupleRowComparator*> less_than_comparators_;
boost::scoped_ptr<ExecEnv> exec_env_;
scoped_ptr<RuntimeState> runtime_state_;
+ FragmentState* fragment_state_;
TUniqueId next_instance_id_;
string stmt_;
// The sorting expression for the single BIGINT column.
@@ -582,9 +591,13 @@ class DataStreamTest : public testing::Test {
TPartitionType::type partition_type, SenderInfo* info, bool reset_hash_seed) {
RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
VLOG_QUERY << "create sender " << sender_num;
- const TDataSink& sink = GetSink(partition_type);
+ const TDataSink sink = GetSink(partition_type);
+ TPlanFragmentCtx fragment_ctx;
+ fragment_ctx.fragment.output_sink = sink;
+ fragment_ctx.destinations = dest_;
+ FragmentState fragment_state(state.query_state(), fragment_ctx);
DataSinkConfig* data_sink = nullptr;
- EXPECT_OK(DataSinkConfig::CreateConfig(sink, row_desc_, &state, &data_sink));
+ EXPECT_OK(DataSinkConfig::CreateConfig(sink, row_desc_, &fragment_state, &data_sink));
// We create an object of the base class DataSink and cast to the appropriate sender
// according to the 'is_thrift' option.
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index c5a1d50..94e3105 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -38,6 +38,7 @@
#include "runtime/backend-client.h"
#include "runtime/client-cache.h"
#include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
#include "runtime/krpc-data-stream-mgr.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-state.h"
@@ -65,13 +66,12 @@ static const string OPEN_TIMER_NAME = "OpenTime";
static const string PREPARE_TIMER_NAME = "PrepareTime";
static const string EXEC_TIMER_NAME = "ExecTime";
-FragmentInstanceState::FragmentInstanceState(
- QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
- const TPlanFragmentInstanceCtx& instance_ctx)
+FragmentInstanceState::FragmentInstanceState(QueryState* query_state,
+ FragmentState* fragment_state, const TPlanFragmentInstanceCtx& instance_ctx)
: query_state_(query_state),
- fragment_ctx_(fragment_ctx),
- instance_ctx_(instance_ctx) {
-}
+ fragment_state_(fragment_state),
+ fragment_ctx_(fragment_state->fragment_ctx()),
+ instance_ctx_(instance_ctx) {}
Status FragmentInstanceState::Exec() {
bool is_prepared = false;
@@ -174,11 +174,11 @@ Status FragmentInstanceState::Prepare() {
bind<int64_t>(mem_fn(&ThreadResourcePool::num_threads),
runtime_state_->resource_pool()));
- RETURN_IF_ERROR(
- PlanNode::CreateTree(runtime_state_, fragment_ctx_.fragment.plan, &plan_tree_));
- // set up plan
+ // Create the exec tree.
+ const PlanNode* plan_tree = fragment_state_->plan_tree();
+ DCHECK(plan_tree != nullptr);
RETURN_IF_ERROR(ExecNode::CreateTree(
- runtime_state_, *plan_tree_, query_state_->desc_tbl(), &exec_tree_));
+ runtime_state_, *plan_tree, query_state_->desc_tbl(), &exec_tree_));
runtime_state_->set_fragment_root_id(exec_tree_->id());
if (instance_ctx_.__isset.debug_options) {
ExecNode::SetDebugOptions(instance_ctx_.debug_options, exec_tree_);
@@ -214,11 +214,9 @@ Status FragmentInstanceState::Prepare() {
PrintVolumeIds();
// prepare sink_
- DCHECK(fragment_ctx_.fragment.__isset.output_sink);
- const TDataSink& thrift_sink = fragment_ctx_.fragment.output_sink;
- RETURN_IF_ERROR(DataSinkConfig::CreateConfig(
- thrift_sink, plan_tree_->row_descriptor_, runtime_state_, &sink_config_));
- sink_ = sink_config_->CreateSink(fragment_ctx_, instance_ctx_, runtime_state_);
+ const DataSinkConfig* sink_config = fragment_state_->sink_config();
+ DCHECK(sink_config != nullptr);
+ sink_ = sink_config->CreateSink(fragment_ctx_, instance_ctx_, runtime_state_);
RETURN_IF_ERROR(sink_->Prepare(runtime_state_, runtime_state_->instance_mem_tracker()));
RuntimeProfile* sink_profile = sink_->profile();
if (sink_profile != nullptr) profile()->AddChild(sink_profile);
@@ -332,26 +330,9 @@ Status FragmentInstanceState::Open() {
ADD_TIMER(timings_profile_, OPEN_TIMER_NAME));
SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
- if (runtime_state_->ShouldCodegen()) {
+ if (fragment_state_->ShouldCodegen()) {
UpdateState(StateEvent::CODEGEN_START);
- RETURN_IF_ERROR(runtime_state_->CreateCodegen());
- {
- SCOPED_TIMER2(runtime_state_->codegen()->ir_generation_timer(),
- runtime_state_->codegen()->runtime_profile()->total_time_counter());
- SCOPED_THREAD_COUNTER_MEASUREMENT(
- runtime_state_->codegen()->llvm_thread_counters());
- exec_tree_->Codegen(runtime_state_);
- sink_->Codegen(runtime_state_);
-
- // It shouldn't be fatal to fail codegen. However, until IMPALA-4233 is fixed,
- // ScalarFnCall has no fall back to interpretation when codegen fails so propagates
- // the error status for now.
- RETURN_IF_ERROR(runtime_state_->CodegenScalarExprs());
- }
-
- LlvmCodeGen* codegen = runtime_state_->codegen();
- DCHECK(codegen != nullptr);
- RETURN_IF_ERROR(codegen->FinalizeModule());
+ RETURN_IF_ERROR(fragment_state_->InvokeCodegen());
}
{
@@ -421,7 +402,6 @@ void FragmentInstanceState::Close() {
// guard against partially-finished Prepare()
if (sink_ != nullptr) sink_->Close(runtime_state_);
- if (sink_config_ != nullptr) sink_config_->Close();
// Stop updating profile counters in background.
profile()->StopPeriodicCounters();
@@ -429,7 +409,6 @@ void FragmentInstanceState::Close() {
// Delete row_batch_ to free resources associated with it.
row_batch_.reset();
if (exec_tree_ != nullptr) exec_tree_->Close(runtime_state_);
- if (plan_tree_ != nullptr) plan_tree_->Close();
runtime_state_->ReleaseResources();
// Sanity timer checks
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 20856be..36c81d8 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -43,6 +43,7 @@ class RpcContext;
namespace impala {
+class FragmentState;
class TPlanFragmentCtx;
class TPlanFragmentInstanceCtx;
class TBloomFilter;
@@ -79,7 +80,7 @@ class JoinBuilder;
/// - absorb RuntimeState?
class FragmentInstanceState {
public:
- FragmentInstanceState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
+ FragmentInstanceState(QueryState* query_state, FragmentState* fragment_state,
const TPlanFragmentInstanceCtx& instance_ctx);
/// Main loop of fragment instance execution. Blocks until execution finishes and
@@ -156,6 +157,7 @@ class FragmentInstanceState {
private:
QueryState* query_state_;
+ FragmentState* fragment_state_;
const TPlanFragmentCtx& fragment_ctx_;
const TPlanFragmentInstanceCtx& instance_ctx_;
@@ -163,9 +165,6 @@ class FragmentInstanceState {
/// in Prepare().
ExecNode* exec_tree_ = nullptr; // lives in obj_pool()
RuntimeState* runtime_state_ = nullptr; // lives in obj_pool()
- /// Lives in obj_pool(). Not mutated after being initialized except for being closed.
- PlanNode* plan_tree_ = nullptr;
- DataSinkConfig* sink_config_ = nullptr;
/// A 'fake mutex' to detect any race condition in accessing 'report_seq_no_' below.
/// There should be only one thread doing status report at the same time.
@@ -229,9 +228,6 @@ class FragmentInstanceState {
/// should live in obj_pool(), but managed separately so we can delete it in Close()
boost::scoped_ptr<RowBatch> row_batch_;
- /// Set when Prepare() returns.
- Promise<Status> prepared_promise_;
-
/// Set when OpenInternal() returns.
Promise<Status> opened_promise_;
diff --git a/be/src/runtime/fragment-state.cc b/be/src/runtime/fragment-state.cc
new file mode 100644
index 0000000..9c27312
--- /dev/null
+++ b/be/src/runtime/fragment-state.cc
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/fragment-state.h"
+
+#include <gutil/strings/substitute.h>
+
+#include "codegen/llvm-codegen.h"
+#include "exec/exec-node.h"
+#include "exec/data-sink.h"
+#include "runtime/exec-env.h"
+#include "runtime/query-state.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "util/debug-util.h"
+#include "util/impalad-metrics.h"
+#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+const string FragmentState::FSTATE_THREAD_GROUP_NAME = "fragment-init";
+const string FragmentState::FSTATE_THREAD_NAME_PREFIX = "init-and-codegen";
+
+/// Builds 'fragment_map' (fragment index -> FragmentState) from 'fragment_info', then
+/// calls Init() on every FragmentState to create its plan tree and data sink config.
+/// FragmentStates are allocated in 'state->obj_pool()'. The instance contexts in
+/// 'fragment_info' must be grouped by fragment, in the same order as the fragment
+/// contexts.
+Status FragmentState::CreateFragmentStateMap(const TExecPlanFragmentInfo& fragment_info,
+    QueryState* state, std::unordered_map<TFragmentIdx, FragmentState*>& fragment_map) {
+  const auto& fragment_ctxs = fragment_info.fragment_ctxs;
+  if (fragment_ctxs.empty()) {
+    // Nothing to execute. Instances without any fragment context would be a malformed
+    // request; fail instead of reading past the end of 'fragment_ctxs' below.
+    if (fragment_info.fragment_instance_ctxs.empty()) return Status::OK();
+    return Status("Received fragment instances without any fragment contexts.");
+  }
+  int fragment_ctx_idx = 0;
+  FragmentState* fragment_state =
+      state->obj_pool()->Add(new FragmentState(state, fragment_ctxs[fragment_ctx_idx]));
+  fragment_map[fragment_state->fragment_idx()] = fragment_state;
+  for (const TPlanFragmentInstanceCtx& instance_ctx :
+      fragment_info.fragment_instance_ctxs) {
+    // Instances are grouped by fragment: move on to the next TPlanFragmentCtx as soon
+    // as an instance belongs to a different fragment than the current one.
+    if (fragment_state->fragment_idx() != instance_ctx.fragment_idx) {
+      ++fragment_ctx_idx;
+      DCHECK_LT(fragment_ctx_idx, fragment_ctxs.size());
+      // Also guard release builds, where the DCHECK above is compiled out.
+      if (fragment_ctx_idx >= static_cast<int>(fragment_ctxs.size())) {
+        return Status(Substitute("No fragment context found for an instance of "
+            "fragment index $0.", instance_ctx.fragment_idx));
+      }
+      fragment_state = state->obj_pool()->Add(
+          new FragmentState(state, fragment_ctxs[fragment_ctx_idx]));
+      fragment_map[fragment_state->fragment_idx()] = fragment_state;
+      // We expect fragment and instance contexts to follow the same order.
+      DCHECK_EQ(fragment_state->fragment_idx(), instance_ctx.fragment_idx);
+    }
+    fragment_state->AddInstance(&instance_ctx);
+  }
+  // Init all fragments.
+  for (const auto& entry : fragment_map) {
+    RETURN_IF_ERROR(entry.second->Init());
+  }
+  return Status::OK();
+}
+
+/// Creates this fragment's plan tree and data sink config. Both are closed in
+/// ReleaseResources().
+Status FragmentState::Init() {
+  RETURN_IF_ERROR(PlanNode::CreateTree(this, fragment_ctx_.fragment.plan, &plan_tree_));
+  // Every fragment must have an output sink.
+  DCHECK(fragment_ctx_.fragment.__isset.output_sink);
+  RETURN_IF_ERROR(DataSinkConfig::CreateConfig(fragment_ctx_.fragment.output_sink,
+      plan_tree_->row_descriptor_, this, &sink_config_));
+  return Status::OK();
+}
+
+/// The first caller runs CodegenHelper() and records its result in 'codegen_status_';
+/// callers arriving meanwhile block on 'codegen_lock_', and every caller returns the
+/// recorded status.
+Status FragmentState::InvokeCodegen() {
+  unique_lock<mutex> l(codegen_lock_);
+  if (codegen_invoked_) return codegen_status_;
+  codegen_invoked_ = true;
+  codegen_status_ = CodegenHelper();
+  if (!codegen_status_.ok()) {
+    // Identify the fragment by display name and index.
+    codegen_status_.AddDetail(Substitute("Fragment failed during codegen: $0 (index $1)",
+        fragment_ctx_.fragment.display_name, fragment_idx()));
+    query_state_->ErrorDuringFragmentCodegen(codegen_status_);
+  }
+  return codegen_status_;
+}
+
+/// Does the actual codegen work: creates the LlvmCodeGen object, codegens the plan
+/// tree, the data sink config and all registered scalar exprs, then finalizes the
+/// module. Only called by InvokeCodegen(), with 'codegen_lock_' held.
+Status FragmentState::CodegenHelper() {
+  DCHECK(plan_tree_ != nullptr);
+  DCHECK(sink_config_ != nullptr);
+  DCHECK(ShouldCodegen());
+  RETURN_IF_ERROR(CreateCodegen());
+  {
+    SCOPED_TIMER2(codegen()->ir_generation_timer(),
+        codegen()->runtime_profile()->total_time_counter());
+    SCOPED_THREAD_COUNTER_MEASUREMENT(codegen()->llvm_thread_counters());
+    plan_tree_->Codegen(this);
+    sink_config_->Codegen(this);
+    // A failure to codegen a registered scalar expr is propagated as an error.
+    // TODO: IMPALA-4233 (no interpretation fallback for ScalarFnCall) was the reason
+    // for this; now that it is fixed, revisit whether such a failure must be fatal.
+    RETURN_IF_ERROR(CodegenScalarExprs());
+  }
+  LlvmCodeGen* llvm_codegen = codegen();
+  DCHECK(llvm_codegen != nullptr);
+  RETURN_IF_ERROR(llvm_codegen->FinalizeModule());
+  return Status::OK();
+}
+
+/// Creates this fragment's runtime profile in the query's object pool and attaches it
+/// to the query's host profile.
+FragmentState::FragmentState(QueryState* query_state,
+    const TPlanFragmentCtx& fragment_ctx)
+  : query_state_(query_state),
+    fragment_ctx_(fragment_ctx) {
+  runtime_profile_ = RuntimeProfile::Create(query_state->obj_pool(),
+      Substitute("Fragment $0", fragment_ctx_.fragment.display_name));
+  query_state_->host_profile()->AddChild(runtime_profile_);
+}
+
+FragmentState::~FragmentState() {}
+
+/// Closes whichever of the codegen object, plan tree and data sink config exist.
+void FragmentState::ReleaseResources() {
+  if (codegen_ != nullptr) codegen_->Close();
+  if (plan_tree_ != nullptr) plan_tree_->Close();
+  if (sink_config_ != nullptr) sink_config_->Close();
+}
+
+/// No-op if the codegen object already exists. Otherwise creates it, enables
+/// optimizations and adds its profile as a child of this fragment's profile.
+Status FragmentState::CreateCodegen() {
+  if (codegen_ != nullptr) return Status::OK();
+  RETURN_IF_ERROR(LlvmCodeGen::CreateImpalaCodegen(
+      this, query_mem_tracker(), PrintId(query_id()), &codegen_));
+  codegen_->EnableOptimizations(true);
+  runtime_profile_->AddChild(codegen_->runtime_profile());
+  return Status::OK();
+}
+
+/// Codegens every expr in 'scalar_exprs_to_codegen_'; the bool in each pair says
+/// whether the expr is a codegen entry point. Returns the first error encountered.
+Status FragmentState::CodegenScalarExprs() {
+  for (const auto& expr_and_entry_point : scalar_exprs_to_codegen_) {
+    llvm::Function* fn = nullptr;
+    RETURN_IF_ERROR(expr_and_entry_point.first->GetCodegendComputeFn(
+        codegen_.get(), expr_and_entry_point.second, &fn));
+  }
+  return Status::OK();
+}
+
+/// Uses the error message of 'codegen_status' (empty if OK) as the extra info.
+std::string FragmentState::GenerateCodegenMsg(
+    bool codegen_enabled, const Status& codegen_status, const std::string& extra_label) {
+  const string& err_msg = codegen_status.ok() ? "" : codegen_status.msg().msg();
+  return GenerateCodegenMsg(codegen_enabled, err_msg, extra_label);
+}
+
+/// Formats "[<extra_label> ]Codegen Enabled|Disabled[: <extra_info>]".
+std::string FragmentState::GenerateCodegenMsg(bool codegen_enabled,
+    const std::string& extra_info, const std::string& extra_label) {
+  std::stringstream str;
+  if (!extra_label.empty()) str << extra_label << " ";
+  str << (codegen_enabled ? "Codegen Enabled" : "Codegen Disabled");
+  // Stream both pieces directly instead of concatenating into a temporary string.
+  if (!extra_info.empty()) str << ": " << extra_info;
+  return str.str();
+}
+
+} // namespace impala
diff --git a/be/src/runtime/fragment-state.h b/be/src/runtime/fragment-state.h
new file mode 100644
index 0000000..54cf45d
--- /dev/null
+++ b/be/src/runtime/fragment-state.h
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <boost/scoped_ptr.hpp>
+
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "runtime/query-state.h"
+
+namespace impala {
+
+class FragmentInstanceState;
+class QueryCtx;
+class RuntimeProfile;
+
+/// This encapsulates all the static state for a fragment that will be shared across its
+/// instances which includes the thrift structures representing fragments and all its
+/// instances, plan node tree and data sink config. It also contains state and methods
+/// required for creating, invoking and managing codegen. Is not thread safe unless
+/// specified.
+
+class FragmentState {
+ public:
+  /// Populate 'fragment_map' with a FragmentState for the fragments in 'fragment_info',
+  /// record the fragment/instance thrift contexts and Init() each FragmentState.
+  static Status CreateFragmentStateMap(const TExecPlanFragmentInfo& fragment_info,
+      QueryState* state, std::unordered_map<TFragmentIdx, FragmentState*>& fragment_map);
+  FragmentState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx);
+  ~FragmentState();
+
+  /// Called by all the fragment instance threads that execute this fragment. The first
+  /// fragment instance to call this does the actual codegen work. The rest either wait
+  /// until codegen is complete or simply return immediately if it is already complete.
+  /// If codegen fails, it attempts to set an error status in the query state, and the
+  /// failed status is returned on every subsequent call. Is thread-safe.
+  Status InvokeCodegen();
+
+  /// Release resources held by codegen, the plan tree and data sink config.
+  void ReleaseResources();
+
+  ObjectPool* obj_pool() { return &obj_pool_; }
+  int fragment_idx() const { return fragment_ctx_.fragment.idx; }
+  const TQueryOptions& query_options() const { return query_state_->query_options(); }
+  const TPlanFragmentCtx& fragment_ctx() const { return fragment_ctx_; }
+  const std::vector<const TPlanFragmentInstanceCtx*>& instance_ctxs() const {
+    return instance_ctxs_;
+  }
+  const PlanNode* plan_tree() const { return plan_tree_; }
+  const DataSinkConfig* sink_config() const { return sink_config_; }
+  const TUniqueId& query_id() const { return query_state_->query_id(); }
+  const DescriptorTbl& desc_tbl() const { return query_state_->desc_tbl(); }
+  MemTracker* query_mem_tracker() const { return query_state_->query_mem_tracker(); }
+  RuntimeProfile* runtime_profile() { return runtime_profile_; }
+
+  static const std::string FSTATE_THREAD_GROUP_NAME;
+  static const std::string FSTATE_THREAD_NAME_PREFIX;
+
+  /// Methods relevant for codegen.
+
+  /// Create a codegen object accessible via codegen() if it doesn't exist already.
+  Status CreateCodegen();
+
+  /// Codegen all ScalarExpr expressions in 'scalar_exprs_to_codegen_'. If codegen fails
+  /// for any expression, return immediately with the error status.
+  /// TODO: IMPALA-4233 is fixed, so a failure may no longer need to be fatal when the
+  /// expression can be interpreted; revisit this.
+  Status CodegenScalarExprs();
+
+  /// Add ScalarExpr expression 'expr' to be codegen'd later if it's not disabled by
+  /// query option. If 'is_codegen_entry_point' is true, 'expr' will be an entry
+  /// point into codegen'd evaluation (i.e. it will have a function pointer populated).
+  /// Adding an expr here ensures that it will be codegen'd (i.e. fragment execution
+  /// will fail with an error if the expr cannot be codegen'd).
+  void AddScalarExprToCodegen(ScalarExpr* expr, bool is_codegen_entry_point) {
+    scalar_exprs_to_codegen_.push_back({expr, is_codegen_entry_point});
+  }
+
+  /// Returns true if there are ScalarExpr expressions in this fragment that we want
+  /// to codegen (because they can't be interpreted or based on options/hints).
+  /// This should only be used after the plan tree and the data sink config have been
+  /// created and initialized (see Init()).
+  bool ScalarExprNeedsCodegen() const { return !scalar_exprs_to_codegen_.empty(); }
+
+  /// Check if codegen was disabled and if so, append a message to 'codegen_status_msgs'.
+  /// Call this only after expressions have been created.
+  void CheckAndAddCodegenDisabledMessage(std::vector<std::string>& codegen_status_msgs) {
+    if (CodegenDisabledByQueryOption()) {
+      codegen_status_msgs.emplace_back(
+          GenerateCodegenMsg(false, "disabled by query option DISABLE_CODEGEN"));
+    } else if (CodegenDisabledByHint()) {
+      codegen_status_msgs.emplace_back(
+          GenerateCodegenMsg(false, "disabled due to optimization hints"));
+    }
+  }
+
+  /// Returns true if there is a hint to disable codegen. This can be true for single node
+  /// optimization or expression evaluation request from FE to BE (see fe-support.cc).
+  /// Note that this internal flag is advisory and it may be ignored if the fragment has
+  /// any UDF which cannot be interpreted. See ScalarExpr::Prepare() for details.
+  inline bool CodegenHasDisableHint() const {
+    return query_state_->query_ctx().disable_codegen_hint;
+  }
+
+  /// Returns true iff there is a hint to disable codegen and all expressions in the
+  /// fragment can be interpreted. This should only be used after Init(), i.e. once the
+  /// plan tree and data sink config (and thus all expressions) have been created.
+  inline bool CodegenDisabledByHint() const {
+    return CodegenHasDisableHint() && !ScalarExprNeedsCodegen();
+  }
+
+  /// Returns true if codegen is disabled by query option.
+  inline bool CodegenDisabledByQueryOption() const {
+    return query_options().disable_codegen;
+  }
+
+  /// Returns true if codegen should be enabled for this fragment. Codegen is enabled
+  /// if all the following conditions hold:
+  /// 1. it's enabled by query option
+  /// 2. it's not disabled by internal hints or there are expressions in the fragment
+  ///    which cannot be interpreted.
+  inline bool ShouldCodegen() const {
+    return !CodegenDisabledByQueryOption() && !CodegenDisabledByHint();
+  }
+
+  LlvmCodeGen* codegen() { return codegen_.get(); }
+
+  /// Utility methods for generating messages, from a Status object or a string, by
+  /// adding context relevant to codegen.
+  static std::string GenerateCodegenMsg(bool codegen_enabled,
+      const Status& codegen_status, const std::string& extra_label = "");
+  static std::string GenerateCodegenMsg(bool codegen_enabled,
+      const std::string& extra_info = "", const std::string& extra_label = "");
+
+ private:
+  ObjectPool obj_pool_;
+
+  /// Reference to the query state object that owns this.
+  QueryState* query_state_;
+
+  /// References to the thrift structs for this fragment.
+  const TPlanFragmentCtx& fragment_ctx_;
+  std::vector<const TPlanFragmentInstanceCtx*> instance_ctxs_;
+
+  /// Lives in obj_pool(). Not mutated after being initialized in Init() except for
+  /// being closed.
+  PlanNode* plan_tree_ = nullptr;
+  DataSinkConfig* sink_config_ = nullptr;
+
+  boost::scoped_ptr<LlvmCodeGen> codegen_;
+
+  /// Result of the codegen work done by the first InvokeCodegen() call; returned by
+  /// every call to InvokeCodegen().
+  Status codegen_status_;
+
+  /// Contains all ScalarExpr expressions which need to be codegen'd. The second element
+  /// is true if we want to generate a codegen entry point for this expr.
+  std::vector<std::pair<ScalarExpr*, bool>> scalar_exprs_to_codegen_;
+
+  RuntimeProfile* runtime_profile_ = nullptr;
+
+  /// Serializes access to InvokeCodegen().
+  /// Lock ordering: QueryState::status_lock_ must *not be obtained* prior to this.
+  std::mutex codegen_lock_;
+
+  /// Indicates whether codegen has been invoked. Used to make sure only the first
+  /// fragment instance to call InvokeCodegen() does the actual codegen work.
+  bool codegen_invoked_ = false;
+
+  /// Used by the CreateFragmentStateMap to add TPlanFragmentInstanceCtx thrift objects
+  /// for the fragment that this object represents.
+  void AddInstance(const TPlanFragmentInstanceCtx* instance_ctx) {
+    instance_ctxs_.push_back(instance_ctx);
+  }
+
+  /// Helper method used by InvokeCodegen(). Does the actual codegen work.
+  Status CodegenHelper();
+
+  /// Create the plan tree and the data sink config.
+  Status Init();
+};
+}
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 4865757..5f9becc 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -38,6 +38,7 @@
#include "rpc/rpc-mgr.h"
#include "runtime/descriptors.h"
#include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
#include "runtime/mem-tracker.h"
#include "runtime/raw-value.inline.h"
#include "runtime/row-batch.h"
@@ -76,7 +77,7 @@ const char* KrpcDataStreamSender::LLVM_CLASS_NAME = "class.impala::KrpcDataStrea
const char* KrpcDataStreamSender::TOTAL_BYTES_SENT_COUNTER = "TotalBytesSent";
Status KrpcDataStreamSenderConfig::Init(
- const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
+ const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
DCHECK(tsink_->__isset.stream_sink);
partition_type_ = tsink_->stream_sink.output_partition.type;
@@ -88,6 +89,8 @@ Status KrpcDataStreamSenderConfig::Init(
exchange_hash_seed_ =
KrpcDataStreamSender::EXCHANGE_HASH_SEED_CONST ^ state->query_id().hi;
}
+ num_channels_ = state->fragment_ctx().destinations.size();
+ state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
return Status::OK();
}
@@ -773,12 +776,12 @@ Status KrpcDataStreamSender::Prepare(
for (int i = 0; i < channels_.size(); ++i) {
RETURN_IF_ERROR(channels_[i]->Init(state));
}
- state->CheckAndAddCodegenDisabledMessage(profile());
return Status::OK();
}
Status KrpcDataStreamSender::Open(RuntimeState* state) {
SCOPED_TIMER(profile_->total_time_counter());
+ RETURN_IF_ERROR(DataSink::Open(state));
return ScalarExprEvaluator::Open(partition_expr_evals_, state);
}
@@ -921,22 +924,15 @@ string KrpcDataStreamSenderConfig::PartitionTypeName() const {
}
}
-void KrpcDataStreamSender::Codegen(RuntimeState* state) {
- const KrpcDataStreamSenderConfig& config =
- static_cast<const KrpcDataStreamSenderConfig&>(sink_config_);
- KrpcDataStreamSenderConfig& non_const_config =
- const_cast<KrpcDataStreamSenderConfig&>(config);
- non_const_config.Codegen(state, profile());
-}
-
-void KrpcDataStreamSenderConfig::Codegen(RuntimeState* state, RuntimeProfile* profile) {
+void KrpcDataStreamSenderConfig::Codegen(FragmentState* state) {
LlvmCodeGen* codegen = state->codegen();
DCHECK(codegen != nullptr);
const string sender_name = PartitionTypeName() + " Sender";
if (partition_type_ != TPartitionType::HASH_PARTITIONED) {
const string& msg = Substitute("not $0",
partition_type_ == TPartitionType::KUDU ? "supported" : "needed");
- profile->AddCodegenMsg(false, msg, sender_name);
+ codegen_status_msgs_.emplace_back(
+ FragmentState::GenerateCodegenMsg(false, msg, sender_name));
return;
}
@@ -949,9 +945,8 @@ void KrpcDataStreamSenderConfig::Codegen(RuntimeState* state, RuntimeProfile* pr
int num_replaced;
// Replace GetNumChannels() with a constant.
- int num_channels = state->fragment_ctx().destinations.size();
num_replaced = codegen->ReplaceCallSitesWithValue(hash_and_add_rows_fn,
- codegen->GetI32Constant(num_channels), "GetNumChannels");
+ codegen->GetI32Constant(num_channels_), "GetNumChannels");
DCHECK_EQ(num_replaced, 1);
// Replace HashRow() with the handcrafted IR function.
@@ -968,7 +963,7 @@ void KrpcDataStreamSenderConfig::Codegen(RuntimeState* state, RuntimeProfile* pr
reinterpret_cast<void**>(&hash_and_add_rows_fn_));
}
}
- profile->AddCodegenMsg(codegen_status.ok(), codegen_status, sender_name);
+ AddCodegenStatus(codegen_status, sender_name);
}
Status KrpcDataStreamSender::AddRowToChannel(const int channel_id, TupleRow* row) {
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index e74e82f..a7e55a1 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -47,7 +47,10 @@ class KrpcDataStreamSenderConfig : public DataSinkConfig {
RuntimeState* state) const override;
void Close() override;
- void Codegen(RuntimeState* state, RuntimeProfile* profile);
+ /// Codegen KrpcDataStreamSender::HashAndAddRows() if partitioning type is
+ /// HASH_PARTITIONED. Replaces KrpcDataStreamSender::HashRow() and
+ /// KrpcDataStreamSender::GetNumChannels() based on runtime information.
+ void Codegen(FragmentState* state) override;
/// The type of partitioning to perform.
TPartitionType::type partition_type_ = TPartitionType::UNPARTITIONED;
@@ -56,6 +59,9 @@ class KrpcDataStreamSenderConfig : public DataSinkConfig {
/// per-row partition values for shuffling exchange;
std::vector<ScalarExpr*> partition_exprs_;
+  /// The number of channels that this node will create. Set in Init() from the
+  /// size of the fragment's destinations list; zero until then.
+  int num_channels_ = 0;
+
/// Hash seed used for exchanges. Query id will be used to seed the hash function.
uint64_t exchange_hash_seed_;
@@ -68,7 +74,7 @@ class KrpcDataStreamSenderConfig : public DataSinkConfig {
protected:
Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
- RuntimeState* state) override;
+ FragmentState* state) override;
private:
/// Codegen the KrpcDataStreamSender::HashRow() function and returns the codegen'd
@@ -111,10 +117,6 @@ class KrpcDataStreamSender : public DataSink {
/// the stat counters. Return error status if any channels failed to initialize.
virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
- /// Codegen HashAndAddRows() if partitioning type is HASH_PARTITIONED.
- /// Replaces HashRow() and GetNumChannels() based on runtime information.
- virtual void Codegen(RuntimeState* state) override;
-
/// Initializes the evaluator of the partitioning expressions. Return error status
/// if initialization failed.
virtual Status Open(RuntimeState* state) override;
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index a07e737..a154d59 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -33,6 +33,7 @@
#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/bufferpool/reservation-util.h"
#include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
#include "runtime/fragment-instance-state.h"
#include "runtime/initial-reservations.h"
#include "runtime/mem-tracker.h"
@@ -116,6 +117,10 @@ void QueryState::ReleaseBackendResources() {
if (initial_reservations_ != nullptr) initial_reservations_->ReleaseResources();
if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
if (desc_tbl_ != nullptr) desc_tbl_->ReleaseResources();
+ // Release any memory associated with codegen.
+ for (auto& elem : fragment_state_map_) {
+ elem.second->ReleaseResources();
+ }
// Mark the query as finished on the query MemTracker so that admission control will
// not consider the whole query memory limit to be "reserved".
query_mem_tracker_->set_query_exec_finished();
@@ -258,6 +263,26 @@ Status QueryState::InitBufferPoolState() {
return Status::OK();
}
+// Verifies that every fragment instance on this backend produces the same set of
+// runtime filter ids as the first instance of its fragment. Assumes 'instance_ctxs'
+// is grouped by fragment, i.e. all instances of one fragment are contiguous; the
+// reference set is reset whenever 'fragment_idx' changes. Called from a DCHECK in
+// InitFilterBank().
+bool VerifyFiltersProduced(const vector<TPlanFragmentInstanceCtx>& instance_ctxs) {
+  int fragment_idx = -1;
+  // Filter ids produced by the first instance of the current fragment.
+  std::unordered_set<int> first_set;
+  for (const TPlanFragmentInstanceCtx& instance_ctx : instance_ctxs) {
+    bool first_instance_of_fragment =
+        fragment_idx == -1 || fragment_idx != instance_ctx.fragment_idx;
+    if (first_instance_of_fragment) {
+      fragment_idx = instance_ctx.fragment_idx;
+      first_set.clear();
+      // Bind by const reference: each element is a Thrift struct, copying is wasted.
+      for (const auto& f : instance_ctx.filters_produced) first_set.insert(f.filter_id);
+    }
+    // Equal sizes plus every id being present in 'first_set' means the sets match,
+    // provided 'filters_produced' holds no duplicate filter ids.
+    if (first_set.size() != instance_ctx.filters_produced.size()) return false;
+    for (const auto& f : instance_ctx.filters_produced) {
+      if (first_set.find(f.filter_id) == first_set.end()) return false;
+    }
+  }
+  return true;
+}
+
Status QueryState::InitFilterBank() {
int64_t runtime_filters_reservation_bytes = 0;
int fragment_ctx_idx = -1;
@@ -274,6 +299,8 @@ Status QueryState::InitFilterBank() {
}
}
}
+ DCHECK(VerifyFiltersProduced(instance_ctxs))
+ << "Filters produced by all instances on the same backend should be the same";
for (const TPlanFragmentInstanceCtx& instance_ctx : instance_ctxs) {
bool first_instance_of_fragment = fragment_ctx_idx == -1
|| fragment_ctxs[fragment_ctx_idx].fragment.idx != instance_ctx.fragment_idx;
@@ -516,6 +543,14 @@ int64_t QueryState::GetReportWaitTimeMs() const {
return report_interval * (num_failed_reports_ + 1);
}
+// Records 'status' as the query's overall status unless an error status has already
+// been recorded. 'failed_finstance_id_' is set to a default-constructed TUniqueId
+// rather than to a specific fragment instance id.
+void QueryState::ErrorDuringFragmentCodegen(const Status& status) {
+  unique_lock<SpinLock> status_guard(status_lock_);
+  // The first recorded error wins; later ones are dropped.
+  if (HasErrorStatus()) return;
+  overall_status_ = status;
+  failed_finstance_id_ = TUniqueId();
+}
+
void QueryState::ErrorDuringPrepare(const Status& status, const TUniqueId& finst_id) {
{
unique_lock<SpinLock> l(status_lock_);
@@ -561,9 +596,8 @@ bool QueryState::StartFInstances() {
DCHECK_GT(backend_resource_refcnt_.Load(), 0) << "Should have been taken in Init()";
DCHECK_GT(fragment_info_.fragment_ctxs.size(), 0);
- TPlanFragmentCtx* fragment_ctx = &fragment_info_.fragment_ctxs[0];
+ vector<unique_ptr<Thread>> codegen_threads;
int num_unstarted_instances = fragment_info_.fragment_instance_ctxs.size();
- int fragment_ctx_idx = 0;
// set up desc tbl
DCHECK(query_ctx().__isset.desc_tbl_serialized);
@@ -573,49 +607,49 @@ bool QueryState::StartFInstances() {
VLOG(2) << "descriptor table for query=" << PrintId(query_id())
<< "\n" << desc_tbl_->DebugString();
+ start_finstances_status =
+ FragmentState::CreateFragmentStateMap(fragment_info_, this, fragment_state_map_);
+ if (UNLIKELY(!start_finstances_status.ok())) goto error;
+
fragment_events_start_time_ = MonotonicStopWatch::Now();
- for (const TPlanFragmentInstanceCtx& instance_ctx :
- fragment_info_.fragment_instance_ctxs) {
- // determine corresponding TPlanFragmentCtx
- if (fragment_ctx->fragment.idx != instance_ctx.fragment_idx) {
- ++fragment_ctx_idx;
- DCHECK_LT(fragment_ctx_idx, fragment_info_.fragment_ctxs.size());
- fragment_ctx = &fragment_info_.fragment_ctxs[fragment_ctx_idx];
- // we expect fragment and instance contexts to follow the same order
- DCHECK_EQ(fragment_ctx->fragment.idx, instance_ctx.fragment_idx);
- }
- FragmentInstanceState* fis = obj_pool_.Add(
- new FragmentInstanceState(this, *fragment_ctx, instance_ctx));
-
- // start new thread to execute instance
- refcnt_.Add(1); // decremented in ExecFInstance()
- AcquireBackendResourceRefcount(); // decremented in ExecFInstance()
-
- // Add the fragment instance ID to the 'fis_map_'. Has to happen before the thread is
- // spawned or we may race with users of 'fis_map_'.
- fis_map_.emplace(fis->instance_id(), fis);
-
- string thread_name = Substitute("$0 (finst:$1)",
- FragmentInstanceState::FINST_THREAD_NAME_PREFIX,
- PrintId(instance_ctx.fragment_instance_id));
- unique_ptr<Thread> t;
-
- // Inject thread creation failures through debug actions if enabled.
- Status debug_action_status = DebugAction(query_options(), "FIS_FAIL_THREAD_CREATION");
- start_finstances_status = !debug_action_status.ok() ? debug_action_status :
- Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
- [this, fis]() { this->ExecFInstance(fis); }, &t, true);
- if (!start_finstances_status.ok()) {
- fis_map_.erase(fis->instance_id());
- // Undo refcnt increments done immediately prior to Thread::Create(). The
- // reference counts were both greater than zero before the increments, so
- // neither of these decrements will free any structures.
- ReleaseBackendResourceRefcount();
- ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
- goto error;
+ for (auto& fragment : fragment_state_map_) {
+ FragmentState* fragment_state = fragment.second;
+ for (const TPlanFragmentInstanceCtx* instance_ctx : fragment_state->instance_ctxs()) {
+ FragmentInstanceState* fis =
+ obj_pool_.Add(new FragmentInstanceState(this, fragment_state, *instance_ctx));
+
+ // start new thread to execute instance
+ refcnt_.Add(1); // decremented in ExecFInstance()
+ AcquireBackendResourceRefcount(); // decremented in ExecFInstance()
+
+ // Add the fragment instance ID to the 'fis_map_'. Has to happen before the thread
+ // is spawned or we may race with users of 'fis_map_'.
+ fis_map_.emplace(fis->instance_id(), fis);
+
+ string thread_name =
+ Substitute("$0 (finst:$1)", FragmentInstanceState::FINST_THREAD_NAME_PREFIX,
+ PrintId(instance_ctx->fragment_instance_id));
+ unique_ptr<Thread> t;
+
+ // Inject thread creation failures through debug actions if enabled.
+ Status debug_action_status =
+ DebugAction(query_options(), "FIS_FAIL_THREAD_CREATION");
+ start_finstances_status = !debug_action_status.ok() ?
+ debug_action_status :
+ Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
+ [this, fis]() { this->ExecFInstance(fis); }, &t, true);
+ if (!start_finstances_status.ok()) {
+ fis_map_.erase(fis->instance_id());
+ // Undo refcnt increments done immediately prior to Thread::Create(). The
+ // reference counts were both greater than zero before the increments, so
+ // neither of these decrements will free any structures.
+ ReleaseBackendResourceRefcount();
+ ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
+ goto error;
+ }
+ t->Detach();
+ --num_unstarted_instances;
}
- t->Detach();
- --num_unstarted_instances;
}
return true;
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index eb6938c..8654d44 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -45,15 +45,20 @@ class RpcContext;
namespace impala {
class ControlServiceProxy;
+class DataSinkConfig;
class DescriptorTbl;
+class FragmentState;
class FragmentInstanceState;
class InitialReservations;
+class LlvmCodeGen;
class MemTracker;
+class PlanNode;
class PublishFilterParamsPB;
class ReservationTracker;
class RuntimeFilterBank;
class RuntimeProfile;
class RuntimeState;
+class ScalarExpr;
class ScannerMemLimiter;
class TmpFileGroup;
class TRuntimeProfileForest;
@@ -240,6 +245,11 @@ class QueryState {
/// Called by a FragmentInstanceState thread to notify that it's done executing.
void DoneExecuting() { discard_result(instances_finished_barrier_->Notify()); }
+ /// Called to notify that an error was encountered during codegen. This is called by the
+ /// first fragment instance thread that invoked its corresponding FragmentState's
+ /// Codegen() and encountered the error.
+ void ErrorDuringFragmentCodegen(const Status& status);
+
/// Called by a fragment instance thread to notify that it hit an error during Prepare()
/// Updates the query status and the failed instance ID if it's not set already.
/// Also notifies anyone waiting on WaitForPrepare() if this is called by the last
@@ -370,11 +380,15 @@ class QueryState {
/// and so is 'failed_instance_id_' if an error is hit.
std::unique_ptr<CountingBarrier> instances_finished_barrier_;
- /// map from instance id to its state (owned by obj_pool_), populated in
+ /// Map from instance id to its state (owned by obj_pool_), populated in
/// StartFInstances(); Not valid to read from until 'instances_prepared_barrier_'
/// is set (i.e. readers should always call WaitForPrepare()).
std::unordered_map<TUniqueId, FragmentInstanceState*> fis_map_;
+ /// Map from fragment index to its fragment state (owned by obj_pool_), populated in
+ /// StartFInstances();
+ std::unordered_map<TFragmentIdx, FragmentState*> fragment_state_map_;
+
ObjectPool obj_pool_;
AtomicInt32 refcnt_;
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 8da7259..4cb68e3 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -173,24 +173,6 @@ void RuntimeState::Init() {
}
}
-Status RuntimeState::CreateCodegen() {
- if (codegen_.get() != NULL) return Status::OK();
- // TODO: add the fragment ID to the codegen ID as well
- RETURN_IF_ERROR(LlvmCodeGen::CreateImpalaCodegen(this,
- instance_mem_tracker_, PrintId(fragment_instance_id()), &codegen_));
- codegen_->EnableOptimizations(true);
- profile_->AddChild(codegen_->runtime_profile());
- return Status::OK();
-}
-
-Status RuntimeState::CodegenScalarExprs() {
- for (auto& item : scalar_exprs_to_codegen_) {
- llvm::Function* fn;
- RETURN_IF_ERROR(item.first->GetCodegendComputeFn(codegen_.get(), item.second, &fn));
- }
- return Status::OK();
-}
-
Status RuntimeState::StartSpilling(MemTracker* mem_tracker) {
return query_state_->StartSpilling(this, mem_tracker);
}
@@ -322,9 +304,6 @@ void RuntimeState::ReleaseResources() {
if (resource_pool_ != nullptr) {
ExecEnv::GetInstance()->thread_mgr()->DestroyPool(move(resource_pool_));
}
- // Release any memory associated with codegen.
- if (codegen_ != nullptr) codegen_->Close();
-
// Release the reservation, which should be unused at the point.
if (instance_buffer_reservation_ != nullptr) instance_buffer_reservation_->Close();
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 2735d50..102db23 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -143,64 +143,8 @@ class RuntimeState {
/// Returns runtime state profile
RuntimeProfile* runtime_profile() { return profile_; }
- /// Returns the LlvmCodeGen object for this fragment instance.
- LlvmCodeGen* codegen() { return codegen_.get(); }
-
const std::string& GetEffectiveUser() const;
- /// Add ScalarExpr expression 'expr' to be codegen'd later if it's not disabled by
- /// query option. If 'is_codegen_entry_point' is true, 'expr' will be an entry
- /// point into codegen'd evaluation (i.e. it will have a function pointer populated).
- /// Adding an expr here ensures that it will be codegen'd (i.e. fragment execution
- /// will fail with an error if the expr cannot be codegen'd).
- void AddScalarExprToCodegen(ScalarExpr* expr, bool is_codegen_entry_point) {
- scalar_exprs_to_codegen_.push_back({expr, is_codegen_entry_point});
- }
-
- /// Returns true if there are ScalarExpr expressions in the fragments that we want
- /// to codegen (because they can't be interpreted or based on options/hints).
- /// This should only be used after the Prepare() phase in which all expressions'
- /// Prepare() are invoked.
- bool ScalarExprNeedsCodegen() const { return !scalar_exprs_to_codegen_.empty(); }
-
- /// Check if codegen was disabled and if so, add a message to the runtime profile.
- void CheckAndAddCodegenDisabledMessage(RuntimeProfile* profile) {
- if (CodegenDisabledByQueryOption()) {
- profile->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
- } else if (CodegenDisabledByHint()) {
- profile->AddCodegenMsg(false, "disabled due to optimization hints");
- }
- }
-
- /// Returns true if there is a hint to disable codegen. This can be true for single node
- /// optimization or expression evaluation request from FE to BE (see fe-support.cc).
- /// Note that this internal flag is advisory and it may be ignored if the fragment has
- /// any UDF which cannot be interpreted. See ScalarExpr::Prepare() for details.
- inline bool CodegenHasDisableHint() const {
- return query_ctx().disable_codegen_hint;
- }
-
- /// Returns true iff there is a hint to disable codegen and all expressions in the
- /// fragment can be interpreted. This should only be used after the Prepare() phase
- /// in which all expressions' Prepare() are invoked.
- inline bool CodegenDisabledByHint() const {
- return CodegenHasDisableHint() && !ScalarExprNeedsCodegen();
- }
-
- /// Returns true if codegen is disabled by query option.
- inline bool CodegenDisabledByQueryOption() const {
- return query_options().disable_codegen;
- }
-
- /// Returns true if codegen should be enabled for this fragment. Codegen is enabled
- /// if all the following conditions hold:
- /// 1. it's enabled by query option
- /// 2. it's not disabled by internal hints or there are expressions in the fragment
- /// which cannot be interpreted.
- inline bool ShouldCodegen() const {
- return !CodegenDisabledByQueryOption() && !CodegenDisabledByHint();
- }
-
inline Status GetQueryStatus() {
// Do a racy check for query_status_ to avoid unnecessary spinlock acquisition.
if (UNLIKELY(!query_status_.ok())) {
@@ -309,15 +253,6 @@ class RuntimeState {
/// called after ReleaseResources().
Status CheckQueryState();
- /// Create a codegen object accessible via codegen() if it doesn't exist already.
- Status CreateCodegen();
-
- /// Codegen all ScalarExpr expressions in 'scalar_exprs_to_codegen_'. If codegen fails
- /// for any expressions, return immediately with the error status. Once IMPALA-4233 is
- /// fixed, it's not fatal to fail codegen if the expression can be interpreted.
- /// TODO: Fix IMPALA-4233
- Status CodegenScalarExprs();
-
/// Helper to call QueryState::StartSpilling().
Status StartSpilling(MemTracker* mem_tracker);
@@ -387,12 +322,6 @@ class RuntimeState {
/// instead.
const Timezone* time_zone_for_unix_time_conversions_;
- boost::scoped_ptr<LlvmCodeGen> codegen_;
-
- /// Contains all ScalarExpr expressions which need to be codegen'd. The second element
- /// is true if we want to generate a codegen entry point for this expr.
- std::vector<std::pair<ScalarExpr*, bool>> scalar_exprs_to_codegen_;
-
/// Thread resource management object for this fragment's execution. The runtime
/// state is responsible for returning this pool to the thread mgr.
std::unique_ptr<ThreadResourcePool> resource_pool_;
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 27747d9..f6ef970 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -23,6 +23,7 @@
#include "gutil/strings/substitute.h"
#include "rpc/rpc-mgr.h"
#include "runtime/fragment-instance-state.h"
+#include "runtime/fragment-state.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-exec-mgr.h"
#include "runtime/query-state.h"
@@ -166,8 +167,10 @@ Status TestEnv::CreateQueryState(
fragment_info.__set_fragment_instance_ctxs(
vector<TPlanFragmentInstanceCtx>({TPlanFragmentInstanceCtx()}));
RETURN_IF_ERROR(qs->Init(&rpc_params, fragment_info));
+ FragmentState* frag_state =
+ qs->obj_pool()->Add(new FragmentState(qs, qs->fragment_info_.fragment_ctxs[0]));
FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState(qs,
- qs->fragment_info_.fragment_ctxs[0], qs->fragment_info_.fragment_instance_ctxs[0]));
+ frag_state, qs->fragment_info_.fragment_instance_ctxs[0]));
RuntimeState* rs = qs->obj_pool()->Add(
new RuntimeState(qs, fis->fragment_ctx(), fis->instance_ctx(), exec_env_.get()));
runtime_states_.push_back(rs);
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 830169f..c7ba623 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -37,6 +37,7 @@
#include "runtime/client-cache.h"
#include "runtime/decimal-value.inline.h"
#include "runtime/exec-env.h"
+#include "runtime/fragment-state.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/lib-cache.h"
#include "runtime/mem-pool.h"
@@ -200,9 +201,13 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow(
query_ctx.request_pool = "fe-eval-exprs";
RuntimeState state(query_ctx, ExecEnv::GetInstance());
+ TPlanFragmentCtx fragment_ctx;
+ FragmentState fragment_state(state.query_state(), fragment_ctx);
// Make sure to close the runtime state no matter how this scope is exited.
- const auto close_runtime_state =
- MakeScopeExitTrigger([&state]() { state.ReleaseResources(); });
+ const auto close_runtime_state = MakeScopeExitTrigger([&state, &fragment_state]() {
+ fragment_state.ReleaseResources();
+ state.ReleaseResources();
+ });
THROW_IF_ERROR_RET(
jni_frame.push(env), env, JniUtil::internal_exc_class(), result_bytes);
@@ -214,7 +219,7 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow(
vector<ScalarExprEvaluator*> evals;
for (const TExpr& texpr : texprs) {
ScalarExpr* expr;
- status = ScalarExpr::Create(texpr, RowDescriptor(), &state, &expr);
+ status = ScalarExpr::Create(texpr, RowDescriptor(), &fragment_state, &expr);
if (!status.ok()) goto error;
exprs.push_back(expr);
ScalarExprEvaluator* eval;
@@ -225,12 +230,12 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow(
}
// UDFs which cannot be interpreted need to be handled by codegen.
- if (state.ScalarExprNeedsCodegen()) {
- status = state.CreateCodegen();
+ if (fragment_state.ScalarExprNeedsCodegen()) {
+ status = fragment_state.CreateCodegen();
if (!status.ok()) goto error;
- LlvmCodeGen* codegen = state.codegen();
+ LlvmCodeGen* codegen = fragment_state.codegen();
DCHECK(codegen != NULL);
- status = state.CodegenScalarExprs();
+ status = fragment_state.CodegenScalarExprs();
if (!status.ok()) goto error;
codegen->EnableOptimizations(false);
status = codegen->FinalizeModule();
diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h
index d970d91..0c820a1 100644
--- a/be/src/udf/udf-internal.h
+++ b/be/src/udf/udf-internal.h
@@ -170,10 +170,10 @@ class FunctionContextImpl {
int GetConstFnAttr(ConstFnAttr t, int i = -1);
/// Return the function attribute 't' defined in ConstFnAttr above.
- static int GetConstFnAttr(const RuntimeState* state,
+ static int GetConstFnAttr(bool uses_decimal_v2,
const impala_udf::FunctionContext::TypeDesc& return_type,
- const std::vector<impala_udf::FunctionContext::TypeDesc>& arg_types,
- ConstFnAttr t, int i = -1);
+ const std::vector<impala_udf::FunctionContext::TypeDesc>& arg_types, ConstFnAttr t,
+ int i = -1);
/// UDFs may manipulate DecimalVal arguments via SIMD instructions such as 'movaps'
/// that require 16-byte memory alignment.
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index 0688805..7865a90 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -558,13 +558,12 @@ static int GetTypeByteSize(const FunctionContext::TypeDesc& type) {
}
int FunctionContextImpl::GetConstFnAttr(FunctionContextImpl::ConstFnAttr t, int i) {
- return GetConstFnAttr(state_, return_type_, arg_types_, t, i);
+ return GetConstFnAttr(state_->decimal_v2(), return_type_, arg_types_, t, i);
}
-int FunctionContextImpl::GetConstFnAttr(const RuntimeState* state,
+int FunctionContextImpl::GetConstFnAttr(bool uses_decimal_v2,
const FunctionContext::TypeDesc& return_type,
- const vector<FunctionContext::TypeDesc>& arg_types,
- ConstFnAttr t, int i) {
+ const vector<FunctionContext::TypeDesc>& arg_types, ConstFnAttr t, int i) {
switch (t) {
case RETURN_TYPE_SIZE:
assert(i == -1);
@@ -592,7 +591,7 @@ int FunctionContextImpl::GetConstFnAttr(const RuntimeState* state,
assert(arg_types[i].type == FunctionContext::TYPE_DECIMAL);
return arg_types[i].scale;
case DECIMAL_V2:
- return state->decimal_v2();
+ return uses_decimal_v2;
default:
assert(false);
return -1;
diff --git a/be/src/util/tuple-row-compare.cc b/be/src/util/tuple-row-compare.cc
index 7c46b59..d960bbe 100644
--- a/be/src/util/tuple-row-compare.cc
+++ b/be/src/util/tuple-row-compare.cc
@@ -23,6 +23,7 @@
#include "codegen/llvm-codegen.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
+#include "runtime/fragment-state.h"
#include "runtime/runtime-state.h"
#include "runtime/multi-precision.h"
#include "util/runtime-profile-counters.h"
@@ -64,7 +65,7 @@ void TupleRowComparator::Close(RuntimeState* state) {
ScalarExprEvaluator::Close(ordering_expr_evals_lhs_, state);
}
-Status TupleRowComparatorConfig::Codegen(RuntimeState* state) {
+Status TupleRowComparatorConfig::Codegen(FragmentState* state) {
if (sorting_order_ == TSortingOrder::ZORDER) {
return Status("Codegen not yet implemented for sorting order: ZORDER");
}
diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h
index 59eb5f8..0083527 100644
--- a/be/src/util/tuple-row-compare.h
+++ b/be/src/util/tuple-row-compare.h
@@ -29,6 +29,7 @@
namespace impala {
+class FragmentState;
class RuntimeState;
class ScalarExprEvaluator;
@@ -68,7 +69,7 @@ class TupleRowComparatorConfig {
const TSortInfo& tsort_info, const std::vector<ScalarExpr*>& ordering_exprs);
/// Codegens a Compare() function for this comparator that is used in Compare().
- Status Codegen(RuntimeState* state);
+ Status Codegen(FragmentState* state);
/// Indicates the sorting ordering used. Specified using the SORT BY clause.
TSortingOrder::type sorting_order_;