You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@impala.apache.org by mj...@apache.org on 2016/12/11 07:06:15 UTC
[3/4] incubator-impala git commit: IMPALA-4014: Introduce query-wide execution state.
IMPALA-4014: Introduce query-wide execution state.
This introduces a global structure to coordinate execution
of fragment instances on a backend for a single query.
New classes:
- QueryExecMgr: subsumes FragmentMgr
- QueryState
- FragmentInstanceState: replaces FragmentExecState
Change-Id: I962ae6b7cb7dc0d07fbb8f70317aeb01d88d400b
Reviewed-on: http://gerrit.cloudera.org:8080/4418
Reviewed-by: Marcel Kornacker <ma...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4b2d76db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4b2d76db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4b2d76db
Branch: refs/heads/master
Commit: 4b2d76dbb523c3761a6f53983b635ce88bc67a0c
Parents: 48792eb
Author: Marcel Kornacker <ma...@cloudera.com>
Authored: Wed Oct 26 14:02:44 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sun Dec 11 02:29:28 2016 +0000
----------------------------------------------------------------------
be/src/benchmarks/expr-benchmark.cc | 4 +-
be/src/exec/catalog-op-executor.cc | 2 +
be/src/exec/data-source-scan-node.cc | 2 +-
be/src/exec/exchange-node.cc | 1 +
be/src/exec/hash-table-test.cc | 7 +-
be/src/exec/hdfs-scan-node-base.cc | 4 +-
be/src/exec/hdfs-scan-node.cc | 2 +-
be/src/exec/union-node.cc | 2 +-
be/src/exprs/expr-test.cc | 6 +-
be/src/runtime/CMakeLists.txt | 3 +
be/src/runtime/buffered-block-mgr-test.cc | 8 +-
be/src/runtime/buffered-tuple-stream-test.cc | 3 +-
be/src/runtime/coordinator.cc | 150 +++++++++--------
be/src/runtime/coordinator.h | 39 +++--
be/src/runtime/data-stream-test.cc | 16 +-
be/src/runtime/exec-env.cc | 6 +-
be/src/runtime/exec-env.h | 6 +-
be/src/runtime/fragment-instance-state.cc | 165 +++++++++++++++++++
be/src/runtime/fragment-instance-state.h | 118 +++++++++++++
be/src/runtime/plan-fragment-executor.cc | 76 ++++-----
be/src/runtime/plan-fragment-executor.h | 16 +-
be/src/runtime/query-exec-mgr.cc | 165 +++++++++++++++++++
be/src/runtime/query-exec-mgr.h | 78 +++++++++
be/src/runtime/query-state.cc | 69 ++++++++
be/src/runtime/query-state.h | 117 +++++++++++++
be/src/runtime/runtime-filter-bank.cc | 6 +-
be/src/runtime/runtime-state.cc | 99 +++++++----
be/src/runtime/runtime-state.h | 90 +++++-----
be/src/runtime/test-env.cc | 51 +++---
be/src/runtime/test-env.h | 7 +-
be/src/runtime/thread-resource-mgr.h | 2 +
be/src/scheduling/query-schedule.cc | 2 +-
be/src/scheduling/request-pool-service.cc | 2 +-
be/src/scheduling/simple-scheduler.cc | 3 +-
be/src/service/CMakeLists.txt | 3 +-
be/src/service/fe-support.cc | 10 +-
be/src/service/fragment-exec-state.cc | 145 ----------------
be/src/service/fragment-exec-state.h | 105 ------------
be/src/service/fragment-mgr.cc | 154 -----------------
be/src/service/fragment-mgr.h | 89 ----------
be/src/service/impala-beeswax-server.cc | 22 +--
be/src/service/impala-hs2-server.cc | 34 ++--
be/src/service/impala-http-handler.cc | 1 +
be/src/service/impala-internal-service.cc | 103 ++++++++++++
be/src/service/impala-internal-service.h | 54 ++----
be/src/service/impala-server.cc | 9 +-
be/src/service/impala-server.h | 6 +-
be/src/service/query-exec-state.cc | 25 +--
be/src/service/query-exec-state.h | 6 +-
be/src/testutil/desc-tbl-builder.h | 5 +
be/src/udf/udf.cc | 4 +-
be/src/util/container-util.h | 1 +
be/src/util/thread.h | 5 +
be/src/util/uid-util.h | 18 +-
common/thrift/ImpalaInternalService.thrift | 3 +-
.../apache/impala/analysis/AnalysisContext.java | 2 +-
.../org/apache/impala/analysis/Analyzer.java | 2 +-
.../impala/analysis/ColumnLineageGraph.java | 6 +-
.../org/apache/impala/analysis/SelectStmt.java | 2 +-
.../org/apache/impala/planner/HdfsScanNode.java | 2 +-
.../impala/planner/SingleNodePlanner.java | 2 +-
.../org/apache/impala/planner/UnionNode.java | 2 +-
.../org/apache/impala/service/Frontend.java | 18 +-
.../apache/impala/common/FrontendTestBase.java | 2 +-
.../org/apache/impala/planner/PlannerTest.java | 6 +-
.../apache/impala/planner/PlannerTestBase.java | 16 +-
.../org/apache/impala/service/FrontendTest.java | 4 +-
.../org/apache/impala/testutil/TestUtils.java | 4 +-
68 files changed, 1296 insertions(+), 901 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index 444df3a..cda183f 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -73,8 +73,8 @@ class Planner {
Status GeneratePlan(const string& stmt, TExecRequest* result) {
TQueryCtx query_ctx;
- query_ctx.request.stmt = stmt;
- query_ctx.request.query_options = query_options_;
+ query_ctx.client_request.stmt = stmt;
+ query_ctx.client_request.query_options = query_options_;
query_ctx.__set_session(session_state_);
ImpalaServer::PrepareQueryContext(&query_ctx);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/catalog-op-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index 4d39e95..f3aed05 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -22,6 +22,8 @@
#include "exec/incr-stats-util.h"
#include "common/status.h"
#include "runtime/lib-cache.h"
+#include "runtime/client-cache-types.h"
+#include "runtime/exec-env.h"
#include "service/impala-server.h"
#include "service/hs2-util.h"
#include "util/string-parser.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/data-source-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index 0faae94..3198a73 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -111,7 +111,7 @@ Status DataSourceScanNode::Open(RuntimeState* state) {
params.__set_query_id(state->query_id());
params.__set_table_name(tuple_desc_->table_desc()->name());
params.__set_init_string(data_src_node_.init_string);
- params.__set_authenticated_user_name(state->effective_user());
+ params.__set_authenticated_user_name(state->GetEffectiveUser());
params.__set_row_schema(row_schema);
params.__set_batch_size(FLAGS_data_source_batch_size);
params.__set_predicates(data_src_node_.accepted_predicates);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 0bbd42b..0f86339 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -23,6 +23,7 @@
#include "runtime/data-stream-recvr.h"
#include "runtime/runtime-state.h"
#include "runtime/row-batch.h"
+#include "runtime/exec-env.h"
#include "util/debug-util.h"
#include "util/runtime-profile-counters.h"
#include "util/time.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index a539bb1..9f428f1 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -185,8 +185,8 @@ class HashTableTest : public testing::Test {
bool CreateHashTable(bool quadratic, int64_t initial_num_buckets,
scoped_ptr<HashTable>* table, int block_size = 8 * 1024 * 1024,
int max_num_blocks = 100, int reserved_blocks = 10) {
- EXPECT_OK(test_env_->CreatePerQueryState(
- next_query_id_++, max_num_blocks, block_size, &runtime_state_));
+ EXPECT_OK(test_env_->CreateQueryState(
+ next_query_id_++, max_num_blocks, block_size, nullptr, &runtime_state_));
MemTracker* client_tracker = pool_.Add(
new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
BufferedBlockMgr::Client* client;
@@ -604,7 +604,8 @@ TEST_F(HashTableTest, QuadraticInsertFullTest) {
// Test that hashing empty string updates hash value.
TEST_F(HashTableTest, HashEmpty) {
EXPECT_TRUE(
- test_env_->CreatePerQueryState(0, 100, 8 * 1024 * 1024, &runtime_state_).ok());
+ test_env_->CreateQueryState(
+ 0, 100, 8 * 1024 * 1024, nullptr, &runtime_state_).ok());
scoped_ptr<HashTableCtx> ht_ctx;
Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
probe_expr_ctxs_, false /* !stores_nulls_ */,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index f7677eb..738ef36 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -235,7 +235,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
// TODO: this should be a DCHECK but we sometimes hit it. It's likely IMPALA-1702.
LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
<< " partition_id=" << split.partition_id
- << "\n" << PrintThrift(state->fragment_params());
+ << "\n" << PrintThrift(state->instance_ctx());
return Status("Query encountered invalid metadata, likely due to IMPALA-1702."
" Try rerunning the query.");
}
@@ -367,7 +367,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id()
<< " partition_id=" << partition_id
- << "\n" << PrintThrift(state->fragment_params());
+ << "\n" << PrintThrift(state->instance_ctx());
partition_template_tuple_map_[partition_id] = InitTemplateTuple(
partition_desc->partition_key_value_ctxs(), scan_node_pool_.get(), state);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 305fa28..03217e0 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -501,7 +501,7 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
DCHECK(partition != NULL) << "table_id=" << hdfs_table_->id()
<< " partition_id=" << partition_id
- << "\n" << PrintThrift(runtime_state_->fragment_params());
+ << "\n" << PrintThrift(runtime_state_->instance_ctx());
// IMPALA-3798: Filtering before the scanner is created can cause hangs if a header
// split is filtered out, for sequence-based file formats. If the scanner does not
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/union-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index f4d3b69..5457737 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -172,7 +172,7 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
}
// Only evaluate the const expr lists by the first fragment instance.
- if (state->fragment_ctx().per_fragment_instance_idx == 0) {
+ if (state->instance_ctx().per_fragment_instance_idx == 0) {
// Evaluate and materialize the const expr lists exactly once.
while (const_expr_list_idx_ < const_expr_lists_.size()) {
MaterializeExprs(
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index f439aa9..4185ea1 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -41,6 +41,7 @@
#include "gen-cpp/hive_metastore_types.h"
#include "rpc/thrift-client.h"
#include "rpc/thrift-server.h"
+#include "runtime/runtime-state.h"
#include "runtime/mem-tracker.h"
#include "runtime/raw-value.inline.h"
#include "runtime/string-value.h"
@@ -52,6 +53,7 @@
#include "util/debug-util.h"
#include "util/string-parser.h"
#include "util/test-info.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
#include "common/names.h"
@@ -1025,7 +1027,7 @@ template <typename T> void TestSingleLiteralConstruction(
const ColumnType& type, const T& value, const string& string_val) {
ObjectPool pool;
RowDescriptor desc;
- RuntimeState state(TExecPlanFragmentParams(), NULL);
+ RuntimeState state{TQueryCtx()};
MemTracker tracker;
Expr* expr = pool.Add(new Literal(type, value));
@@ -1041,7 +1043,7 @@ TEST_F(ExprTest, NullLiteral) {
for (int type = TYPE_BOOLEAN; type != TYPE_DATE; ++type) {
NullLiteral expr(static_cast<PrimitiveType>(type));
ExprContext ctx(&expr);
- RuntimeState state(TExecPlanFragmentParams(), NULL);
+ RuntimeState state{TQueryCtx()};
MemTracker tracker;
EXPECT_OK(ctx.Prepare(&state, RowDescriptor(), &tracker));
EXPECT_OK(ctx.Open(&state));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 6602f07..640ab39 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -38,6 +38,7 @@ add_library(Runtime
disk-io-mgr-stress.cc
exec-env.cc
free-pool.cc
+ fragment-instance-state.cc
hbase-table.cc
hbase-table-factory.cc
hdfs-fs-cache.cc
@@ -47,6 +48,8 @@ add_library(Runtime
multi-precision.cc
parallel-executor.cc
plan-fragment-executor.cc
+ query-exec-mgr.cc
+ query-state.cc
test-env.cc
types.cc
raw-value.cc
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/buffered-block-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc
index 5e99c09..1828ff8 100644
--- a/be/src/runtime/buffered-block-mgr-test.cc
+++ b/be/src/runtime/buffered-block-mgr-test.cc
@@ -142,8 +142,8 @@ class BufferedBlockMgrTest : public ::testing::Test {
BufferedBlockMgr* CreateMgr(int64_t query_id, int max_buffers, int block_size,
RuntimeState** query_state = NULL, TQueryOptions* query_options = NULL) {
RuntimeState* state;
- EXPECT_OK(test_env_->CreatePerQueryState(
- query_id, max_buffers, block_size, &state, query_options));
+ EXPECT_OK(test_env_->CreateQueryState(
+ query_id, max_buffers, block_size, query_options, &state));
if (query_state != NULL) *query_state = state;
return state->block_mgr();
}
@@ -558,8 +558,8 @@ class BufferedBlockMgrTest : public ::testing::Test {
thread_group workers;
// Create a shared RuntimeState with no BufferedBlockMgr.
RuntimeState* shared_state =
- new RuntimeState(TExecPlanFragmentParams(), test_env_->exec_env());
- shared_state->InitMemTrackers(TUniqueId(), NULL, -1);
+ new RuntimeState(TQueryCtx(), test_env_->exec_env());
+ shared_state->InitMemTrackers(NULL, -1);
for (int i = 0; i < num_threads; ++i) {
thread* t = new thread(
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index e98c334..1d36064 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -101,7 +101,8 @@ class SimpleTupleStreamTest : public testing::Test {
/// Setup a block manager with the provided settings and client with no reservation,
/// tracked by tracker_.
void InitBlockMgr(int64_t limit, int block_size) {
- ASSERT_OK(test_env_->CreatePerQueryState(0, limit, block_size, &runtime_state_));
+ ASSERT_OK(
+ test_env_->CreateQueryState(0, limit, block_size, nullptr, &runtime_state_));
MemTracker* client_tracker = pool_.Add(
new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
ASSERT_OK(runtime_state_->block_mgr()->RegisterClient(
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 651d349..0cfb340 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -58,9 +58,11 @@
#include "runtime/parallel-executor.h"
#include "runtime/plan-fragment-executor.h"
#include "runtime/row-batch.h"
+#include "runtime/query-exec-mgr.h"
+#include "runtime/query-state.h"
+#include "runtime/fragment-instance-state.h"
#include "runtime/tuple-row.h"
#include "scheduling/scheduler.h"
-#include "service/fragment-exec-state.h"
#include "util/bloom-filter.h"
#include "util/container-util.h"
#include "util/counting-barrier.h"
@@ -123,9 +125,9 @@ struct DebugOptions {
///
/// Concurrent accesses:
/// - updates through UpdateFragmentExecStatus()
-class Coordinator::FragmentInstanceState {
+class Coordinator::InstanceState {
public:
- FragmentInstanceState(const FInstanceExecParams& params, ObjectPool* obj_pool)
+ InstanceState(const FInstanceExecParams& params, ObjectPool* obj_pool)
: exec_params_(params),
total_split_size_(0),
profile_(nullptr),
@@ -333,7 +335,7 @@ class Coordinator::FilterState {
};
-void Coordinator::FragmentInstanceState::ComputeTotalSplitSize(
+void Coordinator::InstanceState::ComputeTotalSplitSize(
const PerNodeScanRanges& per_node_scan_ranges) {
total_split_size_ = 0;
@@ -345,7 +347,7 @@ void Coordinator::FragmentInstanceState::ComputeTotalSplitSize(
}
}
-int64_t Coordinator::FragmentInstanceState::UpdateNumScanRangesCompleted() {
+int64_t Coordinator::InstanceState::UpdateNumScanRangesCompleted() {
int64_t total = 0;
CounterMap& complete = aggregate_counters_.scan_ranges_complete_counters;
for (CounterMap::iterator i = complete.begin(); i != complete.end(); ++i) {
@@ -382,6 +384,10 @@ Coordinator::~Coordinator() {
query_mem_tracker_.reset();
}
+PlanFragmentExecutor* Coordinator::executor() {  // nullptr if no coord_instance_
+ return coord_instance_ == nullptr ? nullptr : coord_instance_->executor();
+}
+
TExecNodePhase::type GetExecNodePhase(const string& key) {
map<int, const char*>::const_iterator entry =
_TExecNodePhase_VALUES_TO_NAMES.begin();
@@ -439,7 +445,7 @@ Status Coordinator::Exec() {
if (needs_finalization_) finalize_params_ = request.finalize_params;
VLOG_QUERY << "Exec() query_id=" << schedule_.query_id()
- << " stmt=" << request.query_ctx.request.stmt;
+ << " stmt=" << request.query_ctx.client_request.stmt;
stmt_type_ = request.stmt_type;
query_id_ = schedule_.query_id();
desc_tbl_ = request.desc_tbl;
@@ -457,7 +463,7 @@ Status Coordinator::Exec() {
progress_.Init(str, schedule_.num_scan_ranges());
// runtime filters not yet supported for mt execution
- bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0;
+ bool is_mt_execution = request.query_ctx.client_request.query_options.mt_dop > 0;
if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF;
// to keep things simple, make async Cancel() calls wait until plan fragment
@@ -468,9 +474,9 @@ Status Coordinator::Exec() {
// The coordinator may require a query mem tracker for result-caching, which tracks
// memory via the query mem tracker.
int64_t query_limit = -1;
- if (query_ctx_.request.query_options.__isset.mem_limit
- && query_ctx_.request.query_options.mem_limit > 0) {
- query_limit = query_ctx_.request.query_options.mem_limit;
+ if (query_ctx_.client_request.query_options.__isset.mem_limit
+ && query_ctx_.client_request.query_options.mem_limit > 0) {
+ query_limit = query_ctx_.client_request.query_options.mem_limit;
}
MemTracker* pool_tracker = MemTracker::GetRequestPoolMemTracker(
schedule_.request_pool(), exec_env_->process_mem_tracker());
@@ -484,38 +490,42 @@ Status Coordinator::Exec() {
InitExecSummary();
StartFInstances();
- // In the error case, it's safe to return and not to get root_sink_ here to close - if
+ // In the error case, it's safe to return and not to get coord_sink_ here to close - if
// there was an error, but the coordinator fragment was successfully started, it should
// cancel itself when it receives an error status after reporting its profile.
RETURN_IF_ERROR(FinishInstanceStartup());
// Grab executor and wait until Prepare() has finished so that runtime state etc. will
- // be set up. Must do this here in order to get a reference to root_fragment_instance_
- // so that root_sink_ remains valid throughout query lifetime.
+ // be set up. Must do this here in order to get a reference to coord_instance_
+ // so that coord_sink_ remains valid throughout query lifetime.
if (schedule_.GetCoordFragment() != nullptr) {
- // Coordinator fragment instance has same ID as query.
- root_fragment_instance_ =
- ExecEnv::GetInstance()->fragment_mgr()->GetFragmentExecState(query_id_);
- // Fragment instance might have been failed and unregistered itself even though it was
- // successfully started (e.g. Prepare() might have failed).
- if (root_fragment_instance_.get() == nullptr) {
- FragmentInstanceState* root_state = fragment_instance_states_[0];
- DCHECK(root_state != nullptr);
- lock_guard<mutex> instance_state_lock(*root_state->lock());
+ QueryState* qs = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id_);
+ if (qs != nullptr) coord_instance_ = qs->GetFInstanceState(query_id_);
+ if (coord_instance_ == nullptr) {
+ // Coordinator instance might have failed and unregistered itself even
+ // though it was successfully started (e.g. Prepare() might have failed).
+ if (qs != nullptr) {
+ ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(qs);
+ qs = nullptr;
+ }
+ InstanceState* coord_state = fragment_instance_states_[0];
+ DCHECK(coord_state != nullptr);
+ lock_guard<mutex> instance_state_lock(*coord_state->lock());
// Try and return the fragment instance status if it was already set.
- // TODO: Consider waiting for root_state->done() here.
- RETURN_IF_ERROR(*root_state->status());
- return Status(Substitute("Root fragment instance ($0) failed", PrintId(query_id_)));
+ // TODO: Consider waiting for coord_state->done() here.
+ RETURN_IF_ERROR(*coord_state->status());
+ return Status(
+ Substitute("Coordinator fragment instance ($0) failed", PrintId(query_id_)));
}
- executor_ = root_fragment_instance_->executor();
+
// When WaitForPrepare() returns OK(), the executor's root sink will be set up. At
// that point, the coordinator must be sure to call root_sink()->CloseConsumer(); the
// fragment instance's executor will not complete until that point.
// TODO: Consider moving this to Wait().
- Status prepare_status = executor_->WaitForPrepare();
- root_sink_ = executor_->root_sink();
+ Status prepare_status = executor()->WaitForPrepare();
+ coord_sink_ = executor()->root_sink();
RETURN_IF_ERROR(prepare_status);
- DCHECK(root_sink_ != nullptr);
+ DCHECK(coord_sink_ != nullptr);
}
PrintFragmentInstanceInfo();
@@ -523,7 +533,7 @@ Status Coordinator::Exec() {
}
void Coordinator::UpdateFilterRoutingTable(const FragmentExecParams& fragment_params) {
- DCHECK(schedule_.request().query_ctx.request.query_options.mt_dop == 0);
+ DCHECK(schedule_.request().query_ctx.client_request.query_options.mt_dop == 0);
int num_hosts = fragment_params.instance_exec_params.size();
DCHECK_GT(num_hosts, 0);
DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
@@ -614,8 +624,8 @@ void Coordinator::StartFInstances() {
num_instances += fragment_params.instance_exec_params.size();
for (const FInstanceExecParams& instance_params:
fragment_params.instance_exec_params) {
- FragmentInstanceState* exec_state = obj_pool()->Add(
- new FragmentInstanceState(instance_params, obj_pool()));
+ InstanceState* exec_state = obj_pool()->Add(
+ new InstanceState(instance_params, obj_pool()));
int instance_state_idx = GetInstanceIdx(instance_params.instance_id);
fragment_instance_states_[instance_state_idx] = exec_state;
@@ -638,7 +648,7 @@ Status Coordinator::FinishInstanceStartup() {
const TMetricDef& def =
MakeTMetricDef("fragment-latencies", TMetricKind::HISTOGRAM, TUnit::TIME_MS);
HistogramMetric latencies(def, 20000, 3);
- for (FragmentInstanceState* exec_state: fragment_instance_states_) {
+ for (InstanceState* exec_state: fragment_instance_states_) {
lock_guard<mutex> l(*exec_state->lock());
// Preserve the first non-OK status, if there is one
if (status.ok()) status = *exec_state->status();
@@ -859,7 +869,7 @@ Status Coordinator::FinalizeSuccessfulInsert() {
HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id);
DCHECK(part != NULL) << "table_id=" << hdfs_table->id()
<< " partition_id=" << partition.second.id
- << "\n" << PrintThrift(runtime_state()->fragment_params());
+ << "\n" << PrintThrift(runtime_state()->instance_ctx());
part_path_ss << part->location();
}
const string& part_path = part_path_ss.str();
@@ -924,7 +934,8 @@ Status Coordinator::FinalizeSuccessfulInsert() {
partition_create_ops.Add(CREATE_DIR, part_path);
}
}
- } else if (!is_s3_path || !query_ctx_.request.query_options.s3_skip_insert_staging) {
+ } else if (!is_s3_path
+ || !query_ctx_.client_request.query_options.s3_skip_insert_staging) {
// If the S3_SKIP_INSERT_STAGING query option is set, then the partition directories
// would have already been created by the table sinks.
if (FLAGS_insert_inherit_permissions && !is_s3_path) {
@@ -1071,8 +1082,8 @@ Status Coordinator::Wait() {
has_called_wait_ = true;
if (stmt_type_ == TStmtType::QUERY) {
- DCHECK(executor_ != nullptr);
- return UpdateStatus(executor_->WaitForOpen(), runtime_state()->fragment_instance_id(),
+ DCHECK(executor() != nullptr);
+ return UpdateStatus(executor()->WaitForOpen(), runtime_state()->fragment_instance_id(),
FLAGS_hostname);
}
@@ -1106,15 +1117,15 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
if (returned_all_results_) {
// May be called after the first time we set *eos. Re-set *eos and return here;
- // already torn-down root_sink_ so no more work to do.
+ // already torn-down coord_sink_ so no more work to do.
*eos = true;
return Status::OK();
}
- DCHECK(root_sink_ != nullptr)
+ DCHECK(coord_sink_ != nullptr)
<< "GetNext() called without result sink. Perhaps Prepare() failed and was not "
<< "checked?";
- Status status = root_sink_->GetNext(runtime_state(), results, max_rows, eos);
+ Status status = coord_sink_->GetNext(runtime_state(), results, max_rows, eos);
// if there was an error, we need to return the query's error status rather than
// the status we just got back from the local executor (which may well be CANCELLED
@@ -1126,8 +1137,8 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
returned_all_results_ = true;
// Trigger tear-down of coordinator fragment by closing the consumer. Must do before
// WaitForAllInstances().
- root_sink_->CloseConsumer();
- root_sink_ = nullptr;
+ coord_sink_->CloseConsumer();
+ coord_sink_ = nullptr;
// Don't return final NULL until all instances have completed. GetNext must wait for
// all instances to complete before ultimately signalling the end of execution via a
@@ -1148,12 +1159,12 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
}
void Coordinator::PrintFragmentInstanceInfo() {
- for (FragmentInstanceState* state: fragment_instance_states_) {
+ for (InstanceState* state: fragment_instance_states_) {
SummaryStats& acc = fragment_profiles_[state->fragment_idx()].bytes_assigned;
acc(state->total_split_size());
}
- for (int id = (executor_ == NULL ? 0 : 1); id < fragment_profiles_.size(); ++id) {
+ for (int id = (executor() == NULL ? 0 : 1); id < fragment_profiles_.size(); ++id) {
SummaryStats& acc = fragment_profiles_[id].bytes_assigned;
double min = accumulators::min(acc);
double max = accumulators::max(acc);
@@ -1168,7 +1179,7 @@ void Coordinator::PrintFragmentInstanceInfo() {
if (VLOG_FILE_IS_ON) {
VLOG_FILE << "Byte split for fragment " << id << " " << ss.str();
- for (FragmentInstanceState* exec_state: fragment_instance_states_) {
+ for (InstanceState* exec_state: fragment_instance_states_) {
if (exec_state->fragment_idx() != id) continue;
VLOG_FILE << "data volume for ipaddress " << exec_state << ": "
<< PrettyPrinter::Print(exec_state->total_split_size(), TUnit::BYTES);
@@ -1284,7 +1295,7 @@ void Coordinator::ExecRemoteFInstance(
rpc_params.fragment_instance_ctx.__set_debug_phase(debug_options->phase);
}
int instance_state_idx = GetInstanceIdx(exec_params.instance_id);
- FragmentInstanceState* exec_state = fragment_instance_states_[instance_state_idx];
+ InstanceState* exec_state = fragment_instance_states_[instance_state_idx];
exec_state->ComputeTotalSplitSize(
rpc_params.fragment_instance_ctx.per_node_scan_ranges);
VLOG_FILE << "making rpc: ExecPlanFragment"
@@ -1357,7 +1368,7 @@ void Coordinator::CancelInternal() {
void Coordinator::CancelFragmentInstances() {
int num_cancelled = 0;
- for (FragmentInstanceState* exec_state: fragment_instance_states_) {
+ for (InstanceState* exec_state: fragment_instance_states_) {
DCHECK(exec_state != nullptr);
// lock each exec_state individually to synchronize correctly with
@@ -1433,7 +1444,7 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
Substitute("Unknown fragment instance index $0 (max known: $1)",
instance_state_idx, fragment_instance_states_.size() - 1));
}
- FragmentInstanceState* exec_state = fragment_instance_states_[instance_state_idx];
+ InstanceState* exec_state = fragment_instance_states_[instance_state_idx];
const TRuntimeProfileTree& cumulative_profile = params.profile;
Status status(params.status);
@@ -1507,9 +1518,8 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
if (VLOG_FILE_IS_ON) {
stringstream s;
exec_state->profile()->PrettyPrint(&s);
- VLOG_FILE << "profile for query_id=" << query_id_
- << " instance_id=" << exec_state->fragment_instance_id()
- << "\n" << s.str();
+ VLOG_FILE << "profile for instance_id=" << exec_state->fragment_instance_id()
+ << "\n" << s.str();
}
// also print the cumulative profile
// TODO: fix the coordinator/PlanFragmentExecutor, so this isn't needed
@@ -1533,14 +1543,14 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
lock_guard<mutex> l(lock_);
exec_state->stopwatch()->Stop();
DCHECK_GT(num_remaining_fragment_instances_, 0);
- VLOG_QUERY << "Fragment instance completed: "
+ VLOG_QUERY << "Fragment instance completed:"
<< " id=" << PrintId(exec_state->fragment_instance_id())
<< " host=" << exec_state->impalad_address()
<< " remaining=" << num_remaining_fragment_instances_ - 1;
if (VLOG_QUERY_IS_ON && num_remaining_fragment_instances_ > 1) {
// print host/port info for the first backend that's still in progress as a
// debugging aid for backend deadlocks
- for (FragmentInstanceState* exec_state: fragment_instance_states_) {
+ for (InstanceState* exec_state: fragment_instance_states_) {
lock_guard<mutex> l2(*exec_state->lock());
if (!exec_state->done()) {
VLOG_QUERY << "query_id=" << query_id_ << ": first in-progress backend: "
@@ -1567,7 +1577,7 @@ uint64_t Coordinator::GetLatestKuduInsertTimestamp() const {
}
RuntimeState* Coordinator::runtime_state() {
- return executor_ == NULL ? NULL : executor_->runtime_state();
+ return executor() == NULL ? NULL : executor()->runtime_state();
}
MemTracker* Coordinator::query_mem_tracker() {
@@ -1595,7 +1605,7 @@ typedef struct {
}
} InstanceComparator;
-void Coordinator::UpdateAverageProfile(FragmentInstanceState* instance_state) {
+void Coordinator::UpdateAverageProfile(InstanceState* instance_state) {
FragmentIdx fragment_idx = instance_state->fragment_idx();
DCHECK_GE(fragment_idx, 0);
DCHECK_LT(fragment_idx, fragment_profiles_.size());
@@ -1608,7 +1618,7 @@ void Coordinator::UpdateAverageProfile(FragmentInstanceState* instance_state) {
data->root_profile->AddChild(instance_state->profile());
}
-void Coordinator::ComputeFragmentSummaryStats(FragmentInstanceState* instance_state) {
+void Coordinator::ComputeFragmentSummaryStats(InstanceState* instance_state) {
FragmentIdx fragment_idx = instance_state->fragment_idx();
DCHECK_GE(fragment_idx, 0);
DCHECK_LT(fragment_idx, fragment_profiles_.size());
@@ -1625,7 +1635,7 @@ void Coordinator::ComputeFragmentSummaryStats(FragmentInstanceState* instance_st
data->root_profile->AddChild(instance_state->profile());
}
-void Coordinator::UpdateExecSummary(const FragmentInstanceState& instance_state) {
+void Coordinator::UpdateExecSummary(const InstanceState& instance_state) {
vector<RuntimeProfile*> children;
instance_state.profile()->GetAllChildren(&children);
@@ -1667,12 +1677,12 @@ void Coordinator::ReportQuerySummary() {
if (!fragment_instance_states_.empty()) {
// Average all fragment instances for each fragment.
- for (FragmentInstanceState* state: fragment_instance_states_) {
+ for (InstanceState* state: fragment_instance_states_) {
state->profile()->ComputeTimeInProfile();
UpdateAverageProfile(state);
// Skip coordinator fragment, if one exists.
// TODO: Can we remove the special casing here?
- if (executor_ == nullptr || state->fragment_idx() != 0) {
+ if (coord_instance_ == nullptr || state->fragment_idx() != 0) {
ComputeFragmentSummaryStats(state);
}
UpdateExecSummary(*state);
@@ -1680,7 +1690,7 @@ void Coordinator::ReportQuerySummary() {
InstanceComparator comparator;
// Per fragment instances have been collected, output summaries
- for (int i = (executor_ != NULL ? 1 : 0); i < fragment_profiles_.size(); ++i) {
+ for (int i = (executor() != NULL ? 1 : 0); i < fragment_profiles_.size(); ++i) {
fragment_profiles_[i].root_profile->SortChildren(comparator);
SummaryStats& completion_times = fragment_profiles_[i].completion_times;
SummaryStats& rates = fragment_profiles_[i].rates;
@@ -1719,7 +1729,7 @@ void Coordinator::ReportQuerySummary() {
// Map from Impalad address to peak memory usage of this query
typedef unordered_map<TNetworkAddress, int64_t> PerNodePeakMemoryUsage;
PerNodePeakMemoryUsage per_node_peak_mem_usage;
- for (FragmentInstanceState* state: fragment_instance_states_) {
+ for (InstanceState* state: fragment_instance_states_) {
int64_t initial_usage = 0;
int64_t* mem_usage = FindOrInsert(&per_node_peak_mem_usage,
state->impalad_address(), initial_usage);
@@ -1740,7 +1750,7 @@ void Coordinator::ReportQuerySummary() {
string Coordinator::GetErrorLog() {
ErrorLogMap merged;
- for (FragmentInstanceState* state: fragment_instance_states_) {
+ for (InstanceState* state: fragment_instance_states_) {
lock_guard<mutex> l(*state->lock());
if (state->error_log()->size() > 0) MergeErrorMaps(&merged, *state->error_log());
}
@@ -1760,7 +1770,7 @@ void Coordinator::SetExecPlanFragmentParams(
// Remove filters that weren't selected during filter routing table construction.
if (filter_mode_ != TRuntimeFilterMode::OFF) {
- DCHECK(schedule_.request().query_ctx.request.query_options.mt_dop == 0);
+ DCHECK(schedule_.request().query_ctx.client_request.query_options.mt_dop == 0);
int instance_idx = GetInstanceIdx(params.instance_id);
for (TPlanNode& plan_node: rpc_params->fragment_ctx.fragment.plan.nodes) {
if (plan_node.__isset.runtime_filters) {
@@ -1893,6 +1903,8 @@ void DistributeFilters(shared_ptr<TPublishFilterParams> params,
}
+// TODO: call this as soon as it's clear that we won't reference the state
+// anymore, i.e., in CancelInternal() and when GetNext() hits eos
void Coordinator::TearDown() {
DCHECK(!torn_down_) << "Coordinator::TearDown() may not be called twice";
torn_down_ = true;
@@ -1909,7 +1921,15 @@ void Coordinator::TearDown() {
}
// Need to protect against failed Prepare(), where root_sink() would not be set.
- if (root_sink_ != nullptr) root_sink_->CloseConsumer();
+ if (coord_sink_ != nullptr) {
+ coord_sink_->CloseConsumer();
+ coord_sink_ = nullptr;
+ }
+ if (coord_instance_ != nullptr) {
+ ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(
+ coord_instance_->query_state());
+ coord_instance_ = nullptr;
+ }
}
void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
@@ -1987,7 +2007,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
rpc_params->filter_id = params.filter_id;
for (int target_idx: target_fragment_instance_state_idxs) {
- FragmentInstanceState* fragment_inst = fragment_instance_states_[target_idx];
+ InstanceState* fragment_inst = fragment_instance_states_[target_idx];
DCHECK(fragment_inst != NULL) << "Missing fragment instance: " << target_idx;
exec_env_->rpc_pool()->Offer(bind<void>(DistributeFilters, rpc_params,
fragment_inst->impalad_address(), fragment_inst->fragment_instance_id()));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index d53f16c..e1334db 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -38,13 +38,11 @@
#include "common/status.h"
#include "gen-cpp/Frontend_types.h"
#include "gen-cpp/Types_types.h"
-#include "runtime/runtime-state.h"
-#include "scheduling/simple-scheduler.h"
-#include "service/fragment-exec-state.h"
-#include "service/fragment-mgr.h"
#include "util/histogram-metric.h"
#include "util/progress-updater.h"
#include "util/runtime-profile.h"
+#include "scheduling/query-schedule.h"
+#include "runtime/runtime-state.h" // for PartitionStatusMap; TODO: disentangle
namespace impala {
@@ -69,6 +67,9 @@ class RuntimeProfile;
class TablePrinter;
class TPlanFragment;
class QueryResultSet;
+class MemTracker;
+class PlanRootSink;
+class FragmentInstanceState;
struct DebugOptions;
@@ -212,7 +213,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
void TearDown();
private:
- class FragmentInstanceState;
+ class InstanceState;
struct FilterTarget;
class FilterState;
@@ -249,10 +250,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
CounterMap scan_ranges_complete_counters;
};
- /// FragmentInstanceStates for all fragment instances, including that of the coordinator
+ /// InstanceStates for all fragment instances, including that of the coordinator
/// fragment. All elements are non-nullptr. Owned by obj_pool(). Filled in
/// StartFInstances().
- std::vector<FragmentInstanceState*> fragment_instance_states_;
+ std::vector<InstanceState*> fragment_instance_states_;
/// True if the query needs a post-execution step to tidy up
bool needs_finalization_;
@@ -273,7 +274,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// time.
/// Lock ordering is
/// 1. lock_
- /// 2. FragmentInstanceState::lock_
+ /// 2. InstanceState::lock_
boost::mutex lock_;
/// Overall status of the entire query; set to the first reported fragment error
@@ -294,13 +295,13 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// Result rows are materialized by this fragment instance in its own thread. They are
/// materialized into a QueryResultSet provided to the coordinator during GetNext().
///
- /// Created during fragment instance start-up by FragmentExecState and set here in
- /// Exec(). Keep a shared_ptr reference to the fragment state so that root_sink_ will
- /// be valid for the lifetime of the coordinator. This is important if, for example, the
- /// fragment instance is cancelled while the coordinator is calling GetNext().
- std::shared_ptr<FragmentMgr::FragmentExecState> root_fragment_instance_;
- PlanFragmentExecutor* executor_ = nullptr;
- PlanRootSink* root_sink_ = nullptr;
+ /// Not owned by this class. Set in Exec(). Reset to nullptr (and the implied
+ /// reference of QueryState released) in TearDown().
+ FragmentInstanceState* coord_instance_ = nullptr;
+
+ /// Not owned by this class. Set in Exec(). Reset to nullptr in TearDown() or when
+ /// GetNext() hits eos.
+ PlanRootSink* coord_sink_ = nullptr;
/// Query mem tracker for this coordinator initialized in Exec(). Only valid if there
/// is no coordinator fragment (i.e. executor_ == NULL). If executor_ is not NULL,
@@ -314,6 +315,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// Returns a local object pool.
ObjectPool* obj_pool() { return obj_pool_.get(); }
+ PlanFragmentExecutor* executor();
+
// Sets the TDescriptorTable(s) for the current fragment.
void SetExecPlanDescriptorTable(const TPlanFragment& fragment,
TExecPlanFragmentParams* rpc_params);
@@ -501,11 +504,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
void InitExecSummary();
/// Update fragment profile information from a fragment instance state.
- void UpdateAverageProfile(FragmentInstanceState* fragment_instance_state);
+ void UpdateAverageProfile(InstanceState* instance_state);
/// Compute the summary stats (completion_time and rates)
/// for an individual fragment_profile_ based on the specified instance state.
- void ComputeFragmentSummaryStats(FragmentInstanceState* fragment_instance_state);
+ void ComputeFragmentSummaryStats(InstanceState* instance_state);
/// Outputs aggregate query profile summary. This is assumed to be called at the end of
/// a query -- remote fragments' profiles must not be updated while this is running.
@@ -513,7 +516,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// Populates the summary execution stats from the profile. Can only be called when the
/// query is done.
- void UpdateExecSummary(const FragmentInstanceState& instance_state);
+ void UpdateExecSummary(const InstanceState& instance_state);
/// Determines what the permissions of directories created by INSERT statements should
/// be if permission inheritance is enabled. Populates a map from all prefixes of
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index d53a146..c76afb5 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -28,6 +28,7 @@
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/data-stream-mgr.h"
+#include "runtime/exec-env.h"
#include "runtime/data-stream-sender.h"
#include "runtime/data-stream-recvr.h"
#include "runtime/descriptors.h"
@@ -112,10 +113,12 @@ class ImpalaTestBackend : public ImpalaInternalServiceIf {
class DataStreamTest : public testing::Test {
protected:
- DataStreamTest() : runtime_state_(TExecPlanFragmentParams(), &exec_env_), next_val_(0) {
+ DataStreamTest()
+ : runtime_state_(TQueryCtx(), &exec_env_),
+ next_val_(0) {
// Initialize Mem trackers for use by the data stream receiver.
exec_env_.InitForFeTests();
- runtime_state_.InitMemTrackers(TUniqueId(), NULL, -1);
+ runtime_state_.InitMemTrackers(NULL, -1);
// Stop tests that rely on mismatched sender / receiver pairs timing out from failing.
FLAGS_datastream_sender_timeout_ms = 250;
@@ -480,9 +483,9 @@ class DataStreamTest : public testing::Test {
void Sender(int sender_num, int channel_buffer_size,
TPartitionType::type partition_type) {
- RuntimeState state(TExecPlanFragmentParams(), &exec_env_);
+ RuntimeState state(TQueryCtx(), &exec_env_);
state.set_desc_tbl(desc_tbl_);
- state.InitMemTrackers(TUniqueId(), NULL, -1);
+ state.InitMemTrackers(NULL, -1);
VLOG_QUERY << "create sender " << sender_num;
const TDataStreamSink& sink = GetSink(partition_type);
DataStreamSender sender(
@@ -593,9 +596,8 @@ TEST_F(DataStreamTest, BasicTest) {
//
// TODO: Make lifecycle requirements more explicit.
TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
- scoped_ptr<RuntimeState> runtime_state(
- new RuntimeState(TExecPlanFragmentParams(), &exec_env_));
- runtime_state->InitMemTrackers(TUniqueId(), NULL, -1);
+ scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), &exec_env_));
+ runtime_state->InitMemTrackers(NULL, -1);
scoped_ptr<RuntimeProfile> profile(new RuntimeProfile(&obj_pool_, "TestReceiver"));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index db0d242..ad996e3 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -36,10 +36,10 @@
#include "runtime/lib-cache.h"
#include "runtime/mem-tracker.h"
#include "runtime/thread-resource-mgr.h"
+#include "runtime/query-exec-mgr.h"
#include "runtime/tmp-file-mgr.h"
#include "scheduling/request-pool-service.h"
#include "scheduling/simple-scheduler.h"
-#include "service/fragment-mgr.h"
#include "service/frontend.h"
#include "statestore/statestore-subscriber.h"
#include "util/debug-util.h"
@@ -146,7 +146,7 @@ ExecEnv::ExecEnv()
fragment_exec_thread_pool_(new CallableThreadPool("coordinator-fragment-rpc",
"worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
- fragment_mgr_(new FragmentMgr()),
+ query_exec_mgr_(new QueryExecMgr()),
enable_webserver_(FLAGS_enable_webserver),
is_fe_tests_(false),
backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) {
@@ -199,7 +199,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
fragment_exec_thread_pool_(new CallableThreadPool("coordinator-fragment-rpc",
"worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
- fragment_mgr_(new FragmentMgr()),
+ query_exec_mgr_(new QueryExecMgr()),
enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
is_fe_tests_(false),
backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index d4cdf24..08ddd9f 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -31,7 +31,7 @@ namespace impala {
class CallableThreadPool;
class DataStreamMgr;
class DiskIoMgr;
-class FragmentMgr;
+class QueryExecMgr;
class Frontend;
class HBaseTableFactory;
class HdfsFsCache;
@@ -96,7 +96,7 @@ class ExecEnv {
Frontend* frontend() { return frontend_.get(); }
RequestPoolService* request_pool_service() { return request_pool_service_.get(); }
CallableThreadPool* rpc_pool() { return async_rpc_pool_.get(); }
- FragmentMgr* fragment_mgr() { return fragment_mgr_.get(); }
+ QueryExecMgr* query_exec_mgr() { return query_exec_mgr_.get(); }
void set_enable_webserver(bool enable) { enable_webserver_ = enable; }
@@ -138,7 +138,7 @@ class ExecEnv {
boost::scoped_ptr<Frontend> frontend_;
boost::scoped_ptr<CallableThreadPool> fragment_exec_thread_pool_;
boost::scoped_ptr<CallableThreadPool> async_rpc_pool_;
- boost::scoped_ptr<FragmentMgr> fragment_mgr_;
+ boost::scoped_ptr<QueryExecMgr> query_exec_mgr_;
/// Not owned by this class
ImpalaServer* impala_server_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
new file mode 100644
index 0000000..5dcd4c6
--- /dev/null
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#include "runtime/fragment-instance-state.h"
+
+#include <boost/thread/locks.hpp>
+#include <boost/thread/lock_guard.hpp>
+
+#include "runtime/exec-env.h"
+#include "runtime/backend-client.h"
+#include "runtime/runtime-filter-bank.h"
+#include "runtime/client-cache.h"
+#include "runtime/runtime-state.h"
+#include "runtime/query-state.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+
+using namespace impala;
+
+FragmentInstanceState::FragmentInstanceState(QueryState* query_state,
+    const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx,
+    const TDescriptorTable& desc_tbl)
+  // The contexts and descriptor table are copied; only query_state is kept by pointer.
+  : query_state_(query_state),
+    fragment_ctx_(fragment_ctx),
+    instance_ctx_(instance_ctx),
+    desc_tbl_(desc_tbl),
+    // Status reports from the executor are forwarded to ReportStatusCb().
+    executor_([this](const Status& s, RuntimeProfile* p, bool is_done) {
+      ReportStatusCb(s, p, is_done);
+    }) {}
+
+Status FragmentInstanceState::UpdateStatus(const Status& status) {
+  lock_guard<mutex> guard(status_lock_);  // exec_status_ is guarded by status_lock_
+  if (exec_status_.ok() && !status.ok()) exec_status_ = status;  // first error sticks
+  return exec_status_;
+}
+
+Status FragmentInstanceState::Cancel() {
+  lock_guard<mutex> guard(status_lock_);
+  if (!exec_status_.ok()) return exec_status_;  // already failed: report, don't cancel
+  executor_.Cancel();
+  return Status::OK();
+}
+
+void FragmentInstanceState::Exec() {
+  // Prepare first and publish the outcome so that PublishFilter() can stop waiting.
+  Status prepare_status =
+      executor_.Prepare(query_state_, desc_tbl_, fragment_ctx_, instance_ctx_);
+  prepare_promise_.Set(prepare_status);
+  // Open() is only attempted after a successful Prepare(); Exec() only after both.
+  const bool can_run = prepare_status.ok() && executor_.Open().ok();
+  if (can_run) executor_.Exec();
+  // Close() is called unconditionally, whatever happened above.
+  executor_.Close();
+}
+
+void FragmentInstanceState::ReportStatusCb(
+    const Status& status, RuntimeProfile* profile, bool done) {
+  DCHECK(status.ok() || done);  // if !status.ok() => done
+  Status exec_status = UpdateStatus(status);
+  // Connect to the coordinator; if that fails, record the error and skip the report.
+  Status coord_status;
+  ImpalaBackendConnection coord(
+      ExecEnv::GetInstance()->impalad_client_cache(), coord_address(), &coord_status);
+  if (!coord_status.ok()) {
+    stringstream s;
+    s << "Couldn't get a client for " << coord_address() <<"\tReason: "
+      << coord_status.GetDetail();
+    UpdateStatus(Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, s.str())));
+    return;
+  }
+  // Build the report; exec_status is the first error recorded so far, or OK.
+  TReportExecStatusParams params;
+  params.protocol_version = ImpalaInternalServiceVersion::V1;
+  params.__set_query_id(query_state_->query_ctx().query_id);
+  params.__set_fragment_instance_id(instance_ctx_.fragment_instance_id);
+  exec_status.SetTStatus(&params);
+  params.__set_done(done);
+
+  if (profile != NULL) {
+    profile->ToThrift(&params.profile);
+    params.__isset.profile = true;
+  }
+
+  RuntimeState* runtime_state = executor_.runtime_state();
+  // If executor_ did not successfully prepare, runtime state may not have been set.
+  if (runtime_state != NULL) {
+    // Only send updates to insert status if fragment is finished, the coordinator
+    // waits until query execution is done to use them anyhow.
+    if (done) {
+      TInsertExecStatus insert_status;
+
+      if (runtime_state->hdfs_files_to_move()->size() > 0) {
+        insert_status.__set_files_to_move(*runtime_state->hdfs_files_to_move());
+      }
+      if (runtime_state->per_partition_status()->size() > 0) {
+        insert_status.__set_per_partition_status(*runtime_state->per_partition_status());
+      }
+
+      params.__set_insert_exec_status(insert_status);
+    }
+
+    // Send new errors to coordinator
+    runtime_state->GetUnreportedErrors(&(params.error_log));
+  }
+  params.__isset.error_log = (params.error_log.size() > 0);
+
+  TReportExecStatusResult res;
+  Status rpc_status;
+  bool retry_is_safe = false;
+  static const int MAX_ATTEMPTS = 3;  // send the report at most this many times
+  for (int i = 0; i < MAX_ATTEMPTS; ++i) {
+    rpc_status = coord.DoRpc(
+        &ImpalaBackendClient::ReportExecStatus, params, &res, &retry_is_safe);
+    if (rpc_status.ok()) {
+      rpc_status = Status(res.status);
+      break;
+    }
+    if (!retry_is_safe) break;
+    if (i < MAX_ATTEMPTS - 1) SleepForMs(100);  // no sleep after the last attempt
+  }
+  if (!rpc_status.ok()) {
+    UpdateStatus(rpc_status);
+    executor_.Cancel();  // report not delivered, or rejected by the coordinator
+  }
+}
+
+void FragmentInstanceState::PublishFilter(
+    int32_t filter_id, const TBloomFilter& thrift_bloom_filter) {
+  VLOG_FILE << "PublishFilter(): instance_id=" << PrintId(instance_id())
+            << " filter_id=" << filter_id;
+  // The filter bank is only usable once Prepare() has completed, so wait on the
+  // Prepare() barrier; bound the wait so a stuck Prepare() cannot block us forever.
+  // TODO: get rid of concurrency in the setup phase as part of the per-query exec rpc
+  static const int WAIT_MS = 30000;
+  bool timed_out = false;
+  Status prepare_status = prepare_promise_.Get(WAIT_MS, &timed_out);
+  if (timed_out) {
+    LOG(ERROR) << "Unexpected timeout in PublishFilter()";
+    return;
+  }
+  // Nothing to publish into if Prepare() failed.
+  if (!prepare_status.ok()) return;
+  RuntimeState* state = executor_.runtime_state();
+  state->filter_bank()->PublishGlobalFilter(filter_id, thrift_bloom_filter);
+}
+
+const TQueryCtx& FragmentInstanceState::query_ctx() const {
+ return query_state_->query_ctx();  // read from query_state_, not stored per instance
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
new file mode 100644
index 0000000..c938a31
--- /dev/null
+++ b/be/src/runtime/fragment-instance-state.h
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_FRAGMENT_INSTANCE_STATE_H
+#define IMPALA_RUNTIME_FRAGMENT_INSTANCE_STATE_H
+
+#include "common/status.h"
+#include "util/promise.h"
+#include "runtime/plan-fragment-executor.h"
+
+#include "gen-cpp/ImpalaInternalService_types.h"
+
+namespace impala {
+
+class TPlanFragmentCtx;
+class TPlanFragmentInstanceCtx;
+class TBloomFilter;
+class TUniqueId;
+class TNetworkAddress;
+class QueryState;
+class PlanFragmentExecutor;
+class RuntimeProfile;
+
+/// Collection of state specific to the execution of a particular fragment instance,
+/// as well as manager of the execution of that fragment instance, including
+/// set-up and tear-down.
+/// Tear-down happens automatically in the d'tor and frees all memory allocated for
+/// this fragment instance and closes all data streams.
+///
+/// Cancel() and PublishFilter() may be called concurrently with Exec() (PublishFilter()
+/// first waits for Prepare() to finish); otherwise this class is not thread-safe.
+///
+/// TODO:
+/// - merge PlanFragmentExecutor into this class
+/// - move tear-down logic out of d'tor and into ReleaseResources() function
+/// - as part of the per-query exec rpc, get rid of concurrency during setup
+///   (and remove prepare_promise_)
+/// - move ReportStatusCb() logic into PFE::SendReport() and get rid of the callback
+class FragmentInstanceState {
+ public:
+  FragmentInstanceState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& instance_ctx, const TDescriptorTable& desc_tbl);
+
+  /// Frees up all resources allocated in Exec().
+  /// It is an error to delete a FragmentInstanceState before Exec() returns.
+  ~FragmentInstanceState() { }
+
+  /// Main loop of plan fragment execution. Blocks until execution finishes.
+  void Exec();
+
+  /// Returns current execution status, if there was an error. Otherwise cancels
+  /// the fragment and returns OK.
+  Status Cancel();
+
+  /// Waits (with a timeout) for Prepare(), then publishes 'filter_id' to the filter bank.
+  void PublishFilter(int32_t filter_id, const TBloomFilter& thrift_bloom_filter);
+
+  QueryState* query_state() { return query_state_; }
+  PlanFragmentExecutor* executor() { return &executor_; }
+  const TQueryCtx& query_ctx() const;
+  const TPlanFragmentCtx& fragment_ctx() const { return fragment_ctx_; }
+  const TPlanFragmentInstanceCtx& instance_ctx() const { return instance_ctx_; }
+  const TUniqueId& query_id() const { return query_ctx().query_id; }
+  const TUniqueId& instance_id() const { return instance_ctx_.fragment_instance_id; }
+  const TNetworkAddress& coord_address() const { return query_ctx().coord_address; }
+
+ private:
+  QueryState* query_state_;
+  const TPlanFragmentCtx fragment_ctx_;
+  const TPlanFragmentInstanceCtx instance_ctx_;
+
+  /// instance-specific descriptor table
+  /// TODO: remove when switching to per-query exec rpc
+  const TDescriptorTable desc_tbl_;
+
+  PlanFragmentExecutor executor_;
+
+  /// protects exec_status_
+  boost::mutex status_lock_;
+
+  /// written only by UpdateStatus(), which ReportStatusCb() calls;
+  /// if set to anything other than OK, execution has terminated w/ an error
+  Status exec_status_;
+
+  /// Barrier for the completion of executor_.Prepare().
+  Promise<Status> prepare_promise_;
+
+  /// Update 'exec_status_' w/ 'status', if the former is not already an error.
+  /// Returns the value of 'exec_status_' after this method completes.
+  Status UpdateStatus(const Status& status);
+
+  /// Callback for executor; updates exec_status_ if 'status' indicates an error
+  /// or if the report RPC failed (in which case the executor is also cancelled).
+  /// If not NULL, `profile` is encoded as a Thrift structure and transmitted as part of
+  /// the reporting RPC. `profile` may be NULL if a runtime profile has not been created
+  /// for this fragment (e.g. when the fragment has failed during preparation).
+  /// The executor must ensure that there is only one invocation at a time.
+  void ReportStatusCb(const Status& status, RuntimeProfile* profile, bool done);
+};
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 3657882..464dbce 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -37,7 +37,9 @@
#include "runtime/descriptors.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
+#include "runtime/query-state.h"
#include "runtime/runtime-filter-bank.h"
+#include "runtime/exec-env.h"
#include "util/container-util.h"
#include "runtime/runtime-state.h"
#include "util/cpu-info.h"
@@ -69,9 +71,8 @@ const string EXEC_TIMER_NAME = "ExecTime";
}
PlanFragmentExecutor::PlanFragmentExecutor(
- ExecEnv* exec_env, const ReportStatusCallback& report_status_cb)
- : exec_env_(exec_env),
- exec_tree_(NULL),
+ const ReportStatusCallback& report_status_cb)
+ : exec_tree_(NULL),
report_status_cb_(report_status_cb),
report_thread_active_(false),
closed_(false),
@@ -92,8 +93,10 @@ PlanFragmentExecutor::~PlanFragmentExecutor() {
DCHECK(!report_thread_active_);
}
-Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
- Status status = PrepareInternal(request);
+Status PlanFragmentExecutor::Prepare(
+ QueryState* query_state, const TDescriptorTable& desc_tbl,
+ const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx) {
+ Status status = PrepareInternal(query_state, desc_tbl, fragment_ctx, instance_ctx);
prepared_promise_.Set(status);
if (!status.ok()) FragmentComplete(status);
return status;
@@ -105,7 +108,9 @@ Status PlanFragmentExecutor::WaitForOpen() {
return opened_promise_.Get();
}
-Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& request) {
+Status PlanFragmentExecutor::PrepareInternal(
+ QueryState* qs, const TDescriptorTable& tdesc_tbl,
+ const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx) {
lock_guard<mutex> l(prepare_lock_);
DCHECK(!is_prepared_);
@@ -113,19 +118,17 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
is_prepared_ = true;
// TODO: Break this method up.
- const TPlanFragmentInstanceCtx& fragment_instance_ctx = request.fragment_instance_ctx;
- query_id_ = request.query_ctx.query_id;
+ query_id_ = qs->query_ctx().query_id;
- VLOG_QUERY << "Prepare(): query_id=" << PrintId(query_id_) << " instance_id="
- << PrintId(request.fragment_instance_ctx.fragment_instance_id);
- VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(fragment_instance_ctx);
-
- DCHECK(request.__isset.fragment_ctx);
+ VLOG_QUERY << "Prepare(): instance_id="
+ << PrintId(instance_ctx.fragment_instance_id);
+ VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(instance_ctx);
// Prepare() must not return before runtime_state_ is set if is_prepared_ was
// set. Having runtime_state_.get() != NULL is a postcondition of this method in that
// case. Do not call RETURN_IF_ERROR or explicitly return before this line.
- runtime_state_.reset(new RuntimeState(request, exec_env_));
+ runtime_state_.reset(
+ new RuntimeState(qs, fragment_ctx, instance_ctx, ExecEnv::GetInstance()));
// total_time_counter() is in the runtime_state_ so start it up now.
SCOPED_TIMER(profile()->total_time_counter());
@@ -143,9 +146,8 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
<< PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
}
- DCHECK(!fragment_instance_ctx.request_pool.empty());
- runtime_state_->InitMemTrackers(
- query_id_, &fragment_instance_ctx.request_pool, bytes_limit);
+ DCHECK(!instance_ctx.request_pool.empty());
+ runtime_state_->InitMemTrackers(&instance_ctx.request_pool, bytes_limit);
RETURN_IF_ERROR(runtime_state_->CreateBlockMgr());
runtime_state_->InitFilterBank();
@@ -167,27 +169,21 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
// set up desc tbl
DescriptorTbl* desc_tbl = NULL;
- DCHECK(request.__isset.query_ctx);
- DCHECK(request.query_ctx.__isset.desc_tbl);
- RETURN_IF_ERROR(
- DescriptorTbl::Create(obj_pool(), request.query_ctx.desc_tbl, &desc_tbl));
+ RETURN_IF_ERROR(DescriptorTbl::Create(obj_pool(), tdesc_tbl, &desc_tbl));
runtime_state_->set_desc_tbl(desc_tbl);
- VLOG_QUERY << "descriptor table for fragment="
- << request.fragment_instance_ctx.fragment_instance_id
+ VLOG_QUERY << "descriptor table for fragment=" << instance_ctx.fragment_instance_id
<< "\n" << desc_tbl->DebugString();
// set up plan
- DCHECK(request.__isset.fragment_ctx);
RETURN_IF_ERROR(ExecNode::CreateTree(
- runtime_state_.get(), request.fragment_ctx.fragment.plan, *desc_tbl, &exec_tree_));
+ runtime_state_.get(), fragment_ctx.fragment.plan, *desc_tbl, &exec_tree_));
runtime_state_->set_fragment_root_id(exec_tree_->id());
- if (fragment_instance_ctx.__isset.debug_node_id) {
- DCHECK(fragment_instance_ctx.__isset.debug_action);
- DCHECK(fragment_instance_ctx.__isset.debug_phase);
- ExecNode::SetDebugOptions(fragment_instance_ctx.debug_node_id,
- fragment_instance_ctx.debug_phase, fragment_instance_ctx.debug_action,
- exec_tree_);
+ if (instance_ctx.__isset.debug_node_id) {
+ DCHECK(instance_ctx.__isset.debug_action);
+ DCHECK(instance_ctx.__isset.debug_phase);
+ ExecNode::SetDebugOptions(instance_ctx.debug_node_id, instance_ctx.debug_phase,
+ instance_ctx.debug_action, exec_tree_);
}
// set #senders of exchange nodes before calling Prepare()
@@ -195,8 +191,8 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
for (ExecNode* exch_node : exch_nodes) {
DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE);
- int num_senders = FindWithDefault(fragment_instance_ctx.per_exch_num_senders,
- exch_node->id(), 0);
+ int num_senders =
+ FindWithDefault(instance_ctx.per_exch_num_senders, exch_node->id(), 0);
DCHECK_GT(num_senders, 0);
static_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders);
}
@@ -208,7 +204,7 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
for (int i = 0; i < scan_nodes.size(); ++i) {
ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
const vector<TScanRangeParams>& scan_ranges = FindWithDefault(
- fragment_instance_ctx.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
+ instance_ctx.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
scan_node->SetScanRanges(scan_ranges);
}
@@ -220,13 +216,13 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
RETURN_IF_ERROR(exec_tree_->Prepare(state));
}
- PrintVolumeIds(fragment_instance_ctx.per_node_scan_ranges);
+ PrintVolumeIds(instance_ctx.per_node_scan_ranges);
- DCHECK(request.fragment_ctx.fragment.__isset.output_sink);
+ DCHECK(fragment_ctx.fragment.__isset.output_sink);
RETURN_IF_ERROR(
- DataSink::CreateDataSink(obj_pool(), request.fragment_ctx.fragment.output_sink,
- request.fragment_ctx.fragment.output_exprs, fragment_instance_ctx,
- exec_tree_->row_desc(), &sink_));
+ DataSink::CreateDataSink(obj_pool(), fragment_ctx.fragment.output_sink,
+ fragment_ctx.fragment.output_exprs, instance_ctx, exec_tree_->row_desc(),
+ &sink_));
RETURN_IF_ERROR(
sink_->Prepare(runtime_state(), runtime_state_->instance_mem_tracker()));
@@ -235,7 +231,7 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
profile()->AddChild(sink_profile);
}
- if (request.fragment_ctx.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK) {
+ if (fragment_ctx.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK) {
root_sink_ = reinterpret_cast<PlanRootSink*>(sink_.get());
// Release the thread token on the root fragment instance. This fragment spends most
// of the time waiting and doing very little work. Holding on to the token causes
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
index f93dab4..2194e58 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -84,7 +84,7 @@ class PlanFragmentExecutor {
/// report_status_cb, if !empty(), is used to report the accumulated profile
/// information periodically during execution.
- PlanFragmentExecutor(ExecEnv* exec_env, const ReportStatusCallback& report_status_cb);
+ PlanFragmentExecutor(const ReportStatusCallback& report_status_cb);
/// It is an error to delete a PlanFragmentExecutor with a report callback before Exec()
/// indicated that execution is finished, or to delete one that has not been Close()'d
@@ -103,11 +103,16 @@ class PlanFragmentExecutor {
/// Status::CANCELLED;
///
/// If Prepare() fails, it will invoke final status callback with the error status.
- Status Prepare(const TExecPlanFragmentParams& request);
+ /// TODO: remove desc_tbl parameter once we do a per-query exec rpc (and we
+ /// have a single descriptor table to cover all fragment instances); at the moment
+ /// we need to pass the TDescriptorTable explicitly
+ Status Prepare(QueryState* query_state, const TDescriptorTable& desc_tbl,
+ const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx);
/// Opens the fragment plan and sink. Starts the profile reporting thread, if
/// required. Can be called only if Prepare() succeeded. If Open() fails it will
/// invoke the final status callback with the error status.
+ /// TODO: is this needed? It's only ever called in conjunction with Exec() and Close()
Status Open();
/// Executes the fragment by repeatedly driving the sink with batches produced by the
@@ -153,7 +158,6 @@ class PlanFragmentExecutor {
static const std::string PER_HOST_PEAK_MEM_COUNTER;
private:
- ExecEnv* exec_env_; // not owned
ExecNode* exec_tree_; // lives in runtime_state_->obj_pool()
TUniqueId query_id_;
@@ -278,7 +282,10 @@ class PlanFragmentExecutor {
Status ExecInternal();
/// Performs all the logic of Prepare() and returns resulting status.
- Status PrepareInternal(const TExecPlanFragmentParams& request);
+ /// TODO: remove desc_tbl parameter as part of per-query exec rpc
+ Status PrepareInternal(QueryState* qs, const TDescriptorTable& desc_tbl,
+ const TPlanFragmentCtx& fragment_ctx,
+ const TPlanFragmentInstanceCtx& instance_ctx);
/// Releases the thread token for this fragment executor.
void ReleaseThreadToken();
@@ -288,7 +295,6 @@ class PlanFragmentExecutor {
void StopReportThread();
/// Print stats about scan ranges for each volumeId in params to info log.
- void PrintVolumeIds(const TPlanExecParams& params);
void PrintVolumeIds(const PerNodeScanRanges& per_node_scan_ranges);
const DescriptorTbl& desc_tbl() { return runtime_state_->desc_tbl(); }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
new file mode 100644
index 0000000..4a3742c
--- /dev/null
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#include "runtime/query-exec-mgr.h"
+
+#include <gperftools/malloc_extension.h>
+#include <gutil/strings/substitute.h>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/lock_guard.hpp>
+
+#include "common/logging.h"
+#include "runtime/query-state.h"
+#include "runtime/fragment-instance-state.h"
+#include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
+#include "util/uid-util.h"
+#include "util/thread.h"
+#include "util/impalad-metrics.h"
+#include "util/debug-util.h"
+
+using boost::lock_guard;
+using namespace impala;
+
+// TODO: this logging should go into a per query log.
+// Read by QueryExecMgr::ExecFInstance() after an instance finishes executing: when
+// this is > 0 and the IMPALA_SERVER_NUM_FRAGMENTS counter is a multiple of it, the
+// tcmalloc stats (MallocExtension::GetStats) are written to the INFO log. 0 disables
+// the output. The check is compiled out in ADDRESS_SANITIZER builds.
+// NOTE(review): IMPALA_SERVER_NUM_FRAGMENTS is incremented by StartFInstance() when an
+// instance is started, so "every Nth completion" in the help text is approximate.
+DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory usage "
+ "every log_mem_usage_interval'th fragment completion.");
+
+// Registers (or reuses) the QueryState for the instance's query, takes a reference on
+// it, registers a FragmentInstanceState and runs it on a new detached thread
+// (ExecFInstance(), which releases the reference when done).
+// Returns an error without touching any QueryState if the process is over its memory
+// limit.
+Status QueryExecMgr::StartFInstance(const TExecPlanFragmentParams& params) {
+  TUniqueId instance_id = params.fragment_instance_ctx.fragment_instance_id;
+  VLOG_QUERY << "StartFInstance() instance_id=" << PrintId(instance_id)
+             << " coord=" << params.query_ctx.coord_address;
+
+  // Starting a new fragment instance creates a thread and consumes a non-trivial
+  // amount of memory. If we are already starved for memory, cancel the instance as
+  // early as possible to avoid digging the hole deeper.
+  MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
+  if (process_mem_tracker->LimitExceeded()) {
+    string msg = Substitute("Instance $0 of plan fragment $1 could not "
+        "start because the backend Impala daemon is over its memory limit",
+        PrintId(instance_id), params.fragment_ctx.fragment.display_name);
+    return process_mem_tracker->MemLimitExceeded(NULL, msg, 0);
+  }
+
+  // Look up or create the QueryState and take our reference while still holding
+  // qs_map_lock_, so a concurrent ReleaseQueryState() cannot gc it in between.
+  const TUniqueId& query_id = params.query_ctx.query_id;
+  QueryState* qs = nullptr;
+  int refcnt = 0;
+  {
+    lock_guard<mutex> l(qs_map_lock_);
+    auto it = qs_map_.find(query_id);
+    if (it == qs_map_.end()) {
+      // register new QueryState
+      qs = new QueryState(params.query_ctx);
+      qs_map_.insert(make_pair(query_id, qs));
+      VLOG_QUERY << "new QueryState: query_id=" << PrintId(query_id);
+    } else {
+      qs = it->second;
+    }
+    // decremented at the end of ExecFInstance()
+    refcnt = qs->refcnt_.Add(1);
+  }
+  DCHECK(qs != nullptr && qs->refcnt_.Load() > 0);
+  VLOG_QUERY << "QueryState: query_id=" << PrintId(query_id) << " refcnt=" << refcnt;
+
+  DCHECK(params.__isset.fragment_ctx);
+  DCHECK(params.__isset.fragment_instance_ctx);
+  FragmentInstanceState* fis = qs->obj_pool()->Add(
+      new FragmentInstanceState(qs, params.fragment_ctx, params.fragment_instance_ctx,
+          params.query_ctx.desc_tbl));
+  // register instance before returning so that async Cancel() calls can
+  // find the instance
+  qs->RegisterFInstance(fis);
+
+  // Bump the metrics BEFORE launching the thread: ExecFInstance() decrements
+  // IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT and reads IMPALA_SERVER_NUM_FRAGMENTS, and
+  // the detached thread may get there before this function would otherwise return.
+  ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L);
+  ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L);
+
+  // start new thread to execute instance
+  Thread t("query-exec-mgr",
+      Substitute("exec-fragment-instance-$0", PrintId(instance_id)),
+      &QueryExecMgr::ExecFInstance, this, fis);
+  t.Detach();
+  return Status::OK();
+}
+
+// Thread body started by StartFInstance(): executes the instance, updates the
+// in-flight metric, optionally logs tcmalloc stats, then drops the QueryState
+// reference that StartFInstance() took.
+void QueryExecMgr::ExecFInstance(FragmentInstanceState* fis) {
+  fis->Exec();
+
+  ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L);
+  VLOG_QUERY << "Instance completed. instance_id=" << PrintId(fis->instance_id());
+
+#ifndef ADDRESS_SANITIZER
+  // tcmalloc and address sanitizer can not be used together
+  if (FLAGS_log_mem_usage_interval > 0) {
+    // NOTE: IMPALA_SERVER_NUM_FRAGMENTS is incremented in StartFInstance(), i.e. it
+    // counts started instances rather than completed ones.
+    uint64_t num_fragments = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->value();
+    if (num_fragments % FLAGS_log_mem_usage_interval == 0) {
+      char buf[2048];
+      // This outputs how much memory is currently being used by this impalad.
+      // Pass sizeof(buf) so the length can never drift from the declaration.
+      MallocExtension::instance()->GetStats(buf, sizeof(buf));
+      LOG(INFO) << buf;
+    }
+  }
+#endif
+
+  // Decrement the refcount taken in StartFInstance(). This may delete the QueryState
+  // (and, presumably, 'fis' along with its obj_pool), so don't touch 'fis' afterwards.
+  ReleaseQueryState(fis->query_state());
+}
+
+// Returns the registered QueryState for 'query_id' with one extra reference taken on
+// the caller's behalf, or nullptr if no such query is registered.
+QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) {
+  VLOG_QUERY << "GetQueryState(): query_id=" << PrintId(query_id);
+  lock_guard<mutex> map_guard(qs_map_lock_);
+  auto entry = qs_map_.find(query_id);
+  if (entry != qs_map_.end()) {
+    QueryState* found = entry->second;
+    // Increment under qs_map_lock_ so ReleaseQueryState() cannot gc it concurrently.
+    // Kept outside the DCHECK, which does not evaluate its argument in release builds.
+    int32_t new_refcnt = found->refcnt_.Add(1);
+    DCHECK_GT(new_refcnt, 0);
+    return found;
+  }
+  return nullptr;
+}
+
+// Drops one reference on 'qs'. When the count reaches 0 and no other thread has
+// re-acquired it in the meantime, 'qs' is removed from qs_map_ and deleted.
+void QueryExecMgr::ReleaseQueryState(QueryState* qs) {
+  DCHECK(qs != nullptr);
+  // Copy the id while we still hold our reference. Once our decrement brings the
+  // count to 0, another thread may GetQueryState()/ReleaseQueryState() and delete
+  // 'qs', so 'qs' must not be dereferenced after the decrement below.
+  TUniqueId query_id = qs->query_id();
+  int32_t cnt = qs->refcnt_.Add(-1);
+  // logs the pre-decrement refcount
+  VLOG_QUERY << "ReleaseQueryState(): query_id=" << PrintId(query_id)
+             << " refcnt=" << cnt + 1;
+  DCHECK_GE(cnt, 0);
+  if (cnt > 0) return;
+
+  {
+    // for now, gc right away
+    lock_guard<mutex> l(qs_map_lock_);
+    auto it = qs_map_.find(query_id);
+    // someone else might have gc'd the entry
+    if (it == qs_map_.end()) return;
+    qs = it->second;
+    DCHECK_EQ(qs->query_ctx().query_id, query_id);
+    // Refcounts only go 0 -> 1 under qs_map_lock_ (StartFInstance()/GetQueryState()),
+    // so this check is stable while we hold the lock.
+    int32_t current_cnt = qs->refcnt_.Load();
+    DCHECK_GE(current_cnt, 0);
+    // someone else might have increased the refcnt in the meantime
+    if (current_cnt > 0) return;
+    qs_map_.erase(it);
+  }
+  // TODO: send final status report during gc, but do this from a different thread
+  delete qs;
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/query-exec-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h
new file mode 100644
index 0000000..7b3fb84
--- /dev/null
+++ b/be/src/runtime/query-exec-mgr.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_QUERY_EXEC_MGR_H
+#define IMPALA_RUNTIME_QUERY_EXEC_MGR_H
+
+#include <boost/thread/mutex.hpp>
+#include <unordered_map>
+
+#include "common/status.h"
+#include "util/uid-util.h"
+#include "gen-cpp/Types_types.h"
+
+namespace impala {
+
+class FragmentInstanceState;
+class QueryState;
+class TExecPlanFragmentParams;
+class TUniqueId;
+class Thread;
+
+/// Process-wide registry of QueryStates, keyed by query id. All refcounted access to
+/// a QueryState goes through this class, and it is also where fragment instance
+/// execution is started. All public methods are thread-safe (qs_map_ is guarded by
+/// qs_map_lock_).
+///
+/// TODO: as part of IMPALA-2550 (per-query exec rpc)
+/// replace Start-/CancelFInstance() with StartQuery()/CancelQuery()
+class QueryExecMgr {
+ public:
+  /// Starts the fragment instance described by 'params' on a newly created thread,
+  /// creating the query's QueryState first if it is not registered yet. Either way,
+  /// a reference is taken before the instance runs and released once it finishes.
+  ///
+  /// Fails, without creating or referencing any QueryState, if an unrecoverable
+  /// problem (such as low memory) prevents the instance from starting. Once this
+  /// returns, callers may call FragmentInstanceState::Cancel() on the instance
+  /// regardless of the returned status.
+  Status StartFInstance(const TExecPlanFragmentParams& params);
+
+  /// Returns the QueryState registered for 'query_id' after incrementing its
+  /// refcount, or nullptr if there is none.
+  QueryState* GetQueryState(const TUniqueId& query_id);
+
+  /// Drops one reference on 'qs'; may delete it once unreferenced.
+  void ReleaseQueryState(QueryState* qs);
+
+ private:
+  /// Body of the thread started by StartFInstance(); runs 'fis'.
+  void ExecFInstance(FragmentInstanceState* fis);
+
+  /// Guards qs_map_.
+  boost::mutex qs_map_lock_;
+
+  /// query id -> QueryState. The QueryStates are owned by this class.
+  std::unordered_map<TUniqueId, QueryState*> qs_map_;
+};
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
new file mode 100644
index 0000000..2757750
--- /dev/null
+++ b/be/src/runtime/query-state.cc
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#include "runtime/query-state.h"
+
+#include <boost/thread/locks.hpp>
+#include <boost/thread/lock_guard.hpp>
+
+#include "runtime/exec-env.h"
+#include "runtime/fragment-instance-state.h"
+#include "runtime/query-exec-mgr.h"
+
+using namespace impala;
+
+// Acquires a reference through the daemon's QueryExecMgr; query_state_ is left as
+// nullptr when no QueryState is registered for 'query_id'.
+QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) {
+  QueryExecMgr* mgr = ExecEnv::GetInstance()->query_exec_mgr();
+  DCHECK(mgr != nullptr);
+  query_state_ = mgr->GetQueryState(query_id);
+}
+
+// Gives back the reference taken by the constructor, if one was obtained.
+QueryState::ScopedRef::~ScopedRef() {
+  if (query_state_ != nullptr) {
+    ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
+  }
+}
+
+// Stores a copy of 'query_ctx' with a zero refcount, then replaces non-positive
+// max_errors / batch_size query options in that copy with defaults.
+QueryState::QueryState(const TQueryCtx& query_ctx)
+    : query_ctx_(query_ctx), refcnt_(0) {
+  TQueryOptions& opts = query_ctx_.client_request.query_options;
+  // max_errors limits distinct errors rather than all recorded errors: it is the sum
+  // of the number of generic errors and the number of distinct other errors.
+  // TODO: fall back to FLAGS_max_errors instead of the hard-coded 100 once the linker
+  // error that prevents referencing that flag here is fixed.
+  if (opts.max_errors <= 0) opts.max_errors = 100;
+  if (opts.batch_size <= 0) opts.__set_batch_size(DEFAULT_BATCH_SIZE);
+}
+
+// Makes 'fis' findable via GetFInstanceState(); an instance id may be registered once.
+void QueryState::RegisterFInstance(FragmentInstanceState* fis) {
+  const TUniqueId& instance_id = fis->instance_id();
+  VLOG_QUERY << "RegisterFInstance(): instance_id=" << PrintId(instance_id);
+  lock_guard<mutex> l(fis_map_lock_);
+  DCHECK_EQ(fis_map_.count(instance_id), 0);
+  fis_map_.emplace(instance_id, fis);
+}
+
+// Returns the registered instance with id 'instance_id', or nullptr if there is none.
+FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_id) {
+  VLOG_FILE << "GetFInstanceState(): instance_id=" << PrintId(instance_id);
+  lock_guard<mutex> l(fis_map_lock_);
+  auto entry = fis_map_.find(instance_id);
+  if (entry == fis_map_.end()) return nullptr;
+  return entry->second;
+}