You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@impala.apache.org by he...@apache.org on 2016/10/16 15:56:02 UTC
[6/7] incubator-impala git commit: IMPALA-2905: Handle coordinator fragment lifecycle like all others
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index aba4a26..5269fe5 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -26,22 +26,23 @@
#include "common/logging.h"
#include "common/object-pool.h"
#include "exec/data-sink.h"
-#include "exec/exec-node.h"
#include "exec/exchange-node.h"
-#include "exec/scan-node.h"
-#include "exec/hdfs-scan-node.h"
+#include "exec/exec-node.h"
#include "exec/hbase-table-scanner.h"
+#include "exec/hdfs-scan-node.h"
+#include "exec/plan-root-sink.h"
+#include "exec/scan-node.h"
#include "exprs/expr.h"
-#include "runtime/descriptors.h"
#include "runtime/data-stream-mgr.h"
+#include "runtime/descriptors.h"
+#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-filter-bank.h"
-#include "runtime/mem-tracker.h"
+#include "util/container-util.h"
#include "util/cpu-info.h"
#include "util/debug-util.h"
-#include "util/container-util.h"
-#include "util/parse-util.h"
#include "util/mem-info.h"
+#include "util/parse-util.h"
#include "util/periodic-counter-updater.h"
#include "util/pretty-printer.h"
@@ -60,28 +61,45 @@ namespace impala {
const string PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER = "PerHostPeakMemUsage";
-PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
- const ReportStatusCallback& report_status_cb) :
- exec_env_(exec_env), plan_(NULL), report_status_cb_(report_status_cb),
- report_thread_active_(false), done_(false), closed_(false),
- has_thread_token_(false), is_prepared_(false), is_cancelled_(false),
- average_thread_tokens_(NULL), mem_usage_sampled_counter_(NULL),
- thread_usage_sampled_counter_(NULL) {
-}
+PlanFragmentExecutor::PlanFragmentExecutor(
+ ExecEnv* exec_env, const ReportStatusCallback& report_status_cb)
+ : exec_env_(exec_env),
+ exec_tree_(NULL),
+ report_status_cb_(report_status_cb),
+ report_thread_active_(false),
+ closed_(false),
+ has_thread_token_(false),
+ is_prepared_(false),
+ is_cancelled_(false),
+ average_thread_tokens_(NULL),
+ mem_usage_sampled_counter_(NULL),
+ thread_usage_sampled_counter_(NULL) {}
PlanFragmentExecutor::~PlanFragmentExecutor() {
- Close();
+ DCHECK(!is_prepared_ || closed_);
// at this point, the report thread should have been stopped
DCHECK(!report_thread_active_);
}
Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
+ Status status = PrepareInternal(request);
+ prepared_promise_.Set(status);
+ return status;
+}
+
+Status PlanFragmentExecutor::WaitForOpen() {
+ DCHECK(prepared_promise_.IsSet()) << "Prepare() must complete before WaitForOpen()";
+ RETURN_IF_ERROR(prepared_promise_.Get());
+ return opened_promise_.Get();
+}
+
+Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& request) {
lock_guard<mutex> l(prepare_lock_);
DCHECK(!is_prepared_);
if (is_cancelled_) return Status::CANCELLED;
-
is_prepared_ = true;
+
// TODO: Break this method up.
fragment_sw_.Start();
const TPlanFragmentInstanceCtx& fragment_instance_ctx = request.fragment_instance_ctx;
@@ -100,6 +118,10 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
// total_time_counter() is in the runtime_state_ so start it up now.
SCOPED_TIMER(profile()->total_time_counter());
+ timings_profile_ =
+ obj_pool()->Add(new RuntimeProfile(obj_pool(), "PlanFragmentExecutor"));
+ profile()->AddChild(timings_profile_);
+ SCOPED_TIMER(ADD_TIMER(timings_profile_, "PrepareTime"));
// reservation or a query option.
int64_t bytes_limit = -1;
@@ -145,22 +167,22 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
// set up plan
DCHECK(request.__isset.fragment_ctx);
- RETURN_IF_ERROR(ExecNode::CreateTree(runtime_state_.get(),
- request.fragment_ctx.fragment.plan, *desc_tbl, &plan_));
- runtime_state_->set_fragment_root_id(plan_->id());
+ RETURN_IF_ERROR(ExecNode::CreateTree(
+ runtime_state_.get(), request.fragment_ctx.fragment.plan, *desc_tbl, &exec_tree_));
+ runtime_state_->set_fragment_root_id(exec_tree_->id());
if (fragment_instance_ctx.__isset.debug_node_id) {
DCHECK(fragment_instance_ctx.__isset.debug_action);
DCHECK(fragment_instance_ctx.__isset.debug_phase);
ExecNode::SetDebugOptions(fragment_instance_ctx.debug_node_id,
- fragment_instance_ctx.debug_phase, fragment_instance_ctx.debug_action, plan_);
+ fragment_instance_ctx.debug_phase, fragment_instance_ctx.debug_action,
+ exec_tree_);
}
// set #senders of exchange nodes before calling Prepare()
vector<ExecNode*> exch_nodes;
- plan_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
- for (ExecNode* exch_node: exch_nodes)
- {
+ exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
+ for (ExecNode* exch_node : exch_nodes) {
DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE);
int num_senders = FindWithDefault(fragment_instance_ctx.per_exch_num_senders,
exch_node->id(), 0);
@@ -171,7 +193,7 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
// set scan ranges
vector<ExecNode*> scan_nodes;
vector<TScanRangeParams> no_scan_ranges;
- plan_->CollectScanNodes(&scan_nodes);
+ exec_tree_->CollectScanNodes(&scan_nodes);
for (int i = 0; i < scan_nodes.size(); ++i) {
ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
const vector<TScanRangeParams>& scan_ranges = FindWithDefault(
@@ -179,42 +201,47 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
scan_node->SetScanRanges(scan_ranges);
}
- RuntimeProfile::Counter* prepare_timer = ADD_TIMER(profile(), "PrepareTime");
+ RuntimeProfile::Counter* prepare_timer = ADD_TIMER(profile(), "ExecTreePrepareTime");
{
SCOPED_TIMER(prepare_timer);
- RETURN_IF_ERROR(plan_->Prepare(runtime_state_.get()));
+ RETURN_IF_ERROR(exec_tree_->Prepare(runtime_state_.get()));
}
PrintVolumeIds(fragment_instance_ctx.per_node_scan_ranges);
- // set up sink, if required
- if (request.fragment_ctx.fragment.__isset.output_sink) {
- RETURN_IF_ERROR(DataSink::CreateDataSink(
- obj_pool(), request.fragment_ctx.fragment.output_sink,
- request.fragment_ctx.fragment.output_exprs,
- fragment_instance_ctx, row_desc(), &sink_));
- sink_mem_tracker_.reset(new MemTracker(
- -1, sink_->GetName(), runtime_state_->instance_mem_tracker(), true));
- RETURN_IF_ERROR(sink_->Prepare(runtime_state(), sink_mem_tracker_.get()));
-
- RuntimeProfile* sink_profile = sink_->profile();
- if (sink_profile != NULL) {
- profile()->AddChild(sink_profile);
- }
- } else {
- sink_.reset(NULL);
+ DCHECK(request.fragment_ctx.fragment.__isset.output_sink);
+ RETURN_IF_ERROR(
+ DataSink::CreateDataSink(obj_pool(), request.fragment_ctx.fragment.output_sink,
+ request.fragment_ctx.fragment.output_exprs, fragment_instance_ctx,
+ exec_tree_->row_desc(), &sink_));
+ sink_mem_tracker_.reset(
+ new MemTracker(-1, sink_->GetName(), runtime_state_->instance_mem_tracker(), true));
+ RETURN_IF_ERROR(sink_->Prepare(runtime_state(), sink_mem_tracker_.get()));
+
+ RuntimeProfile* sink_profile = sink_->profile();
+ if (sink_profile != NULL) {
+ profile()->AddChild(sink_profile);
+ }
+
+ if (request.fragment_ctx.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK) {
+ root_sink_ = reinterpret_cast<PlanRootSink*>(sink_.get());
+ // Release the thread token on the root fragment instance. This fragment spends most
+ // of the time waiting and doing very little work. Holding on to the token causes
+ // underutilization of the machine. If there are 12 queries on this node, that's 12
+ // tokens reserved for no reason.
+ ReleaseThreadToken();
}
// set up profile counters
- profile()->AddChild(plan_->runtime_profile());
+ profile()->AddChild(exec_tree_->runtime_profile());
rows_produced_counter_ =
ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT);
per_host_mem_usage_ =
ADD_COUNTER(profile(), PER_HOST_PEAK_MEM_COUNTER, TUnit::BYTES);
- row_batch_.reset(new RowBatch(plan_->row_desc(), runtime_state_->batch_size(),
- runtime_state_->instance_mem_tracker()));
- VLOG(2) << "plan_root=\n" << plan_->DebugString();
+ row_batch_.reset(new RowBatch(exec_tree_->row_desc(), runtime_state_->batch_size(),
+ runtime_state_->instance_mem_tracker()));
+ VLOG(2) << "plan_root=\n" << exec_tree_->DebugString();
return Status::OK();
}
@@ -251,12 +278,21 @@ void PlanFragmentExecutor::PrintVolumeIds(
}
Status PlanFragmentExecutor::Open() {
- VLOG_QUERY << "Open(): instance_id="
- << runtime_state_->fragment_instance_id();
+ SCOPED_TIMER(profile()->total_time_counter());
+ SCOPED_TIMER(ADD_TIMER(timings_profile_, "OpenTime"));
+ VLOG_QUERY << "Open(): instance_id=" << runtime_state_->fragment_instance_id();
+ Status status = OpenInternal();
+ UpdateStatus(status);
+ opened_promise_.Set(status);
+ return status;
+}
- RETURN_IF_ERROR(runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get()));
+Status PlanFragmentExecutor::OpenInternal() {
+ RETURN_IF_ERROR(
+ runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get()));
- // we need to start the profile-reporting thread before calling Open(), since it
+ // we need to start the profile-reporting thread before calling exec_tree_->Open(),
+ // since it
// may block
if (!report_status_cb_.empty() && FLAGS_status_report_interval > 0) {
unique_lock<mutex> l(report_thread_lock_);
@@ -271,22 +307,25 @@ Status PlanFragmentExecutor::Open() {
OptimizeLlvmModule();
- Status status = OpenInternal();
- if (sink_.get() != NULL) {
- // We call Close() here rather than in OpenInternal() because we want to make sure
- // that Close() gets called even if there was an error in OpenInternal().
- // We also want to call sink_->Close() here rather than in PlanFragmentExecutor::Close
- // because we do not want the sink_ to hold on to all its resources as we will never
- // use it after this.
- sink_->Close(runtime_state());
- // If there's a sink and no error, OpenInternal() completed the fragment execution.
- if (status.ok()) {
- done_ = true;
- FragmentComplete();
- }
+ {
+ SCOPED_TIMER(ADD_TIMER(timings_profile_, "ExecTreeOpenTime"));
+ RETURN_IF_ERROR(exec_tree_->Open(runtime_state_.get()));
}
+ return sink_->Open(runtime_state_.get());
+}
- if (!status.ok() && !status.IsCancelled() && !status.IsMemLimitExceeded()) {
+Status PlanFragmentExecutor::Exec() {
+ SCOPED_TIMER(ADD_TIMER(timings_profile_, "ExecTime"));
+ {
+ lock_guard<mutex> l(status_lock_);
+ RETURN_IF_ERROR(status_);
+ }
+ Status status = ExecInternal();
+
+ // If there's no error, ExecInternal() completed the fragment instance's execution.
+ if (status.ok()) {
+ FragmentComplete();
+ } else if (!status.IsCancelled() && !status.IsMemLimitExceeded()) {
// Log error message in addition to returning in Status. Queries that do not
// fetch results (e.g. insert) may not receive the message directly and can
// only retrieve the log.
@@ -296,21 +335,23 @@ Status PlanFragmentExecutor::Open() {
return status;
}
-Status PlanFragmentExecutor::OpenInternal() {
- SCOPED_TIMER(profile()->total_time_counter());
- RETURN_IF_ERROR(plan_->Open(runtime_state_.get()));
- if (sink_.get() == NULL) return Status::OK();
-
- // If there is a sink, do all the work of driving it here, so that
- // when this returns the query has actually finished
- RETURN_IF_ERROR(sink_->Open(runtime_state_.get()));
- while (!done_) {
+Status PlanFragmentExecutor::ExecInternal() {
+ RuntimeProfile::Counter* plan_exec_timer =
+ ADD_TIMER(timings_profile_, "ExecTreeExecTime");
+ bool exec_tree_complete = false;
+ do {
+ Status status;
row_batch_->Reset();
- RETURN_IF_ERROR(plan_->GetNext(runtime_state_.get(), row_batch_.get(), &done_));
- if (VLOG_ROW_IS_ON) row_batch_->VLogRows("PlanFragmentExecutor::OpenInternal()");
+ {
+ SCOPED_TIMER(plan_exec_timer);
+ status = exec_tree_->GetNext(
+ runtime_state_.get(), row_batch_.get(), &exec_tree_complete);
+ }
+ if (VLOG_ROW_IS_ON) row_batch_->VLogRows("PlanFragmentExecutor::ExecInternal()");
COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
+ RETURN_IF_ERROR(status);
RETURN_IF_ERROR(sink_->Send(runtime_state(), row_batch_.get()));
- }
+ } while (!exec_tree_complete);
// Flush the sink *before* stopping the report thread. Flush may need to add some
// important information to the last report that gets sent. (e.g. table sinks record the
@@ -376,13 +417,20 @@ void PlanFragmentExecutor::SendReport(bool done) {
status = status_;
}
+ // If status is not OK, we need to make sure that only one sender sends a 'done'
+ // response.
+ // TODO: Clean all this up - move 'done' reporting to Close()?
+ if (!done && !status.ok()) {
+ done = completed_report_sent_.CompareAndSwap(0, 1);
+ }
+
// Update the counter for the peak per host mem usage.
per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption());
// This will send a report even if we are cancelled. If the query completed correctly
// but fragments still need to be cancelled (e.g. limit reached), the coordinator will
// be waiting for a final report and profile.
- report_status_cb_(status, profile(), done || !status.ok());
+ report_status_cb_(status, profile(), done);
}
void PlanFragmentExecutor::StopReportThread() {
@@ -395,36 +443,6 @@ void PlanFragmentExecutor::StopReportThread() {
report_thread_->Join();
}
-Status PlanFragmentExecutor::GetNext(RowBatch** batch) {
- SCOPED_TIMER(profile()->total_time_counter());
- VLOG_FILE << "GetNext(): instance_id=" << runtime_state_->fragment_instance_id();
-
- Status status = Status::OK();
- row_batch_->Reset();
- // Loop until we've got a non-empty batch, hit an error or exhausted the input.
- while (!done_) {
- status = plan_->GetNext(runtime_state_.get(), row_batch_.get(), &done_);
- if (VLOG_ROW_IS_ON) row_batch_->VLogRows("PlanFragmentExecutor::GetNext()");
- if (!status.ok()) break;
- if (row_batch_->num_rows() > 0) break;
- row_batch_->Reset();
- }
- UpdateStatus(status);
- COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
-
- if (done_) {
- VLOG_QUERY << "Finished executing fragment query_id=" << PrintId(query_id_)
- << " instance_id=" << PrintId(runtime_state_->fragment_instance_id());
- FragmentComplete();
- // Once all rows are returned, signal that we're done with an empty batch.
- *batch = row_batch_->num_rows() == 0 ? NULL : row_batch_.get();
- return status;
- }
-
- *batch = row_batch_.get();
- return status;
-}
-
void PlanFragmentExecutor::FragmentComplete() {
// Check the atomic flag. If it is set, then a fragment complete report has already
// been sent.
@@ -463,7 +481,7 @@ void PlanFragmentExecutor::UpdateStatus(const Status& status) {
}
void PlanFragmentExecutor::Cancel() {
- VLOG_QUERY << "Cancelling plan fragment...";
+ VLOG_QUERY << "Cancelling fragment instance...";
lock_guard<mutex> l(prepare_lock_);
is_cancelled_ = true;
if (!is_prepared_) {
@@ -476,18 +494,10 @@ void PlanFragmentExecutor::Cancel() {
runtime_state_->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
}
-const RowDescriptor& PlanFragmentExecutor::row_desc() {
- return plan_->row_desc();
-}
-
RuntimeProfile* PlanFragmentExecutor::profile() {
return runtime_state_->runtime_profile();
}
-bool PlanFragmentExecutor::ReachedLimit() {
- return plan_->ReachedLimit();
-}
-
void PlanFragmentExecutor::ReleaseThreadToken() {
if (has_thread_token_) {
has_thread_token_ = false;
@@ -500,19 +510,23 @@ void PlanFragmentExecutor::ReleaseThreadToken() {
void PlanFragmentExecutor::Close() {
if (closed_) return;
+ if (!is_prepared_) return;
+ if (sink_.get() != nullptr) sink_->Close(runtime_state());
+
row_batch_.reset();
if (sink_mem_tracker_ != NULL) {
sink_mem_tracker_->UnregisterFromParent();
sink_mem_tracker_.reset();
}
- // Prepare may not have been called, which sets runtime_state_
- if (runtime_state_.get() != NULL) {
- if (plan_ != NULL) plan_->Close(runtime_state_.get());
- runtime_state_->UnregisterReaderContexts();
- exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());
- runtime_state_->desc_tbl().ClosePartitionExprs(runtime_state_.get());
- runtime_state_->filter_bank()->Close();
- }
+
+ // Prepare should always have been called, and so runtime_state_ should be set
+ DCHECK(prepared_promise_.IsSet());
+ if (exec_tree_ != NULL) exec_tree_->Close(runtime_state_.get());
+ runtime_state_->UnregisterReaderContexts();
+ exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());
+ runtime_state_->desc_tbl().ClosePartitionExprs(runtime_state_.get());
+ runtime_state_->filter_bank()->Close();
+
if (mem_usage_sampled_counter_ != NULL) {
PeriodicCounterUpdater::StopTimeSeriesCounter(mem_usage_sampled_counter_);
mem_usage_sampled_counter_ = NULL;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
index f4355ea..82d3001 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -23,9 +23,10 @@
#include <boost/scoped_ptr.hpp>
#include <boost/function.hpp>
-#include "common/status.h"
#include "common/object-pool.h"
+#include "common/status.h"
#include "runtime/runtime-state.h"
+#include "util/promise.h"
#include "util/runtime-profile-counters.h"
#include "util/thread.h"
@@ -33,6 +34,7 @@ namespace impala {
class HdfsFsCache;
class ExecNode;
+class PlanRootSink;
class RowDescriptor;
class RowBatch;
class DataSink;
@@ -45,21 +47,28 @@ class TPlanFragment;
class TPlanExecParams;
/// PlanFragmentExecutor handles all aspects of the execution of a single plan fragment,
-/// including setup and tear-down, both in the success and error case.
-/// Tear-down frees all memory allocated for this plan fragment and closes all data
-/// streams; it happens automatically in the d'tor.
-//
-/// The executor makes an aggregated profile for the entire fragment available,
-/// which includes profile information for the plan itself as well as the output
-/// sink, if any.
+/// including setup and tear-down, both in the success and error case. Tear-down, which
+/// happens in Close(), frees all memory allocated for this plan fragment and closes all
+/// data streams.
+///
+/// The lifecycle of a PlanFragmentExecutor is as follows:
+/// if (Prepare().ok()) {
+/// Open()
+/// Exec()
+/// }
+/// Close()
+///
+/// The executor makes an aggregated profile for the entire fragment available, which
+/// includes profile information for the plan itself as well as the output sink.
+///
/// The ReportStatusCallback passed into the c'tor is invoked periodically to report the
/// execution status. The frequency of those reports is controlled by the flag
/// status_report_interval; setting that flag to 0 disables periodic reporting altogether
-/// Regardless of the value of that flag, if a report callback is specified, it is
-/// invoked at least once at the end of execution with an overall status and profile
-/// (and 'done' indicator). The only exception is when execution is cancelled, in which
-/// case the callback is *not* invoked (the coordinator already knows that execution
-/// stopped, because it initiated the cancellation).
+/// Regardless of the value of that flag, if a report callback is specified, it is invoked
+/// at least once at the end of execution with an overall status and profile (and 'done'
+/// indicator). The only exception is when execution is cancelled, in which case the
+/// callback is *not* invoked (the coordinator already knows that execution stopped,
+/// because it initiated the cancellation).
//
/// Aside from Cancel(), which may be called asynchronously, this class is not
/// thread-safe.
@@ -76,49 +85,37 @@ class PlanFragmentExecutor {
ReportStatusCallback;
/// report_status_cb, if !empty(), is used to report the accumulated profile
- /// information periodically during execution (Open() or GetNext()).
+ /// information periodically during execution.
PlanFragmentExecutor(ExecEnv* exec_env, const ReportStatusCallback& report_status_cb);
- /// Closes the underlying plan fragment and frees up all resources allocated
- /// in Open()/GetNext().
- /// It is an error to delete a PlanFragmentExecutor with a report callback
- /// before Open()/GetNext() (depending on whether the fragment has a sink)
- /// indicated that execution is finished.
+ /// It is an error to delete a PlanFragmentExecutor with a report callback before Exec()
+ /// indicated that execution is finished, or to delete one that has not been Close()'d
+ /// if Prepare() has been called.
~PlanFragmentExecutor();
/// Prepare for execution. Call this prior to Open().
///
- /// runtime_state() and row_desc() will not be valid until Prepare() is
- /// called. runtime_state() will always be valid after Prepare() returns, unless the
- /// query was cancelled before Prepare() was called. If request.query_options.mem_limit
- /// > 0, it is used as an approximate limit on the number of bytes this query can
- /// consume at runtime. The query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over
- /// that limit.
+ /// runtime_state() will not be valid until Prepare() is called. runtime_state() will
+ /// always be valid after Prepare() returns, unless the query was cancelled before
+ /// Prepare() was called. If request.query_options.mem_limit > 0, it is used as an
+ /// approximate limit on the number of bytes this query can consume at runtime. The
+ /// query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over that limit.
///
/// If Cancel() is called before Prepare(), Prepare() is a no-op and returns
/// Status::CANCELLED;
Status Prepare(const TExecPlanFragmentParams& request);
- /// Start execution. Call this prior to GetNext().
- /// If this fragment has a sink, Open() will send all rows produced
- /// by the fragment to that sink. Therefore, Open() may block until
- /// all rows are produced (and a subsequent call to GetNext() will not return
- /// any rows).
- /// This also starts the status-reporting thread, if the interval flag
- /// is > 0 and a callback was specified in the c'tor.
- /// If this fragment has a sink, report_status_cb will have been called for the final
- /// time when Open() returns, and the status-reporting thread will have been stopped.
+ /// Opens the fragment plan and sink. Starts the profile reporting thread, if required.
Status Open();
- /// Return results through 'batch'. Sets '*batch' to NULL if no more results.
- /// '*batch' is owned by PlanFragmentExecutor and must not be deleted.
- /// When *batch == NULL, GetNext() should not be called anymore. Also, report_status_cb
- /// will have been called for the final time and the status-reporting thread
- /// will have been stopped.
- Status GetNext(RowBatch** batch);
+ /// Executes the fragment by repeatedly driving the sink with batches produced by the
+ /// exec node tree. report_status_cb will have been called for the final time when
+ /// Exec() returns, and the status-reporting thread will have been stopped.
+ Status Exec();
- /// Closes the underlying plan fragment and frees up all resources allocated
- /// in Open()/GetNext().
+ /// Closes the underlying plan fragment and frees up all resources allocated in
+ /// Prepare() and Open(). Must be called if Prepare() has been called - no matter
+ /// whether or not Prepare() succeeded.
void Close();
/// Initiate cancellation. If called concurrently with Prepare(), will wait for
@@ -131,25 +128,30 @@ class PlanFragmentExecutor {
/// It is legal to call Cancel() if Prepare() returned an error.
void Cancel();
- /// Returns true if this query has a limit and it has been reached.
- bool ReachedLimit();
-
- /// Releases the thread token for this fragment executor.
- void ReleaseThreadToken();
-
/// call these only after Prepare()
RuntimeState* runtime_state() { return runtime_state_.get(); }
- const RowDescriptor& row_desc();
/// Profile information for plan and output sink.
RuntimeProfile* profile();
+ /// Blocks until Prepare() is completed.
+ Status WaitForPrepare() { return prepared_promise_.Get(); }
+
+ /// Blocks until exec tree and sink are both opened. It is an error to call this before
+ /// Prepare() has completed. If Prepare() returned an error, WaitForOpen() will
+ /// return that error without blocking.
+ Status WaitForOpen();
+
+ /// Returns fragment instance's sink if this is the root fragment instance. Valid after
+ /// Prepare() returns; if Prepare() fails may be nullptr.
+ PlanRootSink* root_sink() { return root_sink_; }
+
/// Name of the counter that is tracking per query, per host peak mem usage.
static const std::string PER_HOST_PEAK_MEM_COUNTER;
private:
ExecEnv* exec_env_; // not owned
- ExecNode* plan_; // lives in runtime_state_->obj_pool()
+ ExecNode* exec_tree_; // lives in runtime_state_->obj_pool()
TUniqueId query_id_;
/// profile reporting-related
@@ -166,9 +168,6 @@ class PlanFragmentExecutor {
boost::condition_variable report_thread_started_cv_;
bool report_thread_active_; // true if we started the thread
- /// true if plan_->GetNext() indicated that it's done
- bool done_;
-
/// true if Close() has been called
bool closed_;
@@ -190,14 +189,20 @@ class PlanFragmentExecutor {
/// (e.g. mem_trackers_) from 'runtime_state_' to 'sink_' need to be severed prior to
/// the dtor of 'runtime_state_'.
boost::scoped_ptr<RuntimeState> runtime_state_;
- /// Output sink for rows sent to this fragment. May not be set, in which case rows are
- /// returned via GetNext's row batch
- /// Created in Prepare (if required), owned by this object.
+
+ /// Profile for timings for each stage of the plan fragment instance's lifecycle.
+ RuntimeProfile* timings_profile_;
+
+ /// Output sink for rows sent to this fragment. Created in Prepare(), owned by this
+ /// object.
boost::scoped_ptr<DataSink> sink_;
boost::scoped_ptr<MemTracker> sink_mem_tracker_;
+ /// Set if this fragment instance is the root of the entire plan, so that a consumer can
+ /// pull results by calling root_sink_->GetNext(). Same object as sink_.
+ PlanRootSink* root_sink_ = nullptr;
+
boost::scoped_ptr<RowBatch> row_batch_;
- boost::scoped_ptr<TRowBatch> thrift_batch_;
/// Protects is_prepared_ and is_cancelled_, and is also used to coordinate between
/// Prepare() and Cancel() to ensure mutual exclusion.
@@ -207,6 +212,12 @@ class PlanFragmentExecutor {
/// error. If Cancel() was called before Prepare(), is_prepared_ will not be set.
bool is_prepared_;
+ /// Set when Prepare() returns.
+ Promise<Status> prepared_promise_;
+
+ /// Set when OpenInternal() returns.
+ Promise<Status> opened_promise_;
+
/// True if and only if Cancel() has been called.
bool is_cancelled_;
@@ -267,21 +278,25 @@ class PlanFragmentExecutor {
void FragmentComplete();
/// Optimizes the code-generated functions in runtime_state_->llvm_codegen().
- /// Must be called between plan_->Prepare() and plan_->Open().
- /// This is somewhat time consuming so we don't want it to do it in
- /// PlanFragmentExecutor()::Prepare() to allow starting plan fragments more
- /// quickly and in parallel (in a deep plan tree, the fragments are started
- /// in level order).
+ /// Must be called after exec_tree_->Prepare() and before exec_tree_->Open().
void OptimizeLlvmModule();
/// Executes Open() logic and returns resulting status. Does not set status_.
- /// If this plan fragment has no sink, OpenInternal() does nothing.
- /// If this plan fragment has a sink and OpenInternal() returns without an
- /// error condition, all rows will have been sent to the sink, the sink will
- /// have been closed, a final report will have been sent and the report thread will
- /// have been stopped. sink_ will be set to NULL after successful execution.
Status OpenInternal();
+ /// Pulls row batches from fragment instance and pushes them to sink_ in a loop. Returns
+ /// OK if the input was exhausted and sent to the sink successfully, an error otherwise.
+ /// If ExecInternal() returns without an error condition, all rows will have been sent
+ /// to the sink, the sink will have been closed, a final report will have been sent and
+ /// the report thread will have been stopped.
+ Status ExecInternal();
+
+ /// Performs all the logic of Prepare() and returns resulting status.
+ Status PrepareInternal(const TExecPlanFragmentParams& request);
+
+ /// Releases the thread token for this fragment executor.
+ void ReleaseThreadToken();
+
/// Stops report thread, if one is running. Blocks until report thread terminates.
/// Idempotent.
void StopReportThread();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 5ad84df..1eb36e3 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -198,36 +198,28 @@ const TPlanFragment& FInstanceExecParams::fragment() const {
}
int QuerySchedule::GetNumFragmentInstances() const {
- if (mt_fragment_exec_params_.empty()) return num_fragment_instances_;
int result = 0;
- for (const MtFragmentExecParams& fragment_exec_params: mt_fragment_exec_params_) {
- result += fragment_exec_params.instance_exec_params.size();
+ if (mt_fragment_exec_params_.empty()) {
+ DCHECK(!fragment_exec_params_.empty());
+ for (const FragmentExecParams& fragment_exec_params : fragment_exec_params_) {
+ result += fragment_exec_params.hosts.size();
+ }
+ } else {
+ for (const MtFragmentExecParams& fragment_exec_params : mt_fragment_exec_params_) {
+ result += fragment_exec_params.instance_exec_params.size();
+ }
}
return result;
}
-int QuerySchedule::GetNumRemoteFInstances() const {
- bool has_coordinator_fragment = GetCoordFragment() != nullptr;
- int result = GetNumFragmentInstances();
- bool is_mt_execution = request_.query_ctx.request.query_options.mt_dop > 0;
- if (is_mt_execution && has_coordinator_fragment) --result;
- return result;
-}
-
-int QuerySchedule::GetTotalFInstances() const {
- int result = GetNumRemoteFInstances();
- return GetCoordFragment() != nullptr ? result + 1 : result;
-}
-
const TPlanFragment* QuerySchedule::GetCoordFragment() const {
+ // Only have coordinator fragment for statements that return rows.
+ if (request_.stmt_type != TStmtType::QUERY) return nullptr;
bool is_mt_exec = request_.query_ctx.request.query_options.mt_dop > 0;
const TPlanFragment* fragment = is_mt_exec
? &request_.mt_plan_exec_info[0].fragments[0] : &request_.fragments[0];
- if (fragment->partition.type == TPartitionType::UNPARTITIONED) {
+
return fragment;
- } else {
- return nullptr;
- }
}
void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) const {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 39ce268..77c9cd6 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -140,34 +140,9 @@ class QuerySchedule {
/// Helper methods used by scheduler to populate this QuerySchedule.
void IncNumScanRanges(int64_t delta) { num_scan_ranges_ += delta; }
- /// The following 4 functions need to be replaced once we stop special-casing
- /// the coordinator instance in the coordinator.
- /// The replacement is a single function int GetNumFInstances() (which includes
- /// the coordinator instance).
-
- /// TODO-MT: remove; this is actually only the number of remote instances
- /// (from the coordinator's perspective)
- void set_num_fragment_instances(int64_t num_fragment_instances) {
- num_fragment_instances_ = num_fragment_instances;
- }
-
- /// Returns the number of fragment instances registered with this schedule.
- /// MT: total number of fragment instances
- /// ST: value set with set_num_fragment_instances(); excludes coord instance
- /// (in effect the number of remote instances)
- /// TODO-MT: get rid of special-casing of coordinator instance and always return the
- /// total
+ /// Returns the total number of fragment instances.
int GetNumFragmentInstances() const;
- /// Returns the total number of fragment instances, incl. coordinator fragment.
- /// TODO-MT: remove
- int GetTotalFInstances() const;
-
- /// Returns the number of remote fragment instances (excludes coordinator).
- /// Works for both MT and ST.
- /// TODO-MT: remove
- int GetNumRemoteFInstances() const;
-
/// Return the coordinator fragment, or nullptr if there isn't one.
const TPlanFragment* GetCoordFragment() const;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index 5b6303e..9b52d5a 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -663,11 +663,6 @@ void SimpleScheduler::ComputeFragmentExecParams(const TQueryExecRequest& exec_re
CreateInstanceId(schedule->query_id(), num_fragment_instances));
}
}
- if (exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED) {
- // the root fragment is executed directly by the coordinator
- --num_fragment_instances;
- }
- schedule->set_num_fragment_instances(num_fragment_instances);
// compute destinations and # senders per exchange node
// (the root fragment doesn't have a destination)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/fragment-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc
index 76e11d1..cc56c19 100644
--- a/be/src/service/fragment-exec-state.cc
+++ b/be/src/service/fragment-exec-state.cc
@@ -54,8 +54,10 @@ Status FragmentMgr::FragmentExecState::Prepare() {
}
void FragmentMgr::FragmentExecState::Exec() {
- // Open() does the full execution, because all plan fragments have sinks
- executor_.Open();
+ if (Prepare().ok()) {
+ executor_.Open();
+ executor_.Exec();
+ }
executor_.Close();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/fragment-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.h b/be/src/service/fragment-exec-state.h
index 6cff7ce..c795cd8 100644
--- a/be/src/service/fragment-exec-state.h
+++ b/be/src/service/fragment-exec-state.h
@@ -47,9 +47,6 @@ class FragmentMgr::FragmentExecState {
/// the fragment and returns OK.
Status Cancel();
- /// Call Prepare() and create and initialize data sink.
- Status Prepare();
-
/// Main loop of plan fragment execution. Blocks until execution finishes.
void Exec();
@@ -67,6 +64,8 @@ class FragmentMgr::FragmentExecState {
/// Publishes filter with ID 'filter_id' to this fragment's filter bank.
void PublishFilter(int32_t filter_id, const TBloomFilter& thrift_bloom_filter);
+ PlanFragmentExecutor* executor() { return &executor_; }
+
private:
TQueryCtx query_ctx_;
TPlanFragmentInstanceCtx fragment_instance_ctx_;
@@ -98,6 +97,9 @@ class FragmentMgr::FragmentExecState {
/// the reporting RPC. `profile` may be NULL if a runtime profile has not been created
/// for this fragment (e.g. when the fragment has failed during preparation).
void ReportStatusCb(const Status& status, RuntimeProfile* profile, bool done);
+
+ /// Call Prepare() and create and initialize data sink.
+ Status Prepare();
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/fragment-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-mgr.cc b/be/src/service/fragment-mgr.cc
index 64e9a78..8e8fc05 100644
--- a/be/src/service/fragment-mgr.cc
+++ b/be/src/service/fragment-mgr.cc
@@ -54,9 +54,6 @@ Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params)
return process_mem_tracker->MemLimitExceeded(NULL, msg, 0);
}
- // Remote fragments must always have a sink. Remove when IMPALA-2905 is resolved.
- DCHECK(exec_params.fragment_ctx.fragment.__isset.output_sink);
-
shared_ptr<FragmentExecState> exec_state(
new FragmentExecState(exec_params, ExecEnv::GetInstance()));
@@ -64,6 +61,8 @@ Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params)
// only happen after this RPC returns) can always find this fragment.
{
lock_guard<SpinLock> l(fragment_exec_state_map_lock_);
+ DCHECK(fragment_exec_state_map_.find(exec_state->fragment_instance_id())
+ == fragment_exec_state_map_.end());
fragment_exec_state_map_.insert(
make_pair(exec_state->fragment_instance_id(), exec_state));
}
@@ -84,8 +83,7 @@ Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params)
void FragmentMgr::FragmentThread(TUniqueId fragment_instance_id) {
shared_ptr<FragmentExecState> exec_state = GetFragmentExecState(fragment_instance_id);
if (exec_state.get() == NULL) return;
- Status status = exec_state->Prepare();
- if (status.ok()) exec_state->Exec();
+ exec_state->Exec();
// We're done with this plan fragment
{
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 3daa36b..ee7f958 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -17,55 +17,19 @@
#include "service/impala-server.h"
-#include <algorithm>
#include <boost/algorithm/string/join.hpp>
-#include <boost/date_time/posix_time/posix_time_types.hpp>
-#include <boost/unordered_set.hpp>
-#include <jni.h>
-#include <thrift/protocol/TDebugProtocol.h>
-#include <gtest/gtest.h>
-#include <boost/bind.hpp>
-#include <boost/algorithm/string.hpp>
-#include <gperftools/heap-profiler.h>
-#include <gperftools/malloc_extension.h>
-
-#include "codegen/llvm-codegen.h"
+
#include "common/logging.h"
-#include "common/version.h"
-#include "exec/exec-node.h"
-#include "exec/hdfs-table-sink.h"
-#include "exec/scan-node.h"
-#include "exprs/expr.h"
-#include "runtime/data-stream-mgr.h"
-#include "runtime/client-cache.h"
-#include "runtime/descriptors.h"
-#include "runtime/data-stream-sender.h"
-#include "runtime/row-batch.h"
-#include "runtime/plan-fragment-executor.h"
-#include "runtime/hdfs-fs-cache.h"
+#include "gen-cpp/Frontend_types.h"
+#include "rpc/thrift-util.h"
#include "runtime/exec-env.h"
-#include "runtime/mem-tracker.h"
#include "runtime/raw-value.inline.h"
#include "runtime/timestamp-value.h"
-#include "scheduling/simple-scheduler.h"
#include "service/query-exec-state.h"
#include "service/query-options.h"
-#include "util/container-util.h"
-#include "util/debug-util.h"
+#include "service/query-result-set.h"
#include "util/impalad-metrics.h"
-#include "util/string-parser.h"
-#include "rpc/thrift-util.h"
-#include "rpc/thrift-server.h"
-#include "util/jni-util.h"
#include "util/webserver.h"
-#include "gen-cpp/Types_types.h"
-#include "gen-cpp/ImpalaService.h"
-#include "gen-cpp/DataSinks_types.h"
-#include "gen-cpp/Types_types.h"
-#include "gen-cpp/ImpalaService.h"
-#include "gen-cpp/ImpalaService_types.h"
-#include "gen-cpp/ImpalaInternalService.h"
-#include "gen-cpp/Frontend_types.h"
#include "common/names.h"
@@ -83,11 +47,17 @@ using namespace beeswax;
} \
} while (false)
+namespace {
+
+/// Ascii output precision for double/float
+constexpr int ASCII_PRECISION = 16;
+}
+
namespace impala {
// Ascii result set for Beeswax.
// Beeswax returns rows in ascii, using "\t" as column delimiter.
-class ImpalaServer::AsciiQueryResultSet : public ImpalaServer::QueryResultSet {
+class AsciiQueryResultSet : public QueryResultSet {
public:
// Rows are added into rowset.
AsciiQueryResultSet(const TResultSetMetadata& metadata, vector<string>* rowset)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index ee79b4b..de0e2f3 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -36,13 +36,14 @@
#include "exprs/expr.h"
#include "rpc/thrift-util.h"
#include "runtime/raw-value.h"
+#include "service/hs2-util.h"
#include "service/query-exec-state.h"
#include "service/query-options.h"
+#include "service/query-result-set.h"
#include "util/debug-util.h"
-#include "util/runtime-profile-counters.h"
#include "util/impalad-metrics.h"
+#include "util/runtime-profile-counters.h"
#include "util/string-parser.h"
-#include "service/hs2-util.h"
#include "common/names.h"
@@ -129,7 +130,7 @@ static TOperationState::type QueryStateToTOperationState(
// Result set container for Hive protocol versions >= V6, where results are returned in
// column-orientation.
-class ImpalaServer::HS2ColumnarResultSet : public ImpalaServer::QueryResultSet {
+class HS2ColumnarResultSet : public QueryResultSet {
public:
HS2ColumnarResultSet(const TResultSetMetadata& metadata, TRowSet* rowset = NULL)
: metadata_(metadata), result_set_(rowset), num_rows_(0) {
@@ -317,7 +318,7 @@ class ImpalaServer::HS2ColumnarResultSet : public ImpalaServer::QueryResultSet {
};
// TRow result set for HiveServer2
-class ImpalaServer::HS2RowOrientedResultSet : public ImpalaServer::QueryResultSet {
+class HS2RowOrientedResultSet : public QueryResultSet {
public:
// Rows are added into rowset.
HS2RowOrientedResultSet(const TResultSetMetadata& metadata, TRowSet* rowset = NULL)
@@ -393,16 +394,6 @@ class ImpalaServer::HS2RowOrientedResultSet : public ImpalaServer::QueryResultSe
scoped_ptr<TRowSet> owned_result_set_;
};
-ImpalaServer::QueryResultSet* ImpalaServer::CreateHS2ResultSet(
- TProtocolVersion::type version, const TResultSetMetadata& metadata,
- TRowSet* rowset) {
- if (version < TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) {
- return new HS2RowOrientedResultSet(metadata, rowset);
- } else {
- return new HS2ColumnarResultSet(metadata, rowset);
- }
-}
-
void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
TMetadataOpRequest* request, TOperationHandle* handle, thrift::TStatus* status) {
TUniqueId session_id;
@@ -482,6 +473,18 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
status->__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
}
+namespace {
+
+QueryResultSet* CreateHS2ResultSet(
+ TProtocolVersion::type version, const TResultSetMetadata& metadata, TRowSet* rowset) {
+ if (version < TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) {
+ return new HS2RowOrientedResultSet(metadata, rowset);
+ } else {
+ return new HS2ColumnarResultSet(metadata, rowset);
+ }
+}
+}
+
Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size,
bool fetch_first, TFetchResultsResp* fetch_results) {
shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
@@ -759,8 +762,9 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
// Optionally enable result caching on the QueryExecState.
if (cache_num_rows > 0) {
- status = exec_state->SetResultCache(CreateHS2ResultSet(session->hs2_version,
- *exec_state->result_metadata()), cache_num_rows);
+ status = exec_state->SetResultCache(
+ CreateHS2ResultSet(session->hs2_version, *exec_state->result_metadata(), nullptr),
+ cache_num_rows);
if (!status.ok()) {
UnregisterQuery(exec_state->query_id(), false, &status);
HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/impala-internal-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index a238f65..af54c35 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -18,8 +18,6 @@
#ifndef IMPALA_SERVICE_IMPALA_INTERNAL_SERVICE_H
#define IMPALA_SERVICE_IMPALA_INTERNAL_SERVICE_H
-#include <boost/shared_ptr.hpp>
-
#include "gen-cpp/ImpalaInternalService.h"
#include "gen-cpp/ImpalaInternalService_types.h"
#include "service/impala-server.h"
@@ -32,9 +30,12 @@ namespace impala {
/// ImpalaInternalService service.
class ImpalaInternalService : public ImpalaInternalServiceIf {
public:
- ImpalaInternalService(const boost::shared_ptr<ImpalaServer>& impala_server,
- const boost::shared_ptr<FragmentMgr>& fragment_mgr)
- : impala_server_(impala_server), fragment_mgr_(fragment_mgr) { }
+ ImpalaInternalService() {
+ impala_server_ = ExecEnv::GetInstance()->impala_server();
+ DCHECK(impala_server_ != nullptr);
+ fragment_mgr_ = ExecEnv::GetInstance()->fragment_mgr();
+ DCHECK(fragment_mgr_ != nullptr);
+ }
virtual void ExecPlanFragment(TExecPlanFragmentResult& return_val,
const TExecPlanFragmentParams& params) {
@@ -74,10 +75,10 @@ class ImpalaInternalService : public ImpalaInternalServiceIf {
private:
/// Manages fragment reporting and data transmission
- boost::shared_ptr<ImpalaServer> impala_server_;
+ ImpalaServer* impala_server_;
/// Manages fragment execution
- boost::shared_ptr<FragmentMgr> fragment_mgr_;
+ FragmentMgr* fragment_mgr_;
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 7f9d862..bf83eec 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -206,7 +206,6 @@ const string HS2_SERVER_NAME = "hiveserver2-frontend";
const char* ImpalaServer::SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION = "42000";
const char* ImpalaServer::SQLSTATE_GENERAL_ERROR = "HY000";
const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
-const int ImpalaServer::ASCII_PRECISION = 16; // print 16 digits for double/float
const int MAX_NM_MISSED_HEARTBEATS = 5;
@@ -1866,9 +1865,7 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
}
if (be_port != 0 && be_server != NULL) {
- boost::shared_ptr<FragmentMgr> fragment_mgr(new FragmentMgr());
- boost::shared_ptr<ImpalaInternalService> thrift_if(
- new ImpalaInternalService(handler, fragment_mgr));
+ boost::shared_ptr<ImpalaInternalService> thrift_if(new ImpalaInternalService());
boost::shared_ptr<TProcessor> be_processor(
new ImpalaInternalServiceProcessor(thrift_if));
boost::shared_ptr<TProcessorEventHandler> event_handler(
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 2104c5e..53f3384 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -249,45 +249,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
boost::scoped_ptr<ImpalaHttpHandler> http_handler_;
- /// Query result set stores converted rows returned by QueryExecState.fetchRows(). It
- /// provides an interface to convert Impala rows to external API rows.
- /// It is an abstract class. Subclass must implement AddOneRow().
- class QueryResultSet {
- public:
- QueryResultSet() {}
- virtual ~QueryResultSet() {}
-
- /// Add the row (list of expr value) from a select query to this result set. When a row
- /// comes from a select query, the row is in the form of expr values (void*). 'scales'
- /// contains the values' scales (# of digits after decimal), with -1 indicating no
- /// scale specified.
- virtual Status AddOneRow(
- const std::vector<void*>& row, const std::vector<int>& scales) = 0;
-
- /// Add the TResultRow to this result set. When a row comes from a DDL/metadata
- /// operation, the row in the form of TResultRow.
- virtual Status AddOneRow(const TResultRow& row) = 0;
-
- /// Copies rows in the range [start_idx, start_idx + num_rows) from the other result
- /// set into this result set. Returns the number of rows added to this result set.
- /// Returns 0 if the given range is out of bounds of the other result set.
- virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) = 0;
-
- /// Returns the approximate size of this result set in bytes.
- int64_t ByteSize() { return ByteSize(0, size()); }
-
- /// Returns the approximate size of the given range of rows in bytes.
- virtual int64_t ByteSize(int start_idx, int num_rows) = 0;
-
- /// Returns the size of this result set in number of rows.
- virtual size_t size() = 0;
- };
-
- /// Result set implementations for Beeswax and HS2
- class AsciiQueryResultSet;
- class HS2RowOrientedResultSet;
- class HS2ColumnarResultSet;
-
struct SessionState;
/// Execution state of a query.
@@ -299,14 +260,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
static const char* SQLSTATE_GENERAL_ERROR;
static const char* SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED;
- /// Ascii output precision for double/float
- static const int ASCII_PRECISION;
-
- QueryResultSet* CreateHS2ResultSet(
- apache::hive::service::cli::thrift::TProtocolVersion::type version,
- const TResultSetMetadata& metadata,
- apache::hive::service::cli::thrift::TRowSet* rowset = NULL);
-
/// Return exec state for given query_id, or NULL if not found.
/// If 'lock' is true, the returned exec state's lock() will be acquired before
/// the query_exec_state_map_lock_ is released.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index d55ac54..1532ecf 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -19,14 +19,15 @@
#include <limits>
#include <gutil/strings/substitute.h>
-#include "exprs/expr.h"
#include "exprs/expr-context.h"
+#include "exprs/expr.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
-#include "service/impala-server.h"
#include "service/frontend.h"
+#include "service/impala-server.h"
#include "service/query-options.h"
+#include "service/query-result-set.h"
#include "util/debug-util.h"
#include "util/impalad-metrics.h"
#include "util/runtime-profile-counters.h"
@@ -191,6 +192,7 @@ Status ImpalaServer::QueryExecState::Exec(TExecRequest* exec_request) {
exec_request_.set_query_option_request.value,
&session_->default_query_options,
&session_->set_query_options_mask));
+ SetResultSet({}, {});
} else {
// "SET" returns a table of all query options.
map<string, string> config;
@@ -421,17 +423,10 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str());
}
- // If desc_tbl is not set, query has SELECT with no FROM. In that
- // case, the query can only have a single fragment, and that fragment needs to be
- // executed by the coordinator. This check confirms that.
- // If desc_tbl is set, the query may or may not have a coordinator fragment.
bool is_mt_exec = query_exec_request.query_ctx.request.query_options.mt_dop > 0;
const TPlanFragment& fragment = is_mt_exec
? query_exec_request.mt_plan_exec_info[0].fragments[0]
: query_exec_request.fragments[0];
- bool has_coordinator_fragment =
- fragment.partition.type == TPartitionType::UNPARTITIONED;
- DCHECK(has_coordinator_fragment || query_exec_request.__isset.desc_tbl);
{
lock_guard<mutex> l(lock_);
@@ -449,7 +444,7 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
}
coord_.reset(new Coordinator(*schedule_, exec_env_, query_events_));
- status = coord_->Exec(&output_expr_ctxs_);
+ status = coord_->Exec();
{
lock_guard<mutex> l(lock_);
RETURN_IF_ERROR(UpdateQueryStatus(status));
@@ -538,12 +533,11 @@ void ImpalaServer::QueryExecState::Done() {
query_events_->MarkEvent("Unregister query");
if (coord_.get() != NULL) {
- Expr::Close(output_expr_ctxs_, coord_->runtime_state());
// Release any reserved resources.
Status status = exec_env_->scheduler()->Release(schedule_.get());
if (!status.ok()) {
LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id()
- << " because of error: " << status.GetDetail();
+ << " because of error: " << status.GetDetail();
}
coord_->TearDown();
}
@@ -626,7 +620,6 @@ Status ImpalaServer::QueryExecState::WaitInternal() {
if (coord_.get() != NULL) {
RETURN_IF_ERROR(coord_->Wait());
- RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, coord_->runtime_state()));
RETURN_IF_ERROR(UpdateCatalog());
}
@@ -719,6 +712,10 @@ Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows,
return Status::OK();
}
+ if (coord_.get() == nullptr) {
+ return Status("Client tried to fetch rows on a query that produces no results.");
+ }
+
int32_t num_rows_fetched_from_cache = 0;
if (result_cache_max_size_ > 0 && result_cache_ != NULL) {
// Satisfy the fetch from the result cache if possible.
@@ -729,27 +726,7 @@ Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows,
if (num_rows_fetched_from_cache >= max_rows) return Status::OK();
}
- // List of expr values to hold evaluated rows from the query
- vector<void*> result_row;
- result_row.resize(output_expr_ctxs_.size());
-
- // List of scales for floating point values in result_row
- vector<int> scales;
- scales.resize(result_row.size());
-
- if (coord_ == NULL) {
- // Query with LIMIT 0.
- query_state_ = QueryState::FINISHED;
- eos_ = true;
- return Status::OK();
- }
-
query_state_ = QueryState::FINISHED; // results will be ready after this call
- // Fetch the next batch if we've returned the current batch entirely
- if (current_batch_ == NULL || current_batch_row_ >= current_batch_->num_rows()) {
- RETURN_IF_ERROR(FetchNextBatch());
- }
- if (current_batch_ == NULL) return Status::OK();
// Maximum number of rows to be fetched from the coord.
int32_t max_coord_rows = max_rows;
@@ -759,22 +736,26 @@ Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows,
}
{
SCOPED_TIMER(row_materialization_timer_);
- // Convert the available rows, limited by max_coord_rows
- int available = current_batch_->num_rows() - current_batch_row_;
- int fetched_count = available;
- // max_coord_rows <= 0 means no limit
- if (max_coord_rows > 0 && max_coord_rows < available) fetched_count = max_coord_rows;
- for (int i = 0; i < fetched_count; ++i) {
- TupleRow* row = current_batch_->GetRow(current_batch_row_);
- RETURN_IF_ERROR(GetRowValue(row, &result_row, &scales));
- RETURN_IF_ERROR(fetched_rows->AddOneRow(result_row, scales));
- ++num_rows_fetched_;
- ++current_batch_row_;
+ size_t before = fetched_rows->size();
+ // Temporarily release lock so calls to Cancel() are not blocked. fetch_rows_lock_
+ // (already held) ensures that we do not call coord_->GetNext() multiple times
+ // concurrently.
+ // TODO: Simplify this.
+ lock_.unlock();
+ Status status = coord_->GetNext(fetched_rows, max_coord_rows, &eos_);
+ lock_.lock();
+ int num_fetched = fetched_rows->size() - before;
+ DCHECK(max_coord_rows <= 0 || num_fetched <= max_coord_rows) << Substitute(
+ "Fetched more rows ($0) than asked for ($1)", num_fetched, max_coord_rows);
+ num_rows_fetched_ += num_fetched;
+
+ RETURN_IF_ERROR(status);
+ // Check if query status has changed during GetNext() call
+ if (!query_status_.ok()) {
+ eos_ = true;
+ return query_status_;
}
}
- ExprContext::FreeLocalAllocations(output_expr_ctxs_);
- // Check if there was an error evaluating a row value.
- RETURN_IF_ERROR(coord_->runtime_state()->CheckQueryState());
// Update the result cache if necessary.
if (result_cache_max_size_ > 0 && result_cache_.get() != NULL) {
@@ -833,16 +814,6 @@ Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows,
return Status::OK();
}
-Status ImpalaServer::QueryExecState::GetRowValue(TupleRow* row, vector<void*>* result,
- vector<int>* scales) {
- DCHECK(result->size() >= output_expr_ctxs_.size());
- for (int i = 0; i < output_expr_ctxs_.size(); ++i) {
- (*result)[i] = output_expr_ctxs_[i]->GetValue(row);
- (*scales)[i] = output_expr_ctxs_[i]->root()->output_scale();
- }
- return Status::OK();
-}
-
Status ImpalaServer::QueryExecState::Cancel(bool check_inflight, const Status* cause) {
Coordinator* coord;
{
@@ -931,28 +902,6 @@ Status ImpalaServer::QueryExecState::UpdateCatalog() {
return Status::OK();
}
-Status ImpalaServer::QueryExecState::FetchNextBatch() {
- DCHECK(!eos_);
- DCHECK(coord_.get() != NULL);
-
- // Temporarily release lock so calls to Cancel() are not blocked. fetch_rows_lock_
- // ensures that we do not call coord_->GetNext() multiple times concurrently.
- lock_.unlock();
- Status status = coord_->GetNext(&current_batch_, coord_->runtime_state());
- lock_.lock();
- if (!status.ok()) return status;
-
- // Check if query status has changed during GetNext() call
- if (!query_status_.ok()) {
- current_batch_ = NULL;
- return query_status_;
- }
-
- current_batch_row_ = 0;
- eos_ = current_batch_ == NULL;
- return Status::OK();
-}
-
void ImpalaServer::QueryExecState::SetResultSet(const vector<string>& results) {
request_result_set_.reset(new vector<TResultRow>);
request_result_set_->resize(results.size());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/query-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.h b/be/src/service/query-exec-state.h
index 0a763ff..54ee929 100644
--- a/be/src/service/query-exec-state.h
+++ b/be/src/service/query-exec-state.h
@@ -248,7 +248,7 @@ class ImpalaServer::QueryExecState {
/// Resource assignment determined by scheduler. Owned by obj_pool_.
boost::scoped_ptr<QuerySchedule> schedule_;
- /// not set for ddl queries, or queries with "limit 0"
+ /// Not set for ddl queries.
boost::scoped_ptr<Coordinator> coord_;
/// Runs statements that query or modify the catalog via the CatalogService.
@@ -293,7 +293,7 @@ class ImpalaServer::QueryExecState {
MonotonicStopWatch client_wait_sw_;
RuntimeProfile::EventSequence* query_events_;
- std::vector<ExprContext*> output_expr_ctxs_;
+
bool is_cancelled_; // if true, Cancel() was called.
bool eos_; // if true, there are no more rows to return
// We enforce the invariant that query_status_ is not OK iff query_state_
@@ -356,13 +356,6 @@ class ImpalaServer::QueryExecState {
/// Caller needs to hold fetch_rows_lock_ and lock_.
Status FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows);
- /// Fetch the next row batch and store the results in current_batch_. Only called for
- /// non-DDL / DML queries. current_batch_ is set to NULL if execution is complete or the
- /// query was cancelled.
- /// Caller needs to hold fetch_rows_lock_ and lock_. Blocks, during which time lock_ is
- /// released.
- Status FetchNextBatch();
-
/// Evaluates 'output_expr_ctxs_' against 'row' and output the evaluated row in
/// 'result'. The values' scales (# of digits after decimal) are stored in 'scales'.
/// result and scales must have been resized to the number of columns before call.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/query-result-set.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-result-set.h b/be/src/service/query-result-set.h
new file mode 100644
index 0000000..b444ca3
--- /dev/null
+++ b/be/src/service/query-result-set.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_SERVICE_QUERY_RESULT_SET_H
+#define IMPALA_SERVICE_QUERY_RESULT_SET_H
+
+#include "common/status.h"
+#include "gen-cpp/Data_types.h"
+
+#include <vector>
+
+namespace impala {
+
+/// Stores client-ready query result rows returned by
+/// QueryExecState::FetchRows(). Subclasses implement AddRows() / AddOneRow() to
+/// specialise how Impala's row batches are converted to client-API result
+/// representations.
+class QueryResultSet {
+ public:
+ QueryResultSet() {}
+ virtual ~QueryResultSet() {}
+
+ /// Add a single row to this result set. The row is a vector of pointers to values,
+ /// whose memory belongs to the caller. 'scales' contains the scales for decimal values
+ /// (# of digits after decimal), with -1 indicating no scale specified or the
+ /// corresponding value is not a decimal.
+ virtual Status AddOneRow(
+ const std::vector<void*>& row, const std::vector<int>& scales) = 0;
+
+ /// Add the TResultRow to this result set. When a row comes from a DDL/metadata
+ /// operation, the row in the form of TResultRow.
+ virtual Status AddOneRow(const TResultRow& row) = 0;
+
+ /// Copies rows in the range [start_idx, start_idx + num_rows) from the other result
+ /// set into this result set. Returns the number of rows added to this result set.
+ /// Returns 0 if the given range is out of bounds of the other result set.
+ virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) = 0;
+
+ /// Returns the approximate size of this result set in bytes.
+ int64_t ByteSize() { return ByteSize(0, size()); }
+
+ /// Returns the approximate size of the given range of rows in bytes.
+ virtual int64_t ByteSize(int start_idx, int num_rows) = 0;
+
+ /// Returns the size of this result set in number of rows.
+ virtual size_t size() = 0;
+};
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index cf0e28e..b28f7fc 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -34,6 +34,7 @@
DECLARE_string(ssl_server_certificate);
DECLARE_string(ssl_private_key);
+DECLARE_int32(be_port);
using namespace apache::thrift;
using namespace impala;
@@ -43,6 +44,9 @@ InProcessImpalaServer* InProcessImpalaServer::StartWithEphemeralPorts(
for (int tries = 0; tries < 10; ++tries) {
int backend_port = FindUnusedEphemeralPort();
if (backend_port == -1) continue;
+ // This flag is read directly in several places to find the address of the local
+ // backend interface.
+ FLAGS_be_port = backend_port;
int subscriber_port = FindUnusedEphemeralPort();
if (subscriber_port == -1) continue;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/common/thrift/DataSinks.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 12a75b9..83c63b7 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -26,7 +26,8 @@ include "Partitions.thrift"
enum TDataSinkType {
DATA_STREAM_SINK,
TABLE_SINK,
- JOIN_BUILD_SINK
+ JOIN_BUILD_SINK,
+ PLAN_ROOT_SINK
}
enum TSinkAction {
@@ -87,10 +88,10 @@ struct TJoinBuildSink {
// Union type of all table sinks.
struct TTableSink {
- 1: required Types.TTableId target_table_id
+ 1: required Types.TTableId target_table_id
2: required TTableSinkType type
3: required TSinkAction action
- 4: optional THdfsTableSink hdfs_table_sink
+ 4: optional THdfsTableSink hdfs_table_sink
5: optional TKuduTableSink kudu_table_sink
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
index b02bc73..392b961 100644
--- a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
@@ -25,6 +25,8 @@ import java.util.Set;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.TreeNode;
+import org.apache.impala.planner.DataSink;
+import org.apache.impala.planner.PlanRootSink;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Lists;
@@ -409,6 +411,10 @@ public abstract class QueryStmt extends StatementBase {
resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
}
+ public DataSink createDataSink() { // Sink for the root fragment of this query's plan.
+ return new PlanRootSink();
+ }
+
public ArrayList<OrderByElement> cloneOrderByElements() {
if (orderByElements_ == null) return null;
ArrayList<OrderByElement> result =
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
new file mode 100644
index 0000000..a199f54
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import org.apache.impala.thrift.TDataSink;
+import org.apache.impala.thrift.TDataSinkType;
+import org.apache.impala.thrift.TExplainLevel;
+
+/**
+ * Sink for the root of a query plan that produces result rows. Allows coordination
+ * between the sender that produces those rows and the consumer that sends them to the
+ * client, even though both execute concurrently.
+ */
+public class PlanRootSink extends DataSink {
+
+ @Override public String getExplainString(String prefix, String detailPrefix,
+ TExplainLevel explainLevel) {
+ return String.format("%sPLAN-ROOT SINK\n", prefix);
+ }
+
+ @Override protected TDataSink toThrift() {
+ return new TDataSink(TDataSinkType.PLAN_ROOT_SINK);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 405eebe..ed4c677 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -149,6 +149,8 @@ public class Planner {
} else if (ctx_.isDelete()) {
// Set up delete sink for root fragment
rootFragment.setSink(ctx_.getAnalysisResult().getDeleteStmt().createDataSink());
+ } else if (ctx_.isQuery()) {
+ rootFragment.setSink(ctx_.getAnalysisResult().getQueryStmt().createDataSink());
}
QueryStmt queryStmt = ctx_.getQueryStmt();
queryStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
index 29cca13..3275a7a 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
@@ -90,6 +90,7 @@ public class PlannerContext {
public boolean isInsertOrCtas() {
return analysisResult_.isInsertStmt() || analysisResult_.isCreateTableAsSelectStmt();
}
+ public boolean isQuery() { return analysisResult_.isQueryStmt(); } // Whether the analyzed statement is a query; delegates to AnalysisResult.isQueryStmt().
public boolean hasSubplan() { return !subplans_.isEmpty(); }
public SubplanNode getSubplan() { return subplans_.getFirst(); }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
index 47bfb23..d7838f9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
@@ -3,12 +3,16 @@ select count(*), count(tinyint_col), min(tinyint_col), max(tinyint_col), sum(tin
avg(tinyint_col)
from functional.alltypesagg
---- PLAN
+PLAN-ROOT SINK
+|
01:AGGREGATE [FINALIZE]
| output: count(*), count(tinyint_col), min(tinyint_col), max(tinyint_col), sum(tinyint_col), avg(tinyint_col)
|
00:SCAN HDFS [functional.alltypesagg]
partitions=11/11 files=11 size=814.73KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
03:AGGREGATE [FINALIZE]
| output: count:merge(*), count:merge(tinyint_col), min:merge(tinyint_col), max:merge(tinyint_col), sum:merge(tinyint_col), avg:merge(tinyint_col)
|
@@ -26,6 +30,8 @@ avg(tinyint_col)
from functional.alltypesagg
group by 2, 1
---- PLAN
+PLAN-ROOT SINK
+|
01:AGGREGATE [FINALIZE]
| output: count(*), min(tinyint_col), max(tinyint_col), sum(tinyint_col), avg(tinyint_col)
| group by: bigint_col, tinyint_col
@@ -33,6 +39,8 @@ group by 2, 1
00:SCAN HDFS [functional.alltypesagg]
partitions=11/11 files=11 size=814.73KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
04:EXCHANGE [UNPARTITIONED]
|
03:AGGREGATE [FINALIZE]
@@ -54,6 +62,8 @@ from functional.testtbl
having count(id) > 0
order by avg(zip) limit 10
---- PLAN
+PLAN-ROOT SINK
+|
02:TOP-N [LIMIT=10]
| order by: avg(zip) ASC
|
@@ -64,6 +74,8 @@ order by avg(zip) limit 10
00:SCAN HDFS [functional.testtbl]
partitions=1/1 files=0 size=0B
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
02:TOP-N [LIMIT=10]
| order by: avg(zip) ASC
|
@@ -85,6 +97,8 @@ from functional.alltypesagg
group by int_col + int_col, int_col * int_col, int_col + int_col
having (int_col * int_col) < 0 limit 10
---- PLAN
+PLAN-ROOT SINK
+|
01:AGGREGATE [FINALIZE]
| group by: int_col + int_col, int_col * int_col
| having: int_col * int_col < 0
@@ -93,6 +107,8 @@ having (int_col * int_col) < 0 limit 10
00:SCAN HDFS [functional.alltypesagg]
partitions=11/11 files=11 size=814.73KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
04:EXCHANGE [UNPARTITIONED]
| limit: 10
|
@@ -116,6 +132,8 @@ functional.alltypes t1 inner join functional.alltypestiny t2
group by t1.tinyint_col, t2.smallint_col
having count(t2.int_col) = count(t1.bigint_col)
---- PLAN
+PLAN-ROOT SINK
+|
03:AGGREGATE [FINALIZE]
| output: count(*), count(t2.int_col), count(t1.bigint_col)
| group by: t1.tinyint_col, t2.smallint_col
@@ -141,6 +159,8 @@ select 1 from
group by int_col) t
where t.x > 10
---- PLAN
+PLAN-ROOT SINK
+|
01:AGGREGATE [FINALIZE]
| output: avg(bigint_col)
| group by: int_col
@@ -157,6 +177,8 @@ select count(*) from
select * from functional.alltypessmall) t
limit 10
---- PLAN
+PLAN-ROOT SINK
+|
03:AGGREGATE [FINALIZE]
| output: count(*)
| limit: 10
@@ -169,6 +191,8 @@ limit 10
01:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
05:AGGREGATE [FINALIZE]
| output: count:merge(*)
| limit: 10
@@ -194,6 +218,8 @@ select count(*) from
group by t.bigint_col
limit 10
---- PLAN
+PLAN-ROOT SINK
+|
03:AGGREGATE [FINALIZE]
| output: count(*)
| group by: bigint_col
@@ -207,6 +233,8 @@ limit 10
01:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
06:EXCHANGE [UNPARTITIONED]
| limit: 10
|
@@ -237,6 +265,8 @@ from
select * from functional.alltypessmall) t
limit 10
---- PLAN
+PLAN-ROOT SINK
+|
04:AGGREGATE [FINALIZE]
| output: count(int_col)
| limit: 10
@@ -252,6 +282,8 @@ limit 10
01:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
08:AGGREGATE [FINALIZE]
| output: count:merge(int_col)
| limit: 10
@@ -286,6 +318,8 @@ from
group by t.bigint_col
limit 10
---- PLAN
+PLAN-ROOT SINK
+|
04:AGGREGATE [FINALIZE]
| output: count(int_col)
| group by: t.bigint_col
@@ -302,6 +336,8 @@ limit 10
01:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
07:EXCHANGE [UNPARTITIONED]
| limit: 10
|
@@ -334,6 +370,8 @@ from
select * from functional.alltypessmall) t
limit 10
---- PLAN
+PLAN-ROOT SINK
+|
04:AGGREGATE [FINALIZE]
| output: count(int_col), count:merge(smallint_col)
| limit: 10
@@ -350,6 +388,8 @@ limit 10
01:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
08:AGGREGATE [FINALIZE]
| output: count:merge(int_col), count:merge(smallint_col)
| limit: 10
@@ -386,6 +426,8 @@ from
group by t.bigint_col
limit 10
---- PLAN
+PLAN-ROOT SINK
+|
04:AGGREGATE [FINALIZE]
| output: count(int_col), count:merge(smallint_col)
| group by: t.bigint_col
@@ -403,6 +445,8 @@ limit 10
01:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
07:EXCHANGE [UNPARTITIONED]
| limit: 10
|
@@ -438,6 +482,8 @@ from
group by t.bigint_col
limit 10
---- PLAN
+PLAN-ROOT SINK
+|
05:AGGREGATE [FINALIZE]
| output: count(int_col), count:merge(smallint_col)
| group by: t.bigint_col
@@ -458,6 +504,8 @@ limit 10
01:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
10:EXCHANGE [UNPARTITIONED]
| limit: 10
|
@@ -495,6 +543,8 @@ limit 10
# test that aggregations are not placed below an unpartitioned exchange with a limit
select count(*) from (select * from functional.alltypes limit 10) t
---- PLAN
+PLAN-ROOT SINK
+|
01:AGGREGATE [FINALIZE]
| output: count(*)
|
@@ -502,6 +552,8 @@ select count(*) from (select * from functional.alltypes limit 10) t
partitions=24/24 files=24 size=478.45KB
limit: 10
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
01:AGGREGATE [FINALIZE]
| output: count(*)
|
@@ -518,6 +570,8 @@ select count(*) from
union all
(select * from functional.alltypessmall) limit 10) t
---- PLAN
+PLAN-ROOT SINK
+|
03:AGGREGATE [FINALIZE]
| output: count(*)
|
@@ -530,6 +584,8 @@ select count(*) from
01:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
03:AGGREGATE [FINALIZE]
| output: count(*)
|
@@ -555,6 +611,8 @@ select * from (
limit 2) v
limit 1
---- PLAN
+PLAN-ROOT SINK
+|
06:AGGREGATE [FINALIZE]
| output: count(cnt)
| limit: 1
@@ -580,6 +638,8 @@ limit 1
partitions=11/11 files=11 size=814.73KB
runtime filters: RF000 -> t1.id
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
06:AGGREGATE [FINALIZE]
| output: count(cnt)
| limit: 1
@@ -629,6 +689,8 @@ select * from
group by 1, 2, 3, 4) v
where v.a = v.b and v.b = v.c and v.c = v.d and v.a = v.c and v.a = v.d
---- PLAN
+PLAN-ROOT SINK
+|
01:AGGREGATE [FINALIZE]
| group by: tinyint_col, smallint_col, int_col + int_col, coalesce(bigint_col, year)
| having: int_col + int_col = coalesce(bigint_col, year), smallint_col = int_col + int_col
@@ -643,6 +705,8 @@ select cnt from
from functional.alltypestiny
group by bool_col, x) v
---- PLAN
+PLAN-ROOT SINK
+|
01:AGGREGATE [FINALIZE]
| output: count(*)
| group by: bool_col, CAST(NULL AS INT)
@@ -656,6 +720,8 @@ select cnt from
from functional.alltypestiny
group by bool_col, x) v
---- PLAN
+PLAN-ROOT SINK
+|
02:AGGREGATE [FINALIZE]
| output: count(int_col)
| group by: bool_col, NULL
@@ -669,6 +735,8 @@ select cnt from
# test simple group_concat with distinct
select group_concat(distinct string_col) from functional.alltypesagg
---- PLAN
+PLAN-ROOT SINK
+|
02:AGGREGATE [FINALIZE]
| output: group_concat(string_col)
|
@@ -678,6 +746,8 @@ select group_concat(distinct string_col) from functional.alltypesagg
00:SCAN HDFS [functional.alltypesagg]
partitions=11/11 files=11 size=814.73KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
06:AGGREGATE [FINALIZE]
| output: group_concat:merge(string_col)
|
@@ -702,6 +772,8 @@ select day, group_concat(distinct string_col)
from (select * from functional.alltypesagg where id % 100 = day order by id limit 99999) a
group by day
---- PLAN
+PLAN-ROOT SINK
+|
03:AGGREGATE [FINALIZE]
| output: group_concat(string_col)
| group by: day
@@ -716,6 +788,8 @@ group by day
partitions=11/11 files=11 size=814.73KB
predicates: id % 100 = day
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
03:AGGREGATE [FINALIZE]
| output: group_concat(string_col)
| group by: day
@@ -739,6 +813,8 @@ select count(distinct cast(timestamp_col as string)),
group_concat(distinct cast(timestamp_col as string))
from functional.alltypesagg group by year
---- PLAN
+PLAN-ROOT SINK
+|
02:AGGREGATE [FINALIZE]
| output: count(CAST(timestamp_col AS STRING)), group_concat(CAST(timestamp_col AS STRING))
| group by: year
@@ -749,6 +825,8 @@ from functional.alltypesagg group by year
00:SCAN HDFS [functional.alltypesagg]
partitions=11/11 files=11 size=814.73KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
05:EXCHANGE [UNPARTITIONED]
|
02:AGGREGATE [FINALIZE]
@@ -769,6 +847,8 @@ from functional.alltypesagg group by year
# test group_concat distinct with other non-distinct aggregate functions
select group_concat(distinct string_col), count(*) from functional.alltypesagg
---- PLAN
+PLAN-ROOT SINK
+|
02:AGGREGATE [FINALIZE]
| output: group_concat(string_col), count:merge(*)
|
@@ -779,6 +859,8 @@ from functional.alltypesagg group by year
00:SCAN HDFS [functional.alltypesagg]
partitions=11/11 files=11 size=814.73KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
06:AGGREGATE [FINALIZE]
| output: group_concat:merge(string_col), count:merge(*)
|
@@ -804,6 +886,8 @@ from functional.alltypesagg group by year
select group_concat(distinct string_col, '-'), sum(int_col), count(distinct string_col)
from functional.alltypesagg
---- PLAN
+PLAN-ROOT SINK
+|
02:AGGREGATE [FINALIZE]
| output: group_concat(string_col, '-'), count(string_col), sum:merge(int_col)
|
@@ -814,6 +898,8 @@ from functional.alltypesagg
00:SCAN HDFS [functional.alltypesagg]
partitions=11/11 files=11 size=814.73KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
06:AGGREGATE [FINALIZE]
| output: group_concat:merge(string_col, '-'), count:merge(string_col), sum:merge(int_col)
|
@@ -841,6 +927,8 @@ select month, year, count(*), count(distinct date_string_col),
group_concat(distinct date_string_col, '-') from functional.alltypesagg
group by month, year
---- PLAN
+PLAN-ROOT SINK
+|
02:AGGREGATE [FINALIZE]
| output: count(date_string_col), group_concat(date_string_col, '-'), count:merge(*)
| group by: month, year
@@ -852,6 +940,8 @@ group by month, year
00:SCAN HDFS [functional.alltypesagg]
partitions=11/11 files=11 size=814.73KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
05:EXCHANGE [UNPARTITIONED]
|
02:AGGREGATE [FINALIZE]
@@ -875,6 +965,8 @@ group by month, year
select group_concat(distinct string_col), group_concat(distinct string_col, '-'),
group_concat(distinct string_col, '---') from functional.alltypesagg
---- PLAN
+PLAN-ROOT SINK
+|
02:AGGREGATE [FINALIZE]
| output: group_concat(string_col), group_concat(string_col, '-'), group_concat(string_col, '---')
|
@@ -884,6 +976,8 @@ group_concat(distinct string_col, '---') from functional.alltypesagg
00:SCAN HDFS [functional.alltypesagg]
partitions=11/11 files=11 size=814.73KB
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
06:AGGREGATE [FINALIZE]
| output: group_concat:merge(string_col), group_concat:merge(string_col, '-'), group_concat:merge(string_col, '---')
|
@@ -906,6 +1000,8 @@ group_concat(distinct string_col, '---') from functional.alltypesagg
# IMPALA-852: Aggregation only in the HAVING clause.
select 1 from functional.alltypestiny having count(*) > 0
---- PLAN
+PLAN-ROOT SINK
+|
01:AGGREGATE [FINALIZE]
| output: count(*)
| having: count(*) > 0
@@ -923,6 +1019,8 @@ group by 1
having count(*) < 150000
limit 1000000
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
06:EXCHANGE [UNPARTITIONED]
| limit: 1000000
|
@@ -957,6 +1055,8 @@ select col from (
where col > 50
limit 50
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
08:AGGREGATE [FINALIZE]
| output: count:merge(c_custkey)
| having: count(c_custkey) > 50
@@ -992,6 +1092,8 @@ select straight_join c_custkey, count(distinct c_custkey)
from tpch_parquet.orders inner join [shuffle] tpch_parquet.customer on c_custkey = o_custkey
group by 1
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
07:EXCHANGE [UNPARTITIONED]
|
04:AGGREGATE [FINALIZE]
@@ -1029,6 +1131,8 @@ group by 1, 2
having count(*) > 10
limit 10
---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
09:EXCHANGE [UNPARTITIONED]
| limit: 10
|