You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@impala.apache.org by lv...@apache.org on 2017/05/09 15:56:01 UTC
[07/13] incubator-impala git commit: IMPALA-2550: Switch to per-query exec rpc
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
deleted file mode 100644
index df5b0d2..0000000
--- a/be/src/runtime/plan-fragment-executor.cc
+++ /dev/null
@@ -1,518 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/plan-fragment-executor.h"
-
-#include <thrift/protocol/TDebugProtocol.h>
-#include <boost/date_time/posix_time/posix_time_types.hpp>
-#include <boost/unordered_map.hpp>
-#include <gutil/strings/substitute.h>
-
-#include "codegen/llvm-codegen.h"
-#include "common/logging.h"
-#include "common/object-pool.h"
-#include "exec/data-sink.h"
-#include "exec/exchange-node.h"
-#include "exec/exec-node.h"
-#include "exec/hbase-table-scanner.h"
-#include "exec/hdfs-scan-node.h"
-#include "exec/plan-root-sink.h"
-#include "exec/scan-node.h"
-#include "exprs/expr.h"
-#include "runtime/data-stream-mgr.h"
-#include "runtime/descriptors.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/row-batch.h"
-#include "runtime/query-state.h"
-#include "runtime/runtime-filter-bank.h"
-#include "runtime/exec-env.h"
-#include "util/container-util.h"
-#include "runtime/runtime-state.h"
-#include "util/cpu-info.h"
-#include "util/debug-util.h"
-#include "util/mem-info.h"
-#include "util/parse-util.h"
-#include "util/periodic-counter-updater.h"
-#include "util/pretty-printer.h"
-
-DEFINE_bool(serialize_batch, false, "serialize and deserialize each returned row batch");
-DEFINE_int32(status_report_interval, 5, "interval between profile reports; in seconds");
-
-#include "common/names.h"
-
-namespace posix_time = boost::posix_time;
-using boost::get_system_time;
-using boost::system_time;
-using namespace apache::thrift;
-using namespace strings;
-
-namespace impala {
-
-const string PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER = "PerHostPeakMemUsage";
-
-namespace {
-const string OPEN_TIMER_NAME = "OpenTime";
-const string PREPARE_TIMER_NAME = "PrepareTime";
-const string EXEC_TIMER_NAME = "ExecTime";
-}
-
-PlanFragmentExecutor::PlanFragmentExecutor(
- const ReportStatusCallback& report_status_cb)
- : exec_tree_(NULL),
- report_status_cb_(report_status_cb),
- report_thread_active_(false),
- closed_(false),
- has_thread_token_(false),
- timings_profile_(NULL),
- root_sink_(NULL),
- is_prepared_(false),
- is_cancelled_(false),
- per_host_mem_usage_(NULL),
- rows_produced_counter_(NULL),
- average_thread_tokens_(NULL),
- mem_usage_sampled_counter_(NULL),
- thread_usage_sampled_counter_(NULL) {}
-
-PlanFragmentExecutor::~PlanFragmentExecutor() {
- DCHECK(!is_prepared_ || closed_);
- // at this point, the report thread should have been stopped
- DCHECK(!report_thread_active_);
-}
-
-Status PlanFragmentExecutor::Prepare(
- QueryState* query_state, const TDescriptorTable& desc_tbl,
- const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx) {
- Status status = PrepareInternal(query_state, desc_tbl, fragment_ctx, instance_ctx);
- prepared_promise_.Set(status);
- if (!status.ok()) FragmentComplete(status);
- return status;
-}
-
-Status PlanFragmentExecutor::WaitForOpen() {
- DCHECK(prepared_promise_.IsSet()) << "Prepare() must complete before WaitForOpen()";
- RETURN_IF_ERROR(prepared_promise_.Get());
- return opened_promise_.Get();
-}
-
-Status PlanFragmentExecutor::PrepareInternal(
- QueryState* qs, const TDescriptorTable& tdesc_tbl,
- const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx) {
- lock_guard<mutex> l(prepare_lock_);
- DCHECK(!is_prepared_);
-
- if (is_cancelled_) return Status::CANCELLED;
- is_prepared_ = true;
-
- // TODO: Break this method up.
- query_id_ = qs->query_ctx().query_id;
-
- VLOG_QUERY << "Prepare(): instance_id="
- << PrintId(instance_ctx.fragment_instance_id);
- VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(instance_ctx);
-
- // Prepare() must not return before runtime_state_ is set if is_prepared_ was
- // set. Having runtime_state_.get() != NULL is a postcondition of this method in that
- // case. Do not call RETURN_IF_ERROR or explicitly return before this line.
- runtime_state_.reset(
- new RuntimeState(qs, fragment_ctx, instance_ctx, ExecEnv::GetInstance()));
-
- // total_time_counter() is in the runtime_state_ so start it up now.
- SCOPED_TIMER(profile()->total_time_counter());
- timings_profile_ = obj_pool()->Add(
- new RuntimeProfile(obj_pool(), "Fragment Instance Lifecycle Timings"));
- profile()->AddChild(timings_profile_);
- SCOPED_TIMER(ADD_TIMER(timings_profile_, PREPARE_TIMER_NAME));
-
- // reservation or a query option.
- int64_t bytes_limit = -1;
- if (runtime_state_->query_options().__isset.mem_limit &&
- runtime_state_->query_options().mem_limit > 0) {
- bytes_limit = runtime_state_->query_options().mem_limit;
- VLOG_QUERY << "Using query memory limit from query options: "
- << PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
- }
-
- DCHECK(!instance_ctx.request_pool.empty());
- RETURN_IF_ERROR(runtime_state_->CreateBlockMgr());
- runtime_state_->InitFilterBank();
-
- // Reserve one main thread from the pool
- runtime_state_->resource_pool()->AcquireThreadToken();
- has_thread_token_ = true;
-
- average_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens",
- bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
- runtime_state_->resource_pool()));
- mem_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("MemoryUsage",
- TUnit::BYTES,
- bind<int64_t>(mem_fn(&MemTracker::consumption),
- runtime_state_->instance_mem_tracker()));
- thread_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("ThreadUsage",
- TUnit::UNIT,
- bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
- runtime_state_->resource_pool()));
-
- // set up desc tbl
- DescriptorTbl* desc_tbl = NULL;
- RETURN_IF_ERROR(DescriptorTbl::Create(obj_pool(), tdesc_tbl, &desc_tbl));
- runtime_state_->set_desc_tbl(desc_tbl);
- VLOG_QUERY << "descriptor table for fragment=" << instance_ctx.fragment_instance_id
- << "\n" << desc_tbl->DebugString();
-
- // set up plan
- RETURN_IF_ERROR(ExecNode::CreateTree(
- runtime_state_.get(), fragment_ctx.fragment.plan, *desc_tbl, &exec_tree_));
- runtime_state_->set_fragment_root_id(exec_tree_->id());
-
- if (instance_ctx.__isset.debug_node_id) {
- DCHECK(instance_ctx.__isset.debug_action);
- DCHECK(instance_ctx.__isset.debug_phase);
- ExecNode::SetDebugOptions(instance_ctx.debug_node_id, instance_ctx.debug_phase,
- instance_ctx.debug_action, exec_tree_);
- }
-
- // set #senders of exchange nodes before calling Prepare()
- vector<ExecNode*> exch_nodes;
- exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
- for (ExecNode* exch_node : exch_nodes) {
- DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE);
- int num_senders =
- FindWithDefault(instance_ctx.per_exch_num_senders, exch_node->id(), 0);
- DCHECK_GT(num_senders, 0);
- static_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders);
- }
-
- // set scan ranges
- vector<ExecNode*> scan_nodes;
- vector<TScanRangeParams> no_scan_ranges;
- exec_tree_->CollectScanNodes(&scan_nodes);
- for (int i = 0; i < scan_nodes.size(); ++i) {
- ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
- const vector<TScanRangeParams>& scan_ranges = FindWithDefault(
- instance_ctx.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
- scan_node->SetScanRanges(scan_ranges);
- }
-
- RuntimeState* state = runtime_state();
- RuntimeProfile::Counter* prepare_timer =
- ADD_CHILD_TIMER(timings_profile_, "ExecTreePrepareTime", PREPARE_TIMER_NAME);
- {
- SCOPED_TIMER(prepare_timer);
- RETURN_IF_ERROR(exec_tree_->Prepare(state));
- }
-
- PrintVolumeIds(instance_ctx.per_node_scan_ranges);
-
- DCHECK(fragment_ctx.fragment.__isset.output_sink);
- RETURN_IF_ERROR(
- DataSink::CreateDataSink(obj_pool(), fragment_ctx.fragment.output_sink,
- fragment_ctx.fragment.output_exprs, instance_ctx, exec_tree_->row_desc(),
- &sink_));
- RETURN_IF_ERROR(
- sink_->Prepare(runtime_state(), runtime_state_->instance_mem_tracker()));
-
- RuntimeProfile* sink_profile = sink_->profile();
- if (sink_profile != NULL) {
- profile()->AddChild(sink_profile);
- }
-
- if (fragment_ctx.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK) {
- root_sink_ = reinterpret_cast<PlanRootSink*>(sink_.get());
- // Release the thread token on the root fragment instance. This fragment spends most
- // of the time waiting and doing very little work. Holding on to the token causes
- // underutilization of the machine. If there are 12 queries on this node, that's 12
- // tokens reserved for no reason.
- ReleaseThreadToken();
- }
-
- if (state->ShouldCodegen()) {
- RETURN_IF_ERROR(state->CreateCodegen());
- exec_tree_->Codegen(state);
- // It shouldn't be fatal to fail codegen. However, until IMPALA-4233 is fixed,
- // ScalarFnCall has no fall back to interpretation when codegen fails so propagates
- // the error status for now.
- RETURN_IF_ERROR(state->CodegenScalarFns());
- }
-
- // set up profile counters
- profile()->AddChild(exec_tree_->runtime_profile());
- rows_produced_counter_ =
- ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT);
- per_host_mem_usage_ =
- ADD_COUNTER(profile(), PER_HOST_PEAK_MEM_COUNTER, TUnit::BYTES);
-
- row_batch_.reset(new RowBatch(exec_tree_->row_desc(), state->batch_size(),
- state->instance_mem_tracker()));
- VLOG(2) << "plan_root=\n" << exec_tree_->DebugString();
- return Status::OK();
-}
-
-Status PlanFragmentExecutor::OptimizeLlvmModule() {
- if (!runtime_state_->ShouldCodegen()) return Status::OK();
- LlvmCodeGen* codegen = runtime_state_->codegen();
- DCHECK(codegen != NULL);
- return codegen->FinalizeModule();
-}
-
-void PlanFragmentExecutor::PrintVolumeIds(
- const PerNodeScanRanges& per_node_scan_ranges) {
- if (per_node_scan_ranges.empty()) return;
-
- HdfsScanNode::PerVolumnStats per_volume_stats;
- for (const PerNodeScanRanges::value_type& entry: per_node_scan_ranges) {
- HdfsScanNode::UpdateHdfsSplitStats(entry.second, &per_volume_stats);
- }
-
- stringstream str;
-
- HdfsScanNode::PrintHdfsSplitStats(per_volume_stats, &str);
- profile()->AddInfoString(HdfsScanNode::HDFS_SPLIT_STATS_DESC, str.str());
- VLOG_FILE
- << "Hdfs split stats (<volume id>:<# splits>/<split lengths>) for query="
- << query_id_ << ":\n" << str.str();
-}
-
-Status PlanFragmentExecutor::Open() {
- DCHECK(prepared_promise_.IsSet() && prepared_promise_.Get().ok());
- Status status;
- {
- SCOPED_TIMER(profile()->total_time_counter());
- SCOPED_TIMER(ADD_TIMER(timings_profile_, OPEN_TIMER_NAME));
- VLOG_QUERY << "Open(): instance_id=" << runtime_state_->fragment_instance_id();
- status = OpenInternal();
- }
- if (!status.ok()) FragmentComplete(status);
- opened_promise_.Set(status);
- return status;
-}
-
-Status PlanFragmentExecutor::OpenInternal() {
- SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
- RETURN_IF_ERROR(
- runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get()));
-
- // We need to start the profile-reporting thread before calling exec_tree_->Open(),
- // since it may block.
- if (!report_status_cb_.empty() && FLAGS_status_report_interval > 0) {
- unique_lock<mutex> l(report_thread_lock_);
- report_thread_.reset(
- new Thread("plan-fragment-executor", "report-profile",
- &PlanFragmentExecutor::ReportProfileThread, this));
- // Make sure the thread started up, otherwise ReportProfileThread() might get into
- // a race with StopReportThread().
- while (!report_thread_active_) report_thread_started_cv_.wait(l);
- }
-
- RETURN_IF_ERROR(OptimizeLlvmModule());
-
- {
- SCOPED_TIMER(ADD_CHILD_TIMER(timings_profile_, "ExecTreeOpenTime", OPEN_TIMER_NAME));
- RETURN_IF_ERROR(exec_tree_->Open(runtime_state_.get()));
- }
- return sink_->Open(runtime_state_.get());
-}
-
-Status PlanFragmentExecutor::Exec() {
- DCHECK(opened_promise_.IsSet() && opened_promise_.Get().ok());
- Status status;
- {
- // Must go out of scope before FragmentComplete(), otherwise counter will not be
- // updated by time final profile is sent.
- SCOPED_TIMER(profile()->total_time_counter());
- SCOPED_TIMER(ADD_TIMER(timings_profile_, EXEC_TIMER_NAME));
- status = ExecInternal();
- }
- FragmentComplete(status);
- return status;
-}
-
-Status PlanFragmentExecutor::ExecInternal() {
- RuntimeProfile::Counter* plan_exec_timer =
- ADD_CHILD_TIMER(timings_profile_, "ExecTreeExecTime", EXEC_TIMER_NAME);
- SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
- bool exec_tree_complete = false;
- do {
- Status status;
- row_batch_->Reset();
- {
- SCOPED_TIMER(plan_exec_timer);
- status = exec_tree_->GetNext(
- runtime_state_.get(), row_batch_.get(), &exec_tree_complete);
- }
- if (VLOG_ROW_IS_ON) row_batch_->VLogRows("PlanFragmentExecutor::ExecInternal()");
- COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
- RETURN_IF_ERROR(status);
- RETURN_IF_ERROR(sink_->Send(runtime_state(), row_batch_.get()));
- } while (!exec_tree_complete);
-
- // Flush the sink *before* stopping the report thread. Flush may need to add some
- // important information to the last report that gets sent. (e.g. table sinks record the
- // files they have written to in this method)
- RETURN_IF_ERROR(sink_->FlushFinal(runtime_state()));
- return Status::OK();
-}
-
-void PlanFragmentExecutor::ReportProfileThread() {
- VLOG_FILE << "ReportProfileThread(): instance_id="
- << runtime_state_->fragment_instance_id();
- DCHECK(!report_status_cb_.empty());
- unique_lock<mutex> l(report_thread_lock_);
- // tell Open() that we started
- report_thread_active_ = true;
- report_thread_started_cv_.notify_one();
-
- // Jitter the reporting time of remote fragments by a random amount between
- // 0 and the report_interval. This way, the coordinator doesn't get all the
- // updates at once so its better for contention as well as smoother progress
- // reporting.
- int report_fragment_offset = rand() % FLAGS_status_report_interval;
- system_time timeout = get_system_time()
- + posix_time::seconds(report_fragment_offset);
- // We don't want to wait longer than it takes to run the entire fragment.
- stop_report_thread_cv_.timed_wait(l, timeout);
-
- while (report_thread_active_) {
- system_time timeout = get_system_time()
- + posix_time::seconds(FLAGS_status_report_interval);
-
- // timed_wait can return because the timeout occurred or the condition variable
- // was signaled. We can't rely on its return value to distinguish between the
- // two cases (e.g. there is a race here where the wait timed out but before grabbing
- // the lock, the condition variable was signaled). Instead, we will use an external
- // flag, report_thread_active_, to coordinate this.
- stop_report_thread_cv_.timed_wait(l, timeout);
-
- if (VLOG_FILE_IS_ON) {
- VLOG_FILE << "Reporting " << (!report_thread_active_ ? "final " : " ")
- << "profile for instance " << runtime_state_->fragment_instance_id();
- stringstream ss;
- profile()->PrettyPrint(&ss);
- VLOG_FILE << ss.str();
- }
-
- if (!report_thread_active_) break;
- SendReport(false, Status::OK());
- }
-
- VLOG_FILE << "exiting reporting thread: instance_id="
- << runtime_state_->fragment_instance_id();
-}
-
-void PlanFragmentExecutor::SendReport(bool done, const Status& status) {
- DCHECK(status.ok() || done);
- if (report_status_cb_.empty()) return;
-
- // Update the counter for the peak per host mem usage.
- if (per_host_mem_usage_ != nullptr) {
- per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption());
- }
-
- // This will send a report even if we are cancelled. If the query completed correctly
- // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
- // be waiting for a final report and profile.
- RuntimeProfile* prof = is_prepared_ ? profile() : nullptr;
- report_status_cb_(status, prof, done);
-}
-
-void PlanFragmentExecutor::StopReportThread() {
- if (!report_thread_active_) return;
- {
- lock_guard<mutex> l(report_thread_lock_);
- report_thread_active_ = false;
- }
- stop_report_thread_cv_.notify_one();
- report_thread_->Join();
-}
-
-void PlanFragmentExecutor::FragmentComplete(const Status& status) {
- ReleaseThreadToken();
- StopReportThread();
- // It's safe to send final report now that the reporting thread is stopped.
- SendReport(true, status);
-}
-
-void PlanFragmentExecutor::Cancel() {
- VLOG_QUERY << "Cancelling fragment instance...";
- lock_guard<mutex> l(prepare_lock_);
- is_cancelled_ = true;
- if (!is_prepared_) {
- VLOG_QUERY << "Cancel() called before Prepare()";
- return;
- }
-
- // Ensure that the sink is closed from both sides. Although in ordinary executions we
- // rely on the consumer to do this, in error cases the consumer may not be able to send
- // CloseConsumer() (see IMPALA-4348 for an example).
- if (root_sink_ != nullptr) root_sink_->CloseConsumer();
-
- DCHECK(runtime_state_ != NULL);
- VLOG_QUERY << "Cancel(): instance_id=" << runtime_state_->fragment_instance_id();
- runtime_state_->set_is_cancelled(true);
- runtime_state_->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
-}
-
-RuntimeProfile* PlanFragmentExecutor::profile() {
- return runtime_state_->runtime_profile();
-}
-
-void PlanFragmentExecutor::ReleaseThreadToken() {
- if (has_thread_token_) {
- has_thread_token_ = false;
- runtime_state_->resource_pool()->ReleaseThreadToken(true);
- PeriodicCounterUpdater::StopSamplingCounter(average_thread_tokens_);
- PeriodicCounterUpdater::StopTimeSeriesCounter(
- thread_usage_sampled_counter_);
- }
-}
-
-void PlanFragmentExecutor::Close() {
- DCHECK(!has_thread_token_);
- DCHECK(!report_thread_active_);
-
- if (closed_) return;
- if (!is_prepared_) return;
- if (sink_.get() != nullptr) sink_->Close(runtime_state());
-
- row_batch_.reset();
-
- // Prepare should always have been called, and so runtime_state_ should be set
- DCHECK(prepared_promise_.IsSet());
- if (exec_tree_ != NULL) exec_tree_->Close(runtime_state_.get());
- if (mem_usage_sampled_counter_ != NULL) {
- // This counter references runtime_state_->instance_mem_tracker() so must be
- // stopped before calling ReleaseResources().
- PeriodicCounterUpdater::StopTimeSeriesCounter(mem_usage_sampled_counter_);
- mem_usage_sampled_counter_ = NULL;
- }
- // Sanity timer checks
-#ifndef NDEBUG
- int64_t total_time = profile()->total_time_counter()->value();
- int64_t other_time = 0;
- for (auto& name: {PREPARE_TIMER_NAME, OPEN_TIMER_NAME, EXEC_TIMER_NAME}) {
- RuntimeProfile::Counter* counter = timings_profile_->GetCounter(name);
- if (counter != nullptr) other_time += counter->value();
- }
- // TODO: IMPALA-4631: Occasionally we see other_time = total_time + 1 for some reason
- // we don't yet understand, so add 1 to total_time to avoid DCHECKing in that case.
- DCHECK_LE(other_time, total_time + 1);
-#endif
- runtime_state_->ReleaseResources();
-
- closed_ = true;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
deleted file mode 100644
index 2194e58..0000000
--- a/be/src/runtime/plan-fragment-executor.h
+++ /dev/null
@@ -1,305 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_SERVICE_PLAN_EXECUTOR_H
-#define IMPALA_SERVICE_PLAN_EXECUTOR_H
-
-#include <vector>
-#include <boost/scoped_ptr.hpp>
-#include <boost/function.hpp>
-
-#include "common/object-pool.h"
-#include "common/status.h"
-#include "runtime/runtime-state.h"
-#include "util/promise.h"
-#include "util/runtime-profile-counters.h"
-#include "util/thread.h"
-
-namespace impala {
-
-class HdfsFsCache;
-class ExecNode;
-class PlanRootSink;
-class RowDescriptor;
-class RowBatch;
-class DataSink;
-class DataStreamMgr;
-class RuntimeProfile;
-class RuntimeState;
-class TRowBatch;
-class TPlanExecRequest;
-class TPlanFragment;
-class TPlanExecParams;
-
-/// PlanFragmentExecutor handles all aspects of the execution of a single plan fragment,
-/// including setup and tear-down, both in the success and error case. Tear-down, which
-/// happens in Close(), frees all memory allocated for this plan fragment and closes all
-/// data streams.
-///
-/// The lifecycle of a PlanFragmentExecutor is as follows:
-/// if (Prepare().ok()) {
-/// Open()
-/// Exec()
-/// }
-/// Close()
-///
-/// The executor makes an aggregated profile for the entire fragment available, which
-/// includes profile information for the plan itself as well as the output sink.
-///
-/// The ReportStatusCallback passed into the c'tor is invoked periodically to report the
-/// execution profile. The frequency of those reports is controlled by the flag
-/// status_report_interval; setting that flag to 0 disables periodic reporting altogether
-/// Regardless of the value of that flag, if a report callback is specified, it is invoked
-/// at least once at the end of execution with an overall status and profile (and 'done'
-/// indicator).
-///
-/// Aside from Cancel(), which may be called asynchronously, this class is not
-/// thread-safe.
-class PlanFragmentExecutor {
- public:
- /// Callback to report execution status of plan fragment.
- /// 'profile' is the cumulative profile, 'done' indicates whether the execution
- /// is done or still continuing.
- /// Note: this does not take a const RuntimeProfile&, because it might need to call
- /// functions like PrettyPrint() or ToThrift(), neither of which is const
- /// because they take locks.
- typedef boost::function<
- void (const Status& status, RuntimeProfile* profile, bool done)>
- ReportStatusCallback;
-
- /// report_status_cb, if !empty(), is used to report the accumulated profile
- /// information periodically during execution.
- PlanFragmentExecutor(const ReportStatusCallback& report_status_cb);
-
- /// It is an error to delete a PlanFragmentExecutor with a report callback before Exec()
- /// indicated that execution is finished, or to delete one that has not been Close()'d
- /// if Prepare() has been called.
- ~PlanFragmentExecutor();
-
- /// Prepare for execution. Call this prior to Open().
- ///
- /// runtime_state() will not be valid until Prepare() is called. runtime_state() will
- /// always be valid after Prepare() returns, unless the query was cancelled before
- /// Prepare() was called. If request.query_options.mem_limit > 0, it is used as an
- /// approximate limit on the number of bytes this query can consume at runtime. The
- /// query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over that limit.
- ///
- /// If Cancel() is called before Prepare(), Prepare() is a no-op and returns
- /// Status::CANCELLED;
- ///
- /// If Prepare() fails, it will invoke final status callback with the error status.
- /// TODO: remove desc_tbl parameter once we do a per-query exec rpc (and we
- /// have a single descriptor table to cover all fragment instances); at the moment
- /// we need to pass the TDescriptorTable explicitly
- Status Prepare(QueryState* query_state, const TDescriptorTable& desc_tbl,
- const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx);
-
- /// Opens the fragment plan and sink. Starts the profile reporting thread, if
- /// required. Can be called only if Prepare() succeeded. If Open() fails it will
- /// invoke the final status callback with the error status.
- /// TODO: is this needed? It's only ever called in conjunction with Exec() and Close()
- Status Open();
-
- /// Executes the fragment by repeatedly driving the sink with batches produced by the
- /// exec node tree. report_status_cb will have been called for the final time when
- /// Exec() returns, and the status-reporting thread will have been stopped. Can be
- /// called only if Open() succeeded.
- Status Exec();
-
- /// Closes the underlying plan fragment and frees up all resources allocated in
- /// Prepare() and Open(). Must be called if Prepare() has been called - no matter
- /// whether or not Prepare() succeeded.
- void Close();
-
- /// Initiate cancellation. If called concurrently with Prepare(), will wait for
- /// Prepare() to finish in order to properly tear down Prepare()'d state.
- ///
- /// Cancel() may be called more than once. Calls after the first will have no
- /// effect. Duplicate calls to Cancel() are not serialised, and may safely execute
- /// concurrently.
- ///
- /// It is legal to call Cancel() if Prepare() returned an error.
- void Cancel();
-
- /// call these only after Prepare()
- RuntimeState* runtime_state() { return runtime_state_.get(); }
-
- /// Profile information for plan and output sink.
- RuntimeProfile* profile();
-
- /// Blocks until Prepare() is completed.
- Status WaitForPrepare() { return prepared_promise_.Get(); }
-
- /// Blocks until exec tree and sink are both opened. It is an error to call this before
- /// Prepare() has completed. If Prepare() returned an error, WaitForOpen() will
- /// return that error without blocking.
- Status WaitForOpen();
-
- /// Returns fragment instance's sink if this is the root fragment instance. Valid after
- /// Prepare() returns; if Prepare() fails may be nullptr.
- PlanRootSink* root_sink() { return root_sink_; }
-
- /// Name of the counter that is tracking per query, per host peak mem usage.
- static const std::string PER_HOST_PEAK_MEM_COUNTER;
-
- private:
- ExecNode* exec_tree_; // lives in runtime_state_->obj_pool()
- TUniqueId query_id_;
-
- /// profile reporting-related
- ReportStatusCallback report_status_cb_;
- boost::scoped_ptr<Thread> report_thread_;
- boost::mutex report_thread_lock_;
-
- /// Indicates that profile reporting thread should stop.
- /// Tied to report_thread_lock_.
- boost::condition_variable stop_report_thread_cv_;
-
- /// Indicates that profile reporting thread started.
- /// Tied to report_thread_lock_.
- boost::condition_variable report_thread_started_cv_;
-
- /// When the report thread starts, it sets 'report_thread_active_' to true and signals
- /// 'report_thread_started_cv_'. The report thread is shut down by setting
- /// 'report_thread_active_' to false and signalling 'stop_report_thread_cv_'. Protected
- /// by 'report_thread_lock_'.
- bool report_thread_active_;
-
- /// true if Close() has been called
- bool closed_;
-
- /// true if this fragment has not returned the thread token to the thread resource mgr
- bool has_thread_token_;
-
- /// 'runtime_state_' has to be before 'sink_' as 'sink_' relies on the object pool of
- /// 'runtime_state_'. This means 'sink_' is destroyed first so any implicit connections
- /// (e.g. mem_trackers_) from 'runtime_state_' to 'sink_' need to be severed prior to
- /// the dtor of 'runtime_state_'.
- boost::scoped_ptr<RuntimeState> runtime_state_;
-
- /// Profile for timings for each stage of the plan fragment instance's lifecycle.
- RuntimeProfile* timings_profile_;
-
- /// Output sink for rows sent to this fragment. Created in Prepare(), owned by this
- /// object.
- boost::scoped_ptr<DataSink> sink_;
-
- /// Set if this fragment instance is the root of the entire plan, so that a consumer can
- /// pull results by calling root_sink_->GetNext(). Same object as sink_.
- PlanRootSink* root_sink_ = nullptr;
-
- boost::scoped_ptr<RowBatch> row_batch_;
-
- /// Protects is_prepared_ and is_cancelled_, and is also used to coordinate between
- /// Prepare() and Cancel() to ensure mutual exclusion.
- boost::mutex prepare_lock_;
-
- /// True if Prepare() has been called and done some work - even if it returned an
- /// error. If Cancel() was called before Prepare(), is_prepared_ will not be set.
- bool is_prepared_;
-
- /// Set when Prepare() returns.
- Promise<Status> prepared_promise_;
-
- /// Set when OpenInternal() returns.
- Promise<Status> opened_promise_;
-
- /// True if and only if Cancel() has been called.
- bool is_cancelled_;
-
- /// A counter for the per query, per host peak mem usage. Note that this is not the
- /// max of the peak memory of all fragments running on a host since it needs to take
- /// into account when they are running concurrently. All fragments for a single query
- /// on a single host will have the same value for this counter.
- RuntimeProfile::Counter* per_host_mem_usage_;
-
- /// Number of rows returned by this fragment
- RuntimeProfile::Counter* rows_produced_counter_;
-
- /// Average number of thread tokens for the duration of the plan fragment execution.
- /// Fragments that do a lot of cpu work (non-coordinator fragment) will have at
- /// least 1 token. Fragments that contain a hdfs scan node will have 1+ tokens
- /// depending on system load. Other nodes (e.g. hash join node) can also reserve
- /// additional tokens.
- /// This is a measure of how much CPU resources this fragment used during the course
- /// of the execution.
- RuntimeProfile::Counter* average_thread_tokens_;
-
- /// Sampled memory usage at even time intervals.
- RuntimeProfile::TimeSeriesCounter* mem_usage_sampled_counter_;
-
- /// Sampled thread usage (tokens) at even time intervals.
- RuntimeProfile::TimeSeriesCounter* thread_usage_sampled_counter_;
-
- ObjectPool* obj_pool() { return runtime_state_->obj_pool(); }
-
- /// typedef for TPlanFragmentInstanceCtx.per_node_scan_ranges
- typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
-
- /// Main loop of profile reporting thread.
- /// Exits when notified on stop_report_thread_cv_ and report_thread_active_ is set to
- /// false. This will not send the final report.
- void ReportProfileThread();
-
- /// Invoked the report callback. If 'done' is true, sends the final report with
- /// 'status' and the profile. This type of report is sent once and only by the
- /// instance execution thread. Otherwise, a profile-only report is sent, which the
- /// ReportProfileThread() thread will do periodically.
- void SendReport(bool done, const Status& status);
-
- /// Called when the fragment execution is complete to finalize counters and send
- /// the final status report. Must be called only once.
- void FragmentComplete(const Status& status);
-
- /// Optimizes the code-generated functions in runtime_state_->llvm_codegen().
- /// Must be called after exec_tree_->Prepare() and before exec_tree_->Open().
- /// Returns error if LLVM optimization or compilation fails.
- Status OptimizeLlvmModule();
-
- /// Executes Open() logic and returns resulting status. Does not set status_.
- Status OpenInternal();
-
- /// Pulls row batches from fragment instance and pushes them to sink_ in a loop. Returns
- /// OK if the input was exhausted and sent to the sink successfully, an error otherwise.
- /// If ExecInternal() returns without an error condition, all rows will have been sent
- /// to the sink, the sink will have been closed, a final report will have been sent and
- /// the report thread will have been stopped.
- Status ExecInternal();
-
- /// Performs all the logic of Prepare() and returns resulting status.
- /// TODO: remove desc_tbl parameter as part of per-query exec rpc
- Status PrepareInternal(QueryState* qs, const TDescriptorTable& desc_tbl,
- const TPlanFragmentCtx& fragment_ctx,
- const TPlanFragmentInstanceCtx& instance_ctx);
-
- /// Releases the thread token for this fragment executor.
- void ReleaseThreadToken();
-
- /// Stops report thread, if one is running. Blocks until report thread terminates.
- /// Idempotent.
- void StopReportThread();
-
- /// Print stats about scan ranges for each volumeId in params to info log.
- void PrintVolumeIds(const PerNodeScanRanges& per_node_scan_ranges);
-
- const DescriptorTbl& desc_tbl() { return runtime_state_->desc_tbl(); }
-};
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 8b72ed9..6057b52 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -41,48 +41,51 @@ using namespace impala;
DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory usage "
"every log_mem_usage_interval'th fragment completion.");
-Status QueryExecMgr::StartFInstance(const TExecPlanFragmentParams& params) {
- TUniqueId instance_id = params.fragment_instance_ctx.fragment_instance_id;
- VLOG_QUERY << "StartFInstance() instance_id=" << PrintId(instance_id)
+Status QueryExecMgr::StartQuery(const TExecQueryFInstancesParams& params) {
+ TUniqueId query_id = params.query_ctx.query_id;
+ VLOG_QUERY << "StartQueryFInstances() query_id=" << PrintId(query_id)
<< " coord=" << params.query_ctx.coord_address;
bool dummy;
- QueryState* qs = GetOrCreateQueryState(
- params.query_ctx, params.fragment_instance_ctx.request_pool, &dummy);
- DCHECK(params.__isset.fragment_ctx);
- DCHECK(params.__isset.fragment_instance_ctx);
- Status status = qs->Prepare();
+ QueryState* qs = GetOrCreateQueryState(params.query_ctx, &dummy);
+ Status status = qs->Init(params);
if (!status.ok()) {
ReleaseQueryState(qs);
return status;
}
-
- FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState(
- qs, params.fragment_ctx, params.fragment_instance_ctx, params.query_ctx.desc_tbl));
- // register instance before returning so that async Cancel() calls can
- // find the instance
- qs->RegisterFInstance(fis);
- // start new thread to execute instance
+ // avoid blocking the rpc handler thread for too long by starting a new thread for
+ // query startup (which takes ownership of the QueryState reference)
Thread t("query-exec-mgr",
- Substitute("exec-fragment-instance-$0", PrintId(instance_id)),
- &QueryExecMgr::ExecFInstance, this, fis);
+ Substitute("start-query-finstances-$0", PrintId(query_id)),
+ &QueryExecMgr::StartQueryHelper, this, qs);
t.Detach();
-
- ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L);
- ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L);
return Status::OK();
}
-QueryState* QueryExecMgr::CreateQueryState(
- const TQueryCtx& query_ctx, const string& request_pool) {
+QueryState* QueryExecMgr::CreateQueryState(const TQueryCtx& query_ctx) {
bool created;
- QueryState* qs = GetOrCreateQueryState(query_ctx, request_pool, &created);
+ QueryState* qs = GetOrCreateQueryState(query_ctx, &created);
DCHECK(created);
return qs;
}
+QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) {
+ QueryState* qs = nullptr;
+ int refcnt;
+ {
+ lock_guard<mutex> l(qs_map_lock_);
+ auto it = qs_map_.find(query_id);
+ if (it == qs_map_.end()) return nullptr;
+ qs = it->second;
+ refcnt = qs->refcnt_.Add(1);
+ }
+ DCHECK(qs != nullptr && refcnt > 0);
+ VLOG_QUERY << "QueryState: query_id=" << query_id << " refcnt=" << refcnt;
+ return qs;
+}
+
QueryState* QueryExecMgr::GetOrCreateQueryState(
- const TQueryCtx& query_ctx, const string& request_pool, bool* created) {
+ const TQueryCtx& query_ctx, bool* created) {
QueryState* qs = nullptr;
int refcnt;
{
@@ -90,30 +93,26 @@ QueryState* QueryExecMgr::GetOrCreateQueryState(
auto it = qs_map_.find(query_ctx.query_id);
if (it == qs_map_.end()) {
// register new QueryState
- qs = new QueryState(query_ctx, request_pool);
+ qs = new QueryState(query_ctx);
qs_map_.insert(make_pair(query_ctx.query_id, qs));
- VLOG_QUERY << "new QueryState: query_id=" << query_ctx.query_id;
*created = true;
} else {
qs = it->second;
*created = false;
}
- // decremented at the end of ExecFInstance()
+ // decremented by ReleaseQueryState()
refcnt = qs->refcnt_.Add(1);
}
- DCHECK(qs != nullptr && qs->refcnt_.Load() > 0);
- VLOG_QUERY << "QueryState: query_id=" << query_ctx.query_id << " refcnt=" << refcnt;
+ DCHECK(qs != nullptr && refcnt > 0);
return qs;
}
-void QueryExecMgr::ExecFInstance(FragmentInstanceState* fis) {
- fis->Exec();
- ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L);
- VLOG_QUERY << "Instance completed. instance_id=" << PrintId(fis->instance_id());
+void QueryExecMgr::StartQueryHelper(QueryState* qs) {
+ qs->StartFInstances();
#ifndef ADDRESS_SANITIZER
- // tcmalloc and address sanitizer can not be used together
+ // tcmalloc and address sanitizer cannot be used together
if (FLAGS_log_mem_usage_interval > 0) {
uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->value();
if (num_complete % FLAGS_log_mem_usage_interval == 0) {
@@ -125,19 +124,8 @@ void QueryExecMgr::ExecFInstance(FragmentInstanceState* fis) {
}
#endif
- // decrement refcount taken in StartFInstance()
- ReleaseQueryState(fis->query_state());
-}
-
-QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) {
- VLOG_QUERY << "GetQueryState(): query_id=" << PrintId(query_id);
- lock_guard<mutex> l(qs_map_lock_);
- auto it = qs_map_.find(query_id);
- if (it == qs_map_.end()) return nullptr;
- QueryState* qs = it->second;
- int32_t cnt = qs->refcnt_.Add(1);
- DCHECK_GT(cnt, 0);
- return qs;
+ // decrement refcount taken in StartQuery()
+ ReleaseQueryState(qs);
}
void QueryExecMgr::ReleaseQueryState(QueryState* qs) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/query-exec-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h
index 92e756f..8a0c884 100644
--- a/be/src/runtime/query-exec-mgr.h
+++ b/be/src/runtime/query-exec-mgr.h
@@ -37,30 +37,24 @@ class FragmentInstanceState;
/// A daemon-wide registry and manager of QueryStates. This is the central
/// entry point for gaining refcounted access to a QueryState. It also initiates
-/// fragment instance execution.
+/// query execution.
/// Thread-safe.
-///
-/// TODO: as part of Impala-2550 (per-query exec rpc)
-/// replace Start-/CancelFInstance() with StartQuery()/CancelQuery()
class QueryExecMgr {
public:
- /// Initiates execution of this fragment instance in a newly created thread.
- /// Also creates a QueryState for this query, if none exists.
- /// In both cases it increases the refcount prior to instance execution and decreases
- /// it after execution finishes.
+ /// Creates QueryState if it doesn't exist and initiates execution of all fragment
+ /// instances for this query. All fragment instances hold a reference to their
+ /// QueryState for the duration of their execution.
///
- /// Returns an error if there was some unrecoverable problem before the fragment
- /// was started (like low memory). In that case, no QueryState is created or has its
- /// refcount incremented. After this call returns, it is legal to call
- /// FragmentInstanceState::Cancel() on this fragment instance, regardless of the
- /// return value of this function.
- Status StartFInstance(const TExecPlanFragmentParams& params);
+ /// Returns an error if there was some unrecoverable problem before any instance
+ /// was started (like low memory). In that case, no QueryState is created.
+ /// After this function returns OK, it is legal to call QueryState::Cancel().
+ /// NOTE(review): on error StartFInstances() never runs, so Cancel() would block; confirm.
+ Status StartQuery(const TExecQueryFInstancesParams& params);
/// Creates a QueryState for the given query with the provided parameters. Only valid
/// to call if the QueryState does not already exist. The caller must call
/// ReleaseQueryState() with the returned QueryState to decrement the refcount.
- QueryState* CreateQueryState(
- const TQueryCtx& query_ctx, const std::string& request_pool);
+ QueryState* CreateQueryState(const TQueryCtx& query_ctx);
/// If a QueryState for the given query exists, increments that refcount and returns
/// the QueryState, otherwise returns nullptr.
@@ -78,11 +72,11 @@ class QueryExecMgr {
/// Gets the existing QueryState or creates a new one if not present.
/// 'created' is set to true if it was created, false otherwise.
- QueryState* GetOrCreateQueryState(
- const TQueryCtx& query_ctx, const std::string& request_pool, bool* created);
+ /// Increments the refcount.
+ QueryState* GetOrCreateQueryState(const TQueryCtx& query_ctx, bool* created);
- /// Execute instance.
- void ExecFInstance(FragmentInstanceState* fis);
+ /// Execute instances and decrement refcount (acquire ownership of qs).
+ void StartQueryHelper(QueryState* qs);
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 2cc4818..1ae0f36 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -21,20 +21,27 @@
#include <boost/thread/locks.hpp>
#include <kudu/client/client.h>
+#include "exprs/expr.h"
#include "exec/kudu-util.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/backend-client.h"
#include "runtime/exec-env.h"
#include "runtime/fragment-instance-state.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-exec-mgr.h"
+#include "runtime/runtime-state.h"
#include "util/debug-util.h"
+#include "util/impalad-metrics.h"
+#include "util/thread.h"
#include "common/names.h"
using boost::algorithm::join;
using namespace impala;
+#define RETRY_SLEEP_MS 100
+
struct QueryState::KuduClientPtr {
kudu::client::sp::shared_ptr<kudu::client::KuduClient> kudu_client;
};
@@ -49,14 +56,17 @@ QueryState::ScopedRef::~ScopedRef() {
ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
}
-QueryState::QueryState(const TQueryCtx& query_ctx, const std::string& pool)
+QueryState::QueryState(const TQueryCtx& query_ctx, const string& request_pool)
: query_ctx_(query_ctx),
refcnt_(0),
- prepared_(false),
- released_resources_(false),
- buffer_reservation_(nullptr),
- file_group_(nullptr) {
- TQueryOptions& query_options = query_ctx_.client_request.query_options;
+ is_cancelled_(0) {
+ if (query_ctx_.request_pool.empty()) {
+ // fix up pool name for tests
+ DCHECK(!request_pool.empty());
+ const_cast<TQueryCtx&>(query_ctx_).request_pool = request_pool;
+ }
+ TQueryOptions& query_options =
+ const_cast<TQueryOptions&>(query_ctx_.client_request.query_options);
// max_errors does not indicate how many errors in total have been recorded, but rather
// how many are distinct. It is defined as the sum of the number of generic errors and
// the number of distinct other errors.
@@ -66,59 +76,56 @@ QueryState::QueryState(const TQueryCtx& query_ctx, const std::string& pool)
if (query_options.batch_size <= 0) {
query_options.__set_batch_size(DEFAULT_BATCH_SIZE);
}
- InitMemTrackers(pool);
+ InitMemTrackers();
}
void QueryState::ReleaseResources() {
+ DCHECK(!released_resources_);
// Clean up temporary files.
if (file_group_ != nullptr) file_group_->Close();
// Release any remaining reservation.
if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
// Avoid dangling reference from the parent of 'query_mem_tracker_'.
if (query_mem_tracker_ != nullptr) query_mem_tracker_->UnregisterFromParent();
+ if (desc_tbl_ != nullptr) desc_tbl_->ReleaseResources();
released_resources_ = true;
}
QueryState::~QueryState() {
DCHECK(released_resources_);
+ DCHECK_EQ(refcnt_.Load(), 0);
}
-Status QueryState::Prepare() {
- lock_guard<SpinLock> l(prepare_lock_);
- if (prepared_) {
- DCHECK(prepare_status_.ok());
- return Status::OK();
- }
- RETURN_IF_ERROR(prepare_status_);
-
- Status status;
+Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
// Starting a new query creates threads and consumes a non-trivial amount of memory.
// If we are already starved for memory, fail as early as possible to avoid consuming
// more resources.
ExecEnv* exec_env = ExecEnv::GetInstance();
MemTracker* process_mem_tracker = exec_env->process_mem_tracker();
if (process_mem_tracker->LimitExceeded()) {
- string msg = Substitute("Query $0 could not start because the backend Impala daemon "
- "is over its memory limit",
- PrintId(query_id()));
- status = process_mem_tracker->MemLimitExceeded(NULL, msg, 0);
- goto error;
+ string msg = Substitute(
+ "Query $0 could not start because the backend Impala daemon "
+ "is over its memory limit", PrintId(query_id()));
+ RETURN_IF_ERROR(process_mem_tracker->MemLimitExceeded(NULL, msg, 0));
}
// Do buffer-pool-related setup if running in a backend test that explicitly created
// the pool.
- if (exec_env->buffer_pool() != nullptr) {
- status = InitBufferPoolState();
- if (!status.ok()) goto error;
- }
- prepared_ = true;
- return Status::OK();
+ if (exec_env->buffer_pool() != nullptr) RETURN_IF_ERROR(InitBufferPoolState());
-error:
- prepare_status_ = status;
- return status;
+ // don't copy query_ctx, it's large and we already did that in the c'tor
+ rpc_params_.__set_coord_state_idx(rpc_params.coord_state_idx);
+ TExecQueryFInstancesParams& non_const_params =
+ const_cast<TExecQueryFInstancesParams&>(rpc_params);
+ rpc_params_.fragment_ctxs.swap(non_const_params.fragment_ctxs);
+ rpc_params_.__isset.fragment_ctxs = true;
+ rpc_params_.fragment_instance_ctxs.swap(non_const_params.fragment_instance_ctxs);
+ rpc_params_.__isset.fragment_instance_ctxs = true;
+
+ return Status::OK();
}
-void QueryState::InitMemTrackers(const std::string& pool) {
+void QueryState::InitMemTrackers() {
+ const string& pool = query_ctx_.request_pool;
int64_t bytes_limit = -1;
if (query_options().__isset.mem_limit && query_options().mem_limit > 0) {
bytes_limit = query_options().mem_limit;
@@ -160,23 +167,194 @@ Status QueryState::InitBufferPoolState() {
return Status::OK();
}
-void QueryState::RegisterFInstance(FragmentInstanceState* fis) {
- VLOG_QUERY << "RegisterFInstance(): instance_id=" << PrintId(fis->instance_id());
- lock_guard<SpinLock> l(fis_map_lock_);
- DCHECK_EQ(fis_map_.count(fis->instance_id()), 0);
- fis_map_.insert(make_pair(fis->instance_id(), fis));
-}
-
FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_id) {
VLOG_FILE << "GetFInstanceState(): instance_id=" << PrintId(instance_id);
- lock_guard<SpinLock> l(fis_map_lock_);
+ if (!instances_prepared_promise_.Get().ok()) return nullptr;
auto it = fis_map_.find(instance_id);
return it != fis_map_.end() ? it->second : nullptr;
}
-Status QueryState::GetKuduClient(const std::vector<std::string>& master_addresses,
- kudu::client::KuduClient** client) {
- std::string master_addr_concat = join(master_addresses, ",");
+void QueryState::ReportExecStatus(bool done, const Status& status,
+ FragmentInstanceState* fis) {
+ ReportExecStatusAux(done, status, fis, true);
+}
+
+void QueryState::ReportExecStatusAux(bool done, const Status& status,
+ FragmentInstanceState* fis, bool instances_started) {
+ // if we're reporting an error, we're done
+ DCHECK(status.ok() || done);
+ // if this is not for a specific fragment instance, we're reporting an error
+ DCHECK(fis != nullptr || !status.ok());
+ DCHECK(fis == nullptr || fis->IsPrepared());
+
+ // This will send a report even if we are cancelled. If the query completed correctly
+ // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
+ // be waiting for a final report and profile.
+
+ Status coord_status;
+ ImpalaBackendConnection coord(ExecEnv::GetInstance()->impalad_client_cache(),
+ query_ctx().coord_address, &coord_status);
+ if (!coord_status.ok()) {
+ // TODO: this might flood the log
+ LOG(WARNING) << "Couldn't get a client for " << query_ctx().coord_address
+ <<"\tReason: " << coord_status.GetDetail();
+ if (instances_started) Cancel();
+ return;
+ }
+
+ TReportExecStatusParams params;
+ params.protocol_version = ImpalaInternalServiceVersion::V1;
+ params.__set_query_id(query_ctx().query_id);
+ DCHECK(rpc_params().__isset.coord_state_idx);
+ params.__set_coord_state_idx(rpc_params().coord_state_idx);
+
+ if (fis != nullptr) {
+ // create status for 'fis'
+ params.instance_exec_status.emplace_back();
+ params.__isset.instance_exec_status = true;
+ TFragmentInstanceExecStatus& instance_status = params.instance_exec_status.back();
+ instance_status.__set_fragment_instance_id(fis->instance_id());
+ status.SetTStatus(&instance_status);
+ instance_status.__set_done(done);
+
+ if (fis->profile() != nullptr) {
+ fis->profile()->ToThrift(&instance_status.profile);
+ instance_status.__isset.profile = true;
+ }
+
+ // Only send updates to insert status if fragment is finished, the coordinator
+ // waits until query execution is done to use them anyhow.
+ if (done) {
+ TInsertExecStatus insert_status;
+ if (fis->runtime_state()->hdfs_files_to_move()->size() > 0) {
+ insert_status.__set_files_to_move(*fis->runtime_state()->hdfs_files_to_move());
+ }
+ if (fis->runtime_state()->per_partition_status()->size() > 0) {
+ insert_status.__set_per_partition_status(
+ *fis->runtime_state()->per_partition_status());
+ }
+ params.__set_insert_exec_status(insert_status);
+ }
+
+ // Send new errors to coordinator
+ fis->runtime_state()->GetUnreportedErrors(¶ms.error_log);
+ params.__isset.error_log = (params.error_log.size() > 0);
+ }
+
+ TReportExecStatusResult res;
+ Status rpc_status;
+ bool retry_is_safe;
+ // Try to send the RPC 3 times before failing.
+ for (int i = 0; i < 3; ++i) {
+ rpc_status = coord.DoRpc(
+ &ImpalaBackendClient::ReportExecStatus, params, &res, &retry_is_safe);
+ if (rpc_status.ok()) break;
+ if (!retry_is_safe) break;
+ if (i < 2) SleepForMs(RETRY_SLEEP_MS);
+ }
+ Status result_status(res.status);
+ if ((!rpc_status.ok() || !result_status.ok()) && instances_started) {
+ // TODO: should we try to keep rpc_status for the final report? (but the final
+ // report, following this Cancel(), may not succeed anyway.)
+ // TODO: not keeping an error status here means that all instances might
+ // abort with CANCELLED status, despite there being an error
+ Cancel();
+ }
+}
+
+Status QueryState::WaitForPrepare() {
+ return instances_prepared_promise_.Get();
+}
+
+void QueryState::StartFInstances() {
+ VLOG_QUERY << "StartFInstances(): query_id=" << PrintId(query_id())
+ << " #instances=" << rpc_params_.fragment_instance_ctxs.size();
+ DCHECK_GT(refcnt_.Load(), 0);
+
+ // set up desc tbl
+ DCHECK(query_ctx().__isset.desc_tbl);
+ Status status = DescriptorTbl::Create(
+ &obj_pool_, query_ctx().desc_tbl, query_mem_tracker_, &desc_tbl_);
+ if (!status.ok()) {
+ instances_prepared_promise_.Set(status);
+ ReportExecStatusAux(true, status, nullptr, false);
+ return;
+ }
+ VLOG_QUERY << "descriptor table for query=" << PrintId(query_id())
+ << "\n" << desc_tbl_->DebugString();
+
+ DCHECK_GT(rpc_params_.fragment_ctxs.size(), 0);
+ TPlanFragmentCtx* fragment_ctx = &rpc_params_.fragment_ctxs[0];
+ int fragment_ctx_idx = 0;
+ for (const TPlanFragmentInstanceCtx& instance_ctx: rpc_params_.fragment_instance_ctxs) {
+ // determine corresponding TPlanFragmentCtx
+ if (fragment_ctx->fragment.idx != instance_ctx.fragment_idx) {
+ ++fragment_ctx_idx;
+ DCHECK_LT(fragment_ctx_idx, rpc_params_.fragment_ctxs.size());
+ fragment_ctx = &rpc_params_.fragment_ctxs[fragment_ctx_idx];
+ // we expect fragment and instance contexts to follow the same order
+ DCHECK_EQ(fragment_ctx->fragment.idx, instance_ctx.fragment_idx);
+ }
+ FragmentInstanceState* fis = obj_pool_.Add(
+ new FragmentInstanceState(this, *fragment_ctx, instance_ctx));
+ fis_map_.emplace(fis->instance_id(), fis);
+
+ // update fragment_map_
+ vector<FragmentInstanceState*>& fis_list = fragment_map_[instance_ctx.fragment_idx];
+ fis_list.push_back(fis);
+
+ // start new thread to execute instance
+ refcnt_.Add(1); // decremented in ExecFInstance()
+ Thread t("query-state",
+ Substitute(
+ "exec-query-finstance-$0", PrintId(instance_ctx.fragment_instance_id)),
+ &QueryState::ExecFInstance, this, fis);
+ t.Detach();
+ }
+
+ // don't return until every instance is prepared and record the first non-OK
+ // (non-CANCELLED if available) status
+ Status prepare_status;
+ for (auto entry: fis_map_) {
+ Status instance_status = entry.second->WaitForPrepare();
+ // don't wipe out an error in one instance with the resulting CANCELLED from
+ // the remaining instances
+ if (!instance_status.ok() && (prepare_status.ok() || prepare_status.IsCancelled())) {
+ prepare_status = instance_status;
+ }
+ }
+ instances_prepared_promise_.Set(prepare_status);
+}
+
+void QueryState::ExecFInstance(FragmentInstanceState* fis) {
+ ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L);
+ ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L);
+ VLOG_QUERY << "Executing instance. instance_id=" << PrintId(fis->instance_id())
+ << " fragment_idx=" << fis->instance_ctx().fragment_idx
+ << " per_fragment_instance_idx=" << fis->instance_ctx().per_fragment_instance_idx
+ << " coord_state_idx=" << rpc_params().coord_state_idx
+ << " #in-flight=" << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->value();
+ Status status = fis->Exec();
+ ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L);
+ VLOG_QUERY << "Instance completed. instance_id=" << PrintId(fis->instance_id())
+ << " #in-flight=" << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->value()
+ << " status=" << status;
+ // initiate cancellation if nobody has done so yet
+ if (!status.ok()) Cancel();
+ // decrement refcount taken in StartFInstances()
+ ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
+}
+
+void QueryState::Cancel() {
+ VLOG_QUERY << "Cancel: query_id=" << query_id();
+ (void) instances_prepared_promise_.Get();
+ if (!is_cancelled_.CompareAndSwap(0, 1)) return;
+ for (auto entry: fis_map_) entry.second->Cancel();
+}
+
+Status QueryState::GetKuduClient(
+ const vector<string>& master_addresses, kudu::client::KuduClient** client) {
+ string master_addr_concat = join(master_addresses, ",");
lock_guard<SpinLock> l(kudu_client_map_lock_);
auto kudu_client_map_it = kudu_client_map_.find(master_addr_concat);
if (kudu_client_map_it == kudu_client_map_.end()) {
@@ -191,3 +369,12 @@ Status QueryState::GetKuduClient(const std::vector<std::string>& master_addresse
}
return Status::OK();
}
+
+void QueryState::PublishFilter(int32_t filter_id, int fragment_idx,
+ const TBloomFilter& thrift_bloom_filter) {
+ if (!instances_prepared_promise_.Get().ok()) return;
+ DCHECK_EQ(fragment_map_.count(fragment_idx), 1);
+ for (FragmentInstanceState* fis: fragment_map_[fragment_idx]) {
+ fis->PublishFilter(filter_id, thrift_bloom_filter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index d7ce10f..5660a49 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -29,6 +29,7 @@
#include "runtime/tmp-file-mgr.h"
#include "util/spinlock.h"
#include "util/uid-util.h"
+#include "util/promise.h"
namespace kudu { namespace client { class KuduClient; } }
@@ -44,17 +45,35 @@ class ReservationTracker;
/// instances; in contrast, fragment instance-specific state is collected in
/// FragmentInstanceState.
///
-/// The lifetime of an instance of this class is dictated by a reference count.
-/// Any thread that executes on behalf of a query, and accesses any of its state,
-/// must obtain a reference to the corresponding QueryState and hold it for at least the
+/// The lifetime of a QueryState is dictated by a reference count. Any thread that
+/// executes on behalf of a query, and accesses any of its state, must obtain a
+/// reference to the corresponding QueryState and hold it for at least the
/// duration of that access. The reference is obtained and released via
/// QueryExecMgr::Get-/ReleaseQueryState() or via QueryState::ScopedRef (the latter
/// for references limited to the scope of a single function or block).
-/// As long as the reference count is greater than 0, all query state (contained
-/// either in this class or accessible through this class, such as the
-/// FragmentInstanceStates) is guaranteed to be alive.
+/// As long as the reference count is greater than 0, all of a query's control
+/// structures (contained either in this class or accessible through this class, such
+/// as the FragmentInstanceStates) are guaranteed to be alive.
+///
+/// When any fragment instance execution returns with an error status, all
+/// fragment instances are automatically cancelled.
+///
+/// Status reporting: all instances currently report their status independently.
+/// Each instance sends at least one final status report with its overall execution
+/// status, so if any of the instances encountered an error, that error will be reported.
///
/// Thread-safe, unless noted otherwise.
+///
+/// TODO:
+/// - set up kudu clients in Init(), remove related locking
+/// - release resources (those referenced directly or indirectly by the query result
+/// set) automatically when all instances have finished execution
+/// (either by returning all rows or by being cancelled), rather than waiting for an
+/// explicit call to ReleaseResources()
+/// - when ReportExecStatus() encounters an error, query execution at this node
+/// gets aborted, but it's possible for the coordinator not to find out about that;
+/// fix the coordinator to periodically ping the backends (should the coordinator
+/// simply poll for the status reports?)
class QueryState {
public:
/// Use this class to obtain a QueryState for the duration of a function/block,
@@ -84,89 +103,126 @@ class QueryState {
/// a shared pool for all objects that have query lifetime
ObjectPool* obj_pool() { return &obj_pool_; }
- /// This TQueryCtx was copied from the first fragment instance which led to the
- /// creation of this QueryState. For all subsequently arriving fragment instances the
- /// desc_tbl in this context will be incorrect, therefore query_ctx().desc_tbl should
- /// not be used. This restriction will go away with the switch to a per-query exec
- /// rpc.
const TQueryCtx& query_ctx() const { return query_ctx_; }
-
- const TUniqueId& query_id() const { return query_ctx_.query_id; }
-
+ const TUniqueId& query_id() const { return query_ctx().query_id; }
const TQueryOptions& query_options() const {
return query_ctx_.client_request.query_options;
}
-
MemTracker* query_mem_tracker() const { return query_mem_tracker_; }
+
+ // the following getters are only valid after Init()
ReservationTracker* buffer_reservation() const { return buffer_reservation_; }
TmpFileMgr::FileGroup* file_group() const { return file_group_; }
+ const TExecQueryFInstancesParams& rpc_params() const { return rpc_params_; }
+
+ // the following getters are only valid after StartFInstances()
+ const DescriptorTbl& desc_tbl() const { return *desc_tbl_; }
/// Sets up state required for fragment execution: memory reservations, etc. Fails
- /// if resources could not be acquired. Safe to call concurrently and idempotent:
- /// the first thread to call this does the setup work.
- Status Prepare();
+ /// if resources could not be acquired. Uses few cycles and never blocks.
+ /// Not idempotent, not thread-safe.
+ /// The remaining public functions must be called only after Init().
+ Status Init(const TExecQueryFInstancesParams& rpc_params) WARN_UNUSED_RESULT;
+
+ /// Performs the runtime-intensive parts of initial setup and starts all fragment
+ /// instances belonging to this query. Each instance receives its own execution
+ /// thread. Blocks until all fragment instances have finished their Prepare phase.
+ /// Not idempotent, not thread-safe.
+ void StartFInstances();
+
+ /// Return overall status of Prepare phases of fragment instances. A failure
+ /// in any instance's Prepare will cause this function to return an error status.
+ /// Blocks until all fragment instances have finished their Prepare phase.
+ Status WaitForPrepare();
+
+ /// Blocks until all instances finished Prepare; nullptr if id unknown or Prepare failed.
+ FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id);
- /// Registers a new FInstanceState.
- void RegisterFInstance(FragmentInstanceState* fis);
+ /// Blocks until all fragment instances have finished their Prepare phase.
+ void PublishFilter(int32_t filter_id, int fragment_idx,
+ const TBloomFilter& thrift_bloom_filter);
- /// Returns the instance state or nullptr if the instance id has not previously
- /// been registered. The returned FIS is valid for the duration of the QueryState.
- FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id);
+ /// Cancels all actively executing fragment instances. Blocks until all fragment
+ /// instances have finished their Prepare phase. Idempotent.
+ void Cancel();
/// Called once the query is complete to release any resources.
- /// Must be called before destroying the QueryState.
+ /// Must be called only once and before destroying the QueryState.
+ /// Not idempotent, not thread-safe.
void ReleaseResources();
/// Gets a KuduClient for this list of master addresses. It will lookup and share
/// an existing KuduClient if possible. Otherwise, it will create a new KuduClient
/// internally and return a pointer to it. All KuduClients accessed through this
/// interface are owned by the QueryState. Thread safe.
- Status GetKuduClient(const std::vector<std::string>& master_addrs,
- kudu::client::KuduClient** client);
+ Status GetKuduClient(
+ const std::vector<std::string>& master_addrs, kudu::client::KuduClient** client)
+ WARN_UNUSED_RESULT;
+
+ /// Sends a ReportExecStatus rpc to the coordinator. If fis == nullptr, the
+ /// status must be an error. If fis is given, expects that fis finished its Prepare
+ /// phase; it then sends a report for that instance, including its profile.
+ /// If there is an error during the rpc, initiates cancellation.
+ void ReportExecStatus(bool done, const Status& status, FragmentInstanceState* fis);
~QueryState();
private:
friend class QueryExecMgr;
+ /// test execution
+ friend class RuntimeState;
+
static const int DEFAULT_BATCH_SIZE = 1024;
- TQueryCtx query_ctx_;
+ /// set in c'tor
+ const TQueryCtx query_ctx_;
- ObjectPool obj_pool_;
- AtomicInt32 refcnt_;
+ /// the top-level MemTracker for this query (owned by obj_pool_), created in c'tor
+ MemTracker* query_mem_tracker_ = nullptr;
- /// Held for duration of Prepare(). Protects 'prepared_',
- /// 'prepare_status_' and the members initialized in Prepare().
- SpinLock prepare_lock_;
+ /// set in Init(); rpc_params_.query_ctx is *not* set to avoid duplication
+ /// with query_ctx_
+ /// TODO: find a way not to have to copy this
+ TExecQueryFInstancesParams rpc_params_;
- /// Non-OK if Prepare() failed the first time it was called.
- /// All subsequent calls to Prepare() return this status.
- Status prepare_status_;
+ /// Buffer reservation for this query (owned by obj_pool_)
+ /// Only non-null in backend tests that explicitly enabled the new buffer pool
+ /// Set in Init().
+ /// TODO: this will always be non-null once IMPALA-3200 is done
+ ReservationTracker* buffer_reservation_ = nullptr;
- /// True if Prepare() executed and finished successfully.
- bool prepared_;
+ /// Temporary files for this query (owned by obj_pool_)
+ /// Only non-null in backend tests that explicitly enabled the new buffer pool
+ /// Set in Init().
+ /// TODO: this will always be non-null once IMPALA-3200 is done
+ TmpFileMgr::FileGroup* file_group_ = nullptr;
- /// True if and only if ReleaseResources() has been called.
- bool released_resources_;
+ /// created in StartFInstances(), owned by obj_pool_
+ DescriptorTbl* desc_tbl_ = nullptr;
- SpinLock fis_map_lock_; // protects fis_map_
+ /// Barrier for the completion of the Prepare phases of all fragment instances,
+ /// set in StartFInstances().
+ Promise<Status> instances_prepared_promise_;
- /// map from instance id to its state (owned by obj_pool_)
+ /// map from instance id to its state (owned by obj_pool_), populated in
+ /// StartFInstances(); not valid to read from until instances_prepared_promise_
+ /// is set
std::unordered_map<TUniqueId, FragmentInstanceState*> fis_map_;
- /// The top-level MemTracker for this query (owned by obj_pool_).
- MemTracker* query_mem_tracker_;
+ /// map from fragment index to its instances (owned by obj_pool_), populated in
+ /// StartFInstances()
+ std::unordered_map<int, std::vector<FragmentInstanceState*>> fragment_map_;
- /// Buffer reservation for this query (owned by obj_pool_)
- /// Only non-null in backend tests that explicitly enabled the new buffer pool
- /// TODO: this will always be non-null once IMPALA-3200 is done
- ReservationTracker* buffer_reservation_;
+ ObjectPool obj_pool_;
+ AtomicInt32 refcnt_;
- /// Temporary files for this query (owned by obj_pool_)
- /// Only non-null in backend tests the explicitly enabled the new buffer pool
- /// TODO: this will always be non-null once IMPALA-3200 is done
- TmpFileMgr::FileGroup* file_group_;
+ /// set to 1 when any fragment instance fails or when Cancel() is called; used to
+ /// initiate cancellation exactly once
+ AtomicInt32 is_cancelled_;
+
+ /// True if and only if ReleaseResources() has been called.
+ bool released_resources_ = false;
SpinLock kudu_client_map_lock_; // protects kudu_client_map_
@@ -184,16 +240,25 @@ class QueryState {
/// that the master address lists be identical in order to share a KuduClient.
KuduClientMap kudu_client_map_;
- /// Create QueryState w/ copy of query_ctx and refcnt of 0.
- /// The query is associated with the resource pool named 'pool'
- QueryState(const TQueryCtx& query_ctx, const std::string& pool);
+ /// Create QueryState w/ refcnt of 0.
+ /// The query is associated with the resource pool query_ctx.request_pool or
+ /// 'request_pool', if the former is not set (needed for tests).
+ QueryState(const TQueryCtx& query_ctx, const std::string& request_pool = "");
+
+ /// Execute the fragment instance and decrement the refcnt when done.
+ void ExecFInstance(FragmentInstanceState* fis);
/// Called from Prepare() to initialize MemTrackers.
- void InitMemTrackers(const std::string& pool);
+ void InitMemTrackers();
- /// Called from PrepareForExecution() to setup buffer reservations and the
+ /// Called from Prepare() to setup buffer reservations and the
/// file group. Fails if required resources are not available.
- Status InitBufferPoolState();
+ Status InitBufferPoolState() WARN_UNUSED_RESULT;
+
+ /// Same behavior as ReportExecStatus().
+ /// Cancel on error only if instances_started is true.
+ void ReportExecStatusAux(bool done, const Status& status, FragmentInstanceState* fis,
+ bool instances_started);
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 3e2dc6d..2a47cac 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -151,9 +151,10 @@ void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
if (has_remote_target
&& state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
+ params.__set_filter_id(filter_id);
+ params.__set_query_id(state_->query_id());
BloomFilter::ToThrift(bloom_filter, &params.bloom_filter);
- params.filter_id = filter_id;
- params.query_id = state_->query_id();
+ params.__isset.bloom_filter = true;
ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
SendFilterToCoordinator, state_->query_ctx().coord_address, params,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 62d0737..c5e2f59 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -74,16 +74,13 @@ namespace impala {
RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env)
- : desc_tbl_(nullptr),
- obj_pool_(new ObjectPool()),
- query_state_(query_state),
+ : query_state_(query_state),
fragment_ctx_(&fragment_ctx),
instance_ctx_(&instance_ctx),
now_(new TimestampValue(query_state->query_ctx().now_string.c_str(),
query_state->query_ctx().now_string.size())),
exec_env_(exec_env),
- profile_(obj_pool_.get(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)),
- query_mem_tracker_(query_state_->query_mem_tracker()),
+ profile_(obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)),
instance_buffer_reservation_(nullptr),
is_cancelled_(false),
root_node_id_(-1) {
@@ -91,20 +88,21 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag
}
RuntimeState::RuntimeState(
- const TQueryCtx& query_ctx, ExecEnv* exec_env, const std::string& request_pool)
- : obj_pool_(new ObjectPool()),
- query_state_(nullptr),
+ const TQueryCtx& qctx, ExecEnv* exec_env, DescriptorTbl* desc_tbl)
+ : query_state_(new QueryState(qctx, "test-pool")),
fragment_ctx_(nullptr),
instance_ctx_(nullptr),
- local_query_ctx_(query_ctx),
- now_(new TimestampValue(query_ctx.now_string.c_str(), query_ctx.now_string.size())),
+ local_query_state_(query_state_),
+ now_(new TimestampValue(qctx.now_string.c_str(), qctx.now_string.size())),
exec_env_(exec_env),
- profile_(obj_pool_.get(), "<unnamed>"),
- query_mem_tracker_(MemTracker::CreateQueryMemTracker(
- query_id(), query_options(), request_pool, obj_pool_.get())),
+ profile_(obj_pool(), "<unnamed>"),
instance_buffer_reservation_(nullptr),
is_cancelled_(false),
root_node_id_(-1) {
+ if (query_ctx().request_pool.empty()) {
+ const_cast<TQueryCtx&>(query_ctx()).request_pool = "test-pool";
+ }
+ if (desc_tbl != nullptr) query_state_->desc_tbl_ = desc_tbl;
Init();
}
@@ -125,10 +123,10 @@ void RuntimeState::Init() {
total_network_receive_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkReceiveTime");
instance_mem_tracker_.reset(new MemTracker(
- runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker_));
+ runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker()));
if (query_state_ != nullptr && exec_env_->buffer_pool() != nullptr) {
- instance_buffer_reservation_ = obj_pool_->Add(new ReservationTracker);
+ instance_buffer_reservation_ = obj_pool()->Add(new ReservationTracker);
instance_buffer_reservation_->InitChildTracker(&profile_,
query_state_->buffer_reservation(), instance_mem_tracker_.get(),
numeric_limits<int64_t>::max());
@@ -143,7 +141,7 @@ Status RuntimeState::CreateBlockMgr() {
DCHECK(block_mgr_.get() == NULL);
// Compute the max memory the block mgr will use.
- int64_t block_mgr_limit = query_mem_tracker_->lowest_limit();
+ int64_t block_mgr_limit = query_mem_tracker()->lowest_limit();
if (block_mgr_limit < 0) block_mgr_limit = numeric_limits<int64_t>::max();
block_mgr_limit = min(static_cast<int64_t>(block_mgr_limit * BLOCK_MGR_MEM_FRACTION),
block_mgr_limit - BLOCK_MGR_MEM_MIN_REMAINING);
@@ -266,7 +264,6 @@ void RuntimeState::UnregisterReaderContexts() {
void RuntimeState::ReleaseResources() {
UnregisterReaderContexts();
- if (desc_tbl_ != nullptr) desc_tbl_->ClosePartitionExprs(this);
if (filter_bank_ != nullptr) filter_bank_->Close();
if (resource_pool_ != nullptr) {
exec_env_->thread_mgr()->UnregisterPool(resource_pool_);
@@ -277,17 +274,22 @@ void RuntimeState::ReleaseResources() {
// Release the reservation, which should be unused at the point.
if (instance_buffer_reservation_ != nullptr) instance_buffer_reservation_->Close();
- // 'query_mem_tracker_' must be valid as long as 'instance_mem_tracker_' is so
+ // 'query_mem_tracker()' must be valid as long as 'instance_mem_tracker_' is so
// delete 'instance_mem_tracker_' first.
// LogUsage() walks the MemTracker tree top-down when the memory limit is exceeded, so
// break the link between 'instance_mem_tracker_' and its parent before
// 'instance_mem_tracker_' and its children are destroyed.
instance_mem_tracker_->UnregisterFromParent();
+ if (instance_mem_tracker_->consumption() != 0) {
+ LOG(WARNING) << "Query " << query_id() << " may have leaked memory." << endl
+ << instance_mem_tracker_->LogUsage();
+ }
instance_mem_tracker_.reset();
- // If this RuntimeState owns 'query_mem_tracker_' it must deregister it.
- if (query_state_ == nullptr) query_mem_tracker_->UnregisterFromParent();
- query_mem_tracker_ = nullptr;
+ if (local_query_state_.get() != nullptr) {
+ // if we created this QueryState, we must call ReleaseResources()
+ local_query_state_->ReleaseResources();
+ }
}
const std::string& RuntimeState::GetEffectiveUser() const {
@@ -314,14 +316,28 @@ HBaseTableFactory* RuntimeState::htable_factory() {
return exec_env_->htable_factory();
}
+ObjectPool* RuntimeState::obj_pool() const {
+ DCHECK(query_state_ != nullptr);
+ return query_state_->obj_pool(); // the query-wide pool held by the QueryState
+}
+
const TQueryCtx& RuntimeState::query_ctx() const {
- return query_state_ != nullptr ? query_state_->query_ctx() : local_query_ctx_;
+ DCHECK(query_state_ != nullptr);
+ return query_state_->query_ctx();
+}
+
+const DescriptorTbl& RuntimeState::desc_tbl() const {
+ DCHECK(query_state_ != nullptr);
+ return query_state_->desc_tbl(); // forwarded to the QueryState's descriptor table
}
const TQueryOptions& RuntimeState::query_options() const {
- const TQueryCtx& query_ctx =
- query_state_ != nullptr ? query_state_->query_ctx() : local_query_ctx_;
- return query_ctx.client_request.query_options;
+ return query_ctx().client_request.query_options;
+}
+
+MemTracker* RuntimeState::query_mem_tracker() {
+ DCHECK(query_state_ != nullptr);
+ return query_state_->query_mem_tracker(); // parent of instance_mem_tracker_ (see Init())
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index db9948f..d70459f 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -81,9 +81,10 @@ class RuntimeState {
RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env);
- /// RuntimeState for executing expr in fe-support.
+ /// RuntimeState for test execution and fe-support.cc. Creates its own QueryState and
+ /// installs desc_tbl, if set. If query_ctx.request_pool isn't set, sets it to "test-pool".
RuntimeState(
- const TQueryCtx& query_ctx, ExecEnv* exec_env, const std::string& request_pool);
+ const TQueryCtx& query_ctx, ExecEnv* exec_env, DescriptorTbl* desc_tbl = nullptr);
/// Empty d'tor to avoid issues with scoped_ptr.
~RuntimeState();
@@ -95,9 +96,9 @@ class RuntimeState {
Status CreateBlockMgr();
QueryState* query_state() const { return query_state_; }
- ObjectPool* obj_pool() const { return obj_pool_.get(); }
- const DescriptorTbl& desc_tbl() const { return *desc_tbl_; }
- void set_desc_tbl(DescriptorTbl* desc_tbl) { desc_tbl_ = desc_tbl; }
+ /// Return the query's ObjectPool
+ ObjectPool* obj_pool() const;
+ const DescriptorTbl& desc_tbl() const;
const TQueryOptions& query_options() const;
int batch_size() const { return query_options().batch_size; }
bool abort_on_error() const { return query_options().abort_on_error; }
@@ -128,7 +129,7 @@ class RuntimeState {
CatalogServiceClientCache* catalogd_client_cache();
DiskIoMgr* io_mgr();
MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); }
- MemTracker* query_mem_tracker() { return query_mem_tracker_; }
+ MemTracker* query_mem_tracker(); // reference to the query_state_'s memtracker
ReservationTracker* instance_buffer_reservation() {
return instance_buffer_reservation_;
}
@@ -253,7 +254,7 @@ class RuntimeState {
Status LogOrReturnError(const ErrorMsg& message);
bool is_cancelled() const { return is_cancelled_; }
- void set_is_cancelled(bool v) { is_cancelled_ = v; }
+ void set_is_cancelled() { is_cancelled_ = true; }
RuntimeProfile::Counter* total_storage_wait_timer() {
return total_storage_wait_timer_;
@@ -320,9 +321,6 @@ class RuntimeState {
block_mgr_ = block_mgr;
}
- DescriptorTbl* desc_tbl_ = nullptr;
- boost::scoped_ptr<ObjectPool> obj_pool_;
-
/// Lock protecting error_log_
SpinLock error_log_lock_;
@@ -330,13 +328,12 @@ class RuntimeState {
ErrorLogMap error_log_;
/// Global QueryState and original thrift descriptors for this fragment instance.
- /// Not set by the (const TQueryCtx&) c'tor.
QueryState* const query_state_;
const TPlanFragmentCtx* const fragment_ctx_;
const TPlanFragmentInstanceCtx* const instance_ctx_;
- /// Provides query ctx if query_state_ == nullptr.
- TQueryCtx local_query_ctx_;
+ /// only populated by the (const TQueryCtx&, ExecEnv*, DescriptorTbl*) c'tor
+ boost::scoped_ptr<QueryState> local_query_state_;
/// Provides instance id if instance_ctx_ == nullptr
TUniqueId no_instance_id_;
@@ -377,16 +374,12 @@ class RuntimeState {
/// Total CPU utilization for all threads in this plan fragment.
RuntimeProfile::ThreadCounters* total_thread_statistics_;
- /// Reference to the query MemTracker, owned by 'query_state_' if that is non-NULL
- /// or stored in 'obj_pool_' otherwise.
- MemTracker* query_mem_tracker_;
-
/// Memory usage of this fragment instance, a child of 'query_mem_tracker_'.
boost::scoped_ptr<MemTracker> instance_mem_tracker_;
/// Buffer reservation for this fragment instance - a child of the query buffer
/// reservation. Non-NULL if 'query_state_' is not NULL and ExecEnv::buffer_pool_
- /// was created by a backend test. Owned by 'obj_pool_'.
+ /// was created by a backend test. Owned by obj_pool().
ReservationTracker* instance_buffer_reservation_;
/// if true, execution should stop with a CANCELLED status
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 026b2ee..37b4363 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -18,17 +18,15 @@
#include "runtime/test-env.h"
#include <limits>
+#include <memory>
#include "runtime/buffered-block-mgr.h"
#include "runtime/query-exec-mgr.h"
#include "runtime/tmp-file-mgr.h"
+#include "runtime/query-state.h"
#include "util/disk-info.h"
#include "util/impalad-metrics.h"
-
#include "gutil/strings/substitute.h"
-
-#include <memory>
-
#include "common/names.h"
using boost::scoped_ptr;
@@ -40,8 +38,8 @@ scoped_ptr<MetricGroup> TestEnv::static_metrics_;
TestEnv::TestEnv()
: have_tmp_file_mgr_args_(false),
- buffer_pool_min_buffer_len_(1024),
- buffer_pool_capacity_(0) {}
+ buffer_pool_min_buffer_len_(-1),
+ buffer_pool_capacity_(-1) {}
Status TestEnv::Init() {
if (static_metrics_ == NULL) {
@@ -61,7 +59,9 @@ Status TestEnv::Init() {
} else {
RETURN_IF_ERROR(tmp_file_mgr()->Init(metrics()));
}
- exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_);
+ if (buffer_pool_min_buffer_len_ != -1 && buffer_pool_capacity_ != -1) {
+ exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_);
+ }
return Status::OK();
}
@@ -80,7 +80,7 @@ void TestEnv::SetBufferPoolArgs(int64_t min_buffer_len, int64_t capacity) {
TestEnv::~TestEnv() {
// Queries must be torn down first since they are dependent on global state.
TearDownQueries();
- exec_env_->disk_io_mgr_.reset();
+ // tear down exec env state to avoid leaks
exec_env_.reset();
}
@@ -113,13 +113,23 @@ Status TestEnv::CreateQueryState(
if (query_options != nullptr) query_ctx.client_request.query_options = *query_options;
query_ctx.query_id.hi = 0;
query_ctx.query_id.lo = query_id;
+ query_ctx.request_pool = "test-pool";
// CreateQueryState() enforces the invariant that 'query_id' must be unique.
- QueryState* qs = exec_env_->query_exec_mgr()->CreateQueryState(query_ctx, "test-pool");
+ QueryState* qs = exec_env_->query_exec_mgr()->CreateQueryState(query_ctx);
query_states_.push_back(qs);
- RETURN_IF_ERROR(qs->Prepare());
- FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState(
- qs, TPlanFragmentCtx(), TPlanFragmentInstanceCtx(), TDescriptorTable()));
+ // make sure to initialize data structures unrelated to the TExecQueryFInstancesParams
+ // param
+ TExecQueryFInstancesParams rpc_params;
+ // create dummy -Ctx fields, we need them for FragmentInstance-/RuntimeState
+ rpc_params.__set_coord_state_idx(0);
+ rpc_params.__set_query_ctx(TQueryCtx());
+ rpc_params.__set_fragment_ctxs(vector<TPlanFragmentCtx>({TPlanFragmentCtx()}));
+ rpc_params.__set_fragment_instance_ctxs(
+ vector<TPlanFragmentInstanceCtx>({TPlanFragmentInstanceCtx()}));
+ RETURN_IF_ERROR(qs->Init(rpc_params));
+ FragmentInstanceState* fis = qs->obj_pool()->Add(
+ new FragmentInstanceState(qs, qs->rpc_params().fragment_ctxs[0], qs->rpc_params().fragment_instance_ctxs[0]));
RuntimeState* rs = qs->obj_pool()->Add(
new RuntimeState(qs, fis->fragment_ctx(), fis->instance_ctx(), exec_env_.get()));
runtime_states_.push_back(rs);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/test-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index 30e9309..f314452 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -22,11 +22,12 @@
#include "runtime/exec-env.h"
#include "runtime/fragment-instance-state.h"
#include "runtime/mem-tracker.h"
-#include "runtime/query-state.h"
#include "runtime/runtime-state.h"
namespace impala {
+class QueryState;
+
/// Helper testing class that creates an environment with runtime memory management
/// similar to the one used by the Impala runtime. Only one TestEnv can be active at a
/// time, because it modifies the global ExecEnv singleton.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index bb44145..a035c7d 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -220,14 +220,6 @@ const TPlanFragment& FInstanceExecParams::fragment() const {
return fragment_exec_params.fragment;
}
-int QuerySchedule::GetNumFragmentInstances() const {
- int result = 0;
- for (const FragmentExecParams& fragment_exec_params : fragment_exec_params_) {
- result += fragment_exec_params.instance_exec_params.size();
- }
- return result;
-}
-
const TPlanFragment* QuerySchedule::GetCoordFragment() const {
// Only have coordinator fragment for statements that return rows.
if (request_.stmt_type != TStmtType::QUERY) return nullptr;
@@ -262,4 +254,12 @@ vector<int> FragmentExecParams::GetInstanceIdxs() const {
return result;
}
+int QuerySchedule::GetNumFragmentInstances() const {
+ int total = 0; // running sum over every entry of fragment_exec_params_
+ for (const FragmentExecParams& p: fragment_exec_params_) {
+ total += p.instance_exec_params.size(); // number of instances of fragment 'p'
+ }
+ return total;
+}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 47848e8..8a985c7 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -132,9 +132,6 @@ class QuerySchedule {
/// Helper methods used by scheduler to populate this QuerySchedule.
void IncNumScanRanges(int64_t delta) { num_scan_ranges_ += delta; }
- /// Returns the total number of fragment instances.
- int GetNumFragmentInstances() const;
-
/// Return the coordinator fragment, or nullptr if there isn't one.
const TPlanFragment* GetCoordFragment() const;
@@ -148,6 +145,9 @@ class QuerySchedule {
return plan_node_to_fragment_idx_[id];
}
+ /// Return the total number of instances across all fragments.
+ int GetNumFragmentInstances() const;
+
/// Returns next instance id. Instance ids are consecutive numbers generated from
/// the query id.
/// If the query contains a coordinator fragment instance, the generated instance
@@ -213,7 +213,7 @@ class QuerySchedule {
// (TPlanFragment.idx)
std::vector<FragmentExecParams> fragment_exec_params_;
- /// The set of hosts that the query will run on excluding the coordinator.
+ /// The set of hosts that the query will run on including the coordinator.
boost::unordered_set<TNetworkAddress> unique_hosts_;
/// Total number of scan ranges of this query.