You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by lv...@apache.org on 2017/05/09 15:55:57 UTC
[03/13] incubator-impala git commit: IMPALA-2550: Switch to per-query
exec rpc
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/query-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.h b/be/src/service/query-exec-state.h
deleted file mode 100644
index 8ac066e..0000000
--- a/be/src/service/query-exec-state.h
+++ /dev/null
@@ -1,408 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_SERVICE_QUERY_EXEC_STATE_H
-#define IMPALA_SERVICE_QUERY_EXEC_STATE_H
-
-#include "common/status.h"
-#include "exec/catalog-op-executor.h"
-#include "util/runtime-profile.h"
-#include "runtime/timestamp-value.h"
-#include "service/child-query.h"
-#include "scheduling/query-schedule.h"
-#include "gen-cpp/Frontend_types.h"
-#include "service/impala-server.h"
-#include "gen-cpp/Frontend_types.h"
-#include "util/auth-util.h"
-
-#include <boost/thread.hpp>
-#include <boost/unordered_set.hpp>
-#include <vector>
-
-namespace impala {
-
-class ExecEnv;
-class Coordinator;
-class RuntimeState;
-class RowBatch;
-class Expr;
-class TupleRow;
-class Frontend;
-class QueryExecStateCleaner;
-
-/// Execution state of a query. This captures everything necessary
-/// to convert row batches received by the coordinator into results
-/// we can return to the client. It also captures all state required for
-/// servicing query-related requests from the client.
-/// Thread safety: this class is generally not thread-safe, callers need to
-/// synchronize access explicitly via lock(). See the ImpalaServer class comment for
-/// the required lock acquisition order.
-///
-/// TODO: Consider renaming to RequestExecState for consistency.
-/// TODO: Compute stats is the only stmt that requires child queries. Once the
-/// CatalogService performs background stats gathering the concept of child queries
-/// will likely become obsolete. Remove all child-query related code from this class.
-class ImpalaServer::QueryExecState {
- public:
- QueryExecState(const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend,
- ImpalaServer* server, std::shared_ptr<ImpalaServer::SessionState> session);
-
- ~QueryExecState();
-
- /// Initiates execution of an exec_request.
- /// Non-blocking.
- /// Must *not* be called with lock_ held.
- Status Exec(TExecRequest* exec_request);
-
- /// Execute a HiveServer2 metadata operation
- /// TODO: This is likely a superset of GetTableNames/GetDbs. Coalesce these different
- /// code paths.
- Status Exec(const TMetadataOpRequest& exec_request);
-
- /// Call this to ensure that rows are ready when calling FetchRows(). Updates the
- /// query_status_, and advances query_state_ to FINISHED or EXCEPTION. Must be preceded
- /// by call to Exec(). Waits for all child queries to complete. Takes lock_.
- void Wait();
-
- /// Calls Wait() asynchronously in a thread and returns immediately.
- void WaitAsync();
-
- /// BlockOnWait() may be called after WaitAsync() has been called in order to wait
- /// for the asynchronous thread (wait_thread_) to complete. It is safe to call this
- /// from multiple threads (all threads will block until wait_thread_ has completed)
- /// and multiple times (non-blocking once wait_thread_ has completed). Do not call
- /// while holding lock_.
- void BlockOnWait();
-
- /// Return at most max_rows from the current batch. If the entire current batch has
- /// been returned, fetch another batch first.
- /// Caller needs to hold fetch_rows_lock_ and lock_.
- /// Caller should verify that EOS has not been reached before calling.
- /// Must be preceded by call to Wait() (or WaitAsync()/BlockOnWait()).
- /// Also updates query_state_/status_ in case of error.
- Status FetchRows(const int32_t max_rows, QueryResultSet* fetched_rows);
-
- /// Resets the state of this query such that the next fetch() returns results from the
- /// beginning of the query result set (by using the result_cache_).
- /// It is valid to call this function for any type of statement that returns a result
- /// set, including queries, show stmts, compute stats, etc.
- /// Returns a recoverable error status if the restart is not possible, ok() otherwise.
- /// The error is recoverable to allow clients to resume fetching.
- /// The caller must hold fetch_rows_lock_ and lock_.
- Status RestartFetch();
-
- /// Update query state if the requested state isn't already obsolete. This is only for
- /// non-error states - if the query encounters an error the query status needs to be set
- /// with information about the error so UpdateQueryStatus must be used instead.
- /// Takes lock_.
- void UpdateNonErrorQueryState(beeswax::QueryState::type query_state);
-
- /// Update the query status and the "Query Status" summary profile string.
- /// If current status is already != ok, no update is made (we preserve the first error)
- /// If called with a non-ok argument, the expectation is that the query will be aborted
- /// quickly.
- /// Returns the status argument (so we can write
- /// RETURN_IF_ERROR(UpdateQueryStatus(SomeOperation())).
- /// Does not take lock_, but requires it: caller must ensure lock_
- /// is taken before calling UpdateQueryStatus
- Status UpdateQueryStatus(const Status& status);
-
- /// Cancels the child queries and the coordinator with the given cause.
- /// If cause is NULL, assume this was deliberately cancelled by the user.
- /// Otherwise, sets state to EXCEPTION.
- /// Does nothing if the query has reached EOS or already cancelled.
- ///
- /// Only returns an error if 'check_inflight' is true and the query is not yet
- /// in-flight. Otherwise, proceed and return Status::OK() even if the query isn't
- /// in-flight (for cleaning up after an error on the query issuing path).
- Status Cancel(bool check_inflight, const Status* cause);
-
- /// This is called when the query is done (finished, cancelled, or failed).
- /// Takes lock_: callers must not hold lock() before calling.
- void Done();
-
- /// Sets the API-specific (Beeswax, HS2) result cache and its size bound.
- /// The given cache is owned by this query exec state, even if an error is returned.
- /// Returns a non-ok status if max_size exceeds the per-impalad allowed maximum.
- Status SetResultCache(QueryResultSet* cache, int64_t max_size);
-
- ImpalaServer::SessionState* session() const { return session_.get(); }
-
- /// Queries are run and authorized on behalf of the effective_user.
- const std::string& effective_user() const {
- return GetEffectiveUser(query_ctx_.session);
- }
- const std::string& connected_user() const { return query_ctx_.session.connected_user; }
- const std::string& do_as_user() const { return query_ctx_.session.delegated_user; }
- TSessionType::type session_type() const { return query_ctx_.session.session_type; }
- const TUniqueId& session_id() const { return query_ctx_.session.session_id; }
- const std::string& default_db() const { return query_ctx_.session.database; }
- bool eos() const { return eos_; }
- Coordinator* coord() const { return coord_.get(); }
- QuerySchedule* schedule() { return schedule_.get(); }
-
- /// Resource pool associated with this query, or an empty string if the schedule has not
- /// been created and had the pool set yet, or this StmtType doesn't go through admission
- /// control.
- std::string request_pool() const {
- return schedule_ == nullptr ? "" : schedule_->request_pool();
- }
- int num_rows_fetched() const { return num_rows_fetched_; }
- void set_fetched_rows() { fetched_rows_ = true; }
- bool fetched_rows() const { return fetched_rows_; }
- bool returns_result_set() { return !result_metadata_.columns.empty(); }
- const TResultSetMetadata* result_metadata() { return &result_metadata_; }
- const TUniqueId& query_id() const { return query_ctx_.query_id; }
- const TExecRequest& exec_request() const { return exec_request_; }
- TStmtType::type stmt_type() const { return exec_request_.stmt_type; }
- TCatalogOpType::type catalog_op_type() const {
- return exec_request_.catalog_op_request.op_type;
- }
- TDdlType::type ddl_type() const {
- return exec_request_.catalog_op_request.ddl_params.ddl_type;
- }
- boost::mutex* lock() { return &lock_; }
- boost::mutex* fetch_rows_lock() { return &fetch_rows_lock_; }
- beeswax::QueryState::type query_state() const { return query_state_; }
- const Status& query_status() const { return query_status_; }
- void set_result_metadata(const TResultSetMetadata& md) { result_metadata_ = md; }
- const RuntimeProfile& profile() const { return profile_; }
- const RuntimeProfile& summary_profile() const { return summary_profile_; }
- const TimestampValue& start_time() const { return start_time_; }
- const TimestampValue& end_time() const { return end_time_; }
- const std::string& sql_stmt() const { return query_ctx_.client_request.stmt; }
- const TQueryOptions& query_options() const {
- return query_ctx_.client_request.query_options;
- }
- /// Returns 0:0 if this is a root query
- TUniqueId parent_query_id() const { return query_ctx_.parent_query_id; }
-
- const std::vector<std::string>& GetAnalysisWarnings() const {
- return exec_request_.analysis_warnings;
- }
-
- inline int64_t last_active_ms() const {
- boost::lock_guard<boost::mutex> l(expiration_data_lock_);
- return last_active_time_ms_;
- }
-
- /// Returns true if Impala is actively processing this query.
- inline bool is_active() const {
- boost::lock_guard<boost::mutex> l(expiration_data_lock_);
- return ref_count_ > 0;
- }
-
- RuntimeProfile::EventSequence* query_events() const { return query_events_; }
- RuntimeProfile* summary_profile() { return &summary_profile_; }
-
- private:
- const TQueryCtx query_ctx_;
-
- /// Ensures single-threaded execution of FetchRows(). Callers of FetchRows() are
- /// responsible for acquiring this lock. To avoid deadlocks, callers must not hold lock_
- /// while acquiring this lock (since FetchRows() will release and re-acquire lock_ during
- /// its execution).
- /// See "Locking" in the class comment for lock acquisition order.
- boost::mutex fetch_rows_lock_;
-
- /// Protects last_active_time_ms_ and ref_count_. Only held during short function calls
- /// - no other locks should be acquired while holding this lock.
- mutable boost::mutex expiration_data_lock_;
-
- /// Stores the last time that the query was actively doing work, in Unix milliseconds.
- int64_t last_active_time_ms_;
-
- /// ref_count_ > 0 if Impala is currently performing work on this query's behalf. Every
- /// time a client instructs Impala to do work on behalf of this query, the ref count is
- /// increased, and decreased once that work is completed.
- uint32_t ref_count_;
-
- /// Executor for any child queries (e.g. compute stats subqueries). Always non-NULL.
- const boost::scoped_ptr<ChildQueryExecutor> child_query_executor_;
-
- /// Protects all following fields. Acquirers should be careful not to hold it for too
- /// long, e.g. during RPCs because this lock is required to make progress on various
- /// ImpalaServer requests. If held for too long it can block progress of client
- /// requests for this query, e.g. query status and cancellation. Furthermore, until
- /// IMPALA-3882 is fixed, it can indirectly block progress on all other queries.
- /// See "Locking" in the class comment for lock acquisition order.
- boost::mutex lock_;
-
- ExecEnv* exec_env_;
-
- /// Thread for asynchronously running Wait().
- boost::scoped_ptr<Thread> wait_thread_;
-
- /// Condition variable to make BlockOnWait() thread-safe. One thread joins
- /// wait_thread_, and all other threads block on this cv. Used with lock_.
- boost::condition_variable block_on_wait_cv_;
-
- /// Used in conjunction with block_on_wait_cv_ to make BlockOnWait() thread-safe.
- bool is_block_on_wait_joining_;
-
- /// Session that this query is from
- std::shared_ptr<SessionState> session_;
-
- /// Resource assignment determined by scheduler. Owned by obj_pool_.
- boost::scoped_ptr<QuerySchedule> schedule_;
-
- /// Not set for ddl queries.
- boost::scoped_ptr<Coordinator> coord_;
-
- /// Runs statements that query or modify the catalog via the CatalogService.
- boost::scoped_ptr<CatalogOpExecutor> catalog_op_executor_;
-
- /// Result set used for requests that return results and are not QUERY
- /// statements. For example, EXPLAIN, LOAD, and SHOW use this.
- boost::scoped_ptr<std::vector<TResultRow>> request_result_set_;
-
- /// Cache of the first result_cache_max_size_ query results to allow clients to restart
- /// fetching from the beginning of the result set. This cache is appended to in
- /// FetchInternal(), and set to NULL if its bound is exceeded. If the bound is exceeded,
- /// then clients cannot restart fetching because some results have been lost since the
- /// last fetch. Only set if result_cache_max_size_ > 0.
- boost::scoped_ptr<QueryResultSet> result_cache_;
-
- /// Max size of the result_cache_ in number of rows. A value <= 0 means no caching.
- int64_t result_cache_max_size_;
-
- ObjectPool profile_pool_;
-
- /// The QueryExecState builds three separate profiles.
- /// * profile_ is the top-level profile which houses the other
- /// profiles, plus the query timeline
- /// * summary_profile_ contains mostly static information about the
- /// query, including the query statement, the plan and the user who submitted it.
- /// * server_profile_ tracks time spent inside the ImpalaServer,
- /// but not inside fragment execution, i.e. the time taken to
- /// register and set-up the query and for rows to be fetched.
- //
- /// There's a fourth profile which is not built here (but is a
- /// child of profile_); the execution profile which tracks the
- /// actual fragment execution.
- RuntimeProfile profile_;
- RuntimeProfile server_profile_;
- RuntimeProfile summary_profile_;
- RuntimeProfile::Counter* row_materialization_timer_;
-
- /// Tracks how long we are idle waiting for a client to fetch rows.
- RuntimeProfile::Counter* client_wait_timer_;
- /// Timer to track idle time for the above counter.
- MonotonicStopWatch client_wait_sw_;
-
- RuntimeProfile::EventSequence* query_events_;
-
- bool is_cancelled_; // if true, Cancel() was called.
- bool eos_; // if true, there are no more rows to return
- // We enforce the invariant that query_status_ is not OK iff query_state_
- // is EXCEPTION, given that lock_ is held.
- beeswax::QueryState::type query_state_;
- Status query_status_;
- TExecRequest exec_request_;
-
- TResultSetMetadata result_metadata_; // metadata for select query
- RowBatch* current_batch_; // the current row batch; only applicable if coord is set
- int current_batch_row_; // number of rows fetched within the current batch
- int num_rows_fetched_; // number of rows fetched by client for the entire query
-
- /// True if a fetch was attempted by a client, regardless of whether a result set
- /// (or error) was returned to the client.
- bool fetched_rows_;
-
- /// To get access to UpdateCatalog, LOAD, and DDL methods. Not owned.
- Frontend* frontend_;
-
- /// The parent ImpalaServer; called to wait until the impalad has processed a
- /// catalog update request. Not owned.
- ImpalaServer* parent_server_;
-
- /// Start/end time of the query
- TimestampValue start_time_, end_time_;
-
- /// Executes a local catalog operation (an operation that does not need to execute
- /// against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN statements.
- Status ExecLocalCatalogOp(const TCatalogOpRequest& catalog_op);
-
- /// Updates last_active_time_ms_ and ref_count_ to reflect that query is currently not
- /// doing any work. Takes expiration_data_lock_.
- void MarkInactive();
-
- /// Updates last_active_time_ms_ and ref_count_ to reflect that query is currently being
- /// actively processed. Takes expiration_data_lock_.
- void MarkActive();
-
- /// Core logic of initiating a query or dml execution request.
- /// Initiates execution of plan fragments, if there are any, and sets
- /// up the output exprs for subsequent calls to FetchRows().
- /// 'coord_' is only valid after this method is called, and may be invalid if it
- /// returns an error.
- /// Also sets up profile and pre-execution counters.
- /// Non-blocking.
- Status ExecQueryOrDmlRequest(const TQueryExecRequest& query_exec_request);
-
- /// Core logic of executing a ddl statement. May internally initiate execution of
- /// queries (e.g., compute stats) or dml (e.g., create table as select)
- Status ExecDdlRequest();
-
- /// Executes a LOAD DATA
- Status ExecLoadDataRequest();
-
- /// Core logic of Wait(). Does not update query_state_/status_.
- Status WaitInternal();
-
- /// Core logic of FetchRows(). Does not update query_state_/status_.
- /// Caller needs to hold fetch_rows_lock_ and lock_.
- Status FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows);
-
- /// Evaluates 'output_expr_ctxs_' against 'row' and output the evaluated row in
- /// 'result'. The values' scales (# of digits after decimal) are stored in 'scales'.
- /// result and scales must have been resized to the number of columns before call.
- Status GetRowValue(TupleRow* row, std::vector<void*>* result, std::vector<int>* scales);
-
- /// Gather and publish all required updates to the metastore
- Status UpdateCatalog();
-
- /// Copies results into request_result_set_
- /// TODO: Have the FE return list<Data.TResultRow> so that this isn't necessary
- void SetResultSet(const TDdlExecResponse* ddl_resp);
- void SetResultSet(const std::vector<std::string>& results);
- void SetResultSet(const std::vector<std::string>& col1,
- const std::vector<std::string>& col2);
- void SetResultSet(const std::vector<std::string>& col1,
- const std::vector<std::string>& col2, const std::vector<std::string>& col3,
- const std::vector<std::string>& col4);
-
- /// Sets the result set for a CREATE TABLE AS SELECT statement. The results will not be
- /// ready until all BEs complete execution. This can be called as part of Wait(),
- /// at which point results will be available.
- void SetCreateTableAsSelectResultSet();
-
- /// Updates the metastore's table and column statistics based on the child-query results
- /// of a compute stats command.
- /// TODO: Unify the various ways that the Metastore is updated for DDL/DML.
- /// For example, INSERT queries update partition metadata in UpdateCatalog() using a
- /// TUpdateCatalogRequest, whereas our DDL uses a TCatalogOpRequest for very similar
- /// purposes. Perhaps INSERT should use a TCatalogOpRequest as well.
- Status UpdateTableAndColumnStats(const std::vector<ChildQuery*>& child_queries);
-
- /// Sets result_cache_ to NULL and updates its associated metrics and mem consumption.
- /// This function is a no-op if the cache has already been cleared.
- void ClearResultCache();
-};
-
-}
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/testutil/desc-tbl-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/desc-tbl-builder.cc b/be/src/testutil/desc-tbl-builder.cc
index 77be724..b4b0a1f 100644
--- a/be/src/testutil/desc-tbl-builder.cc
+++ b/be/src/testutil/desc-tbl-builder.cc
@@ -61,7 +61,7 @@ DescriptorTbl* DescriptorTblBuilder::Build() {
DCHECK(buildDescTblStatus.ok()) << buildDescTblStatus.GetDetail();
DescriptorTbl* desc_tbl;
- Status status = DescriptorTbl::Create(obj_pool_, thrift_desc_tbl_, &desc_tbl);
+ Status status = DescriptorTbl::Create(obj_pool_, thrift_desc_tbl_, nullptr, &desc_tbl);
DCHECK(status.ok()) << status.GetDetail();
return desc_tbl;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/testutil/fault-injection-util.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/fault-injection-util.h b/be/src/testutil/fault-injection-util.h
index c99001c..99816a4 100644
--- a/be/src/testutil/fault-injection-util.h
+++ b/be/src/testutil/fault-injection-util.h
@@ -30,8 +30,8 @@ namespace impala {
#ifndef NDEBUG
enum RpcCallType {
RPC_NULL = 0,
- RPC_EXECPLANFRAGMENT,
- RPC_CANCELPLANFRAGMENT,
+ RPC_EXECQUERYFINSTANCES,
+ RPC_CANCELQUERYFINSTANCES,
RPC_PUBLISHFILTER,
RPC_UPDATEFILTER,
RPC_TRANSMITDATA,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/util/error-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/error-util-test.cc b/be/src/util/error-util-test.cc
index 2390630..0c940cb 100644
--- a/be/src/util/error-util-test.cc
+++ b/be/src/util/error-util-test.cc
@@ -47,7 +47,7 @@ TEST(ErrorMsg, MergeMap) {
right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.push_back("p");
right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count = 3;
- MergeErrorMaps(&left, right);
+ MergeErrorMaps(right, &left);
ASSERT_EQ(2, left.size());
ASSERT_EQ(2, left[TErrorCode::GENERAL].messages.size());
@@ -55,7 +55,7 @@ TEST(ErrorMsg, MergeMap) {
right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.push_back("p");
right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count = 3;
- MergeErrorMaps(&left, right);
+ MergeErrorMaps(right, &left);
ASSERT_EQ(2, left.size());
ASSERT_EQ(2, left[TErrorCode::GENERAL].messages.size());
ASSERT_EQ(6, left[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count);
@@ -74,7 +74,7 @@ TEST(ErrorMsg, MergeMap) {
ASSERT_EQ(2, cleared.size());
ASSERT_EQ(1, cleared.count(TErrorCode::RPC_RECV_TIMEOUT));
- MergeErrorMaps(&dummy, cleared);
+ MergeErrorMaps(cleared, &dummy);
ASSERT_EQ(3, dummy.size());
ASSERT_EQ(3, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count);
ASSERT_EQ(1, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.size());
@@ -84,7 +84,7 @@ TEST(ErrorMsg, MergeMap) {
ASSERT_EQ(0, dummy[TErrorCode::GENERAL].count);
ASSERT_EQ(1, dummy[TErrorCode::GENERAL].messages.size());
- MergeErrorMaps(&cleared, dummy);
+ MergeErrorMaps(dummy, &cleared);
ASSERT_EQ(3, cleared.size());
ASSERT_EQ(3, cleared[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count);
ASSERT_EQ(1, cleared[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.size());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/util/error-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/error-util.cc b/be/src/util/error-util.cc
index db9aef6..69a0355 100644
--- a/be/src/util/error-util.cc
+++ b/be/src/util/error-util.cc
@@ -159,9 +159,9 @@ string PrintErrorMapToString(const ErrorLogMap& errors) {
return stream.str();
}
-void MergeErrorMaps(ErrorLogMap* left, const ErrorLogMap& right) {
- for (const ErrorLogMap::value_type& v: right) {
- TErrorLogEntry& target = (*left)[v.first];
+void MergeErrorMaps(const ErrorLogMap& m1, ErrorLogMap* m2) {
+ for (const ErrorLogMap::value_type& v: m1) {
+ TErrorLogEntry& target = (*m2)[v.first];
const TErrorLogEntry& source = v.second;
// Append generic message, append specific codes or increment count if exists
if (v.first == TErrorCode::GENERAL) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/util/error-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/error-util.h b/be/src/util/error-util.h
index f245c2c..c366ab2 100644
--- a/be/src/util/error-util.h
+++ b/be/src/util/error-util.h
@@ -139,11 +139,11 @@ private:
/// Track log messages per error code.
typedef std::map<TErrorCode::type, TErrorLogEntry> ErrorLogMap;
-/// Merge error maps. Merging of error maps occurs, when the errors from multiple backends
-/// are merged into a single error map. General log messages are simply appended,
-/// specific errors are deduplicated by either appending a new instance or incrementing
-/// the count of an existing one.
-void MergeErrorMaps(ErrorLogMap* left, const ErrorLogMap& right);
+/// Merge error map m1 into m2. Merging of error maps occurs when the errors from
+/// multiple backends are merged into a single error map. General log messages are
+/// simply appended, specific errors are deduplicated by either appending a new
+/// instance or incrementing the count of an existing one.
+void MergeErrorMaps(const ErrorLogMap& m1, ErrorLogMap* m2);
/// Append an error to the error map. Performs the aggregation as follows: GENERAL errors
/// are appended to the list of GENERAL errors, to keep one item each in the map, while
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/util/uid-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/uid-util.h b/be/src/util/uid-util.h
index 37f804a..1f57298 100644
--- a/be/src/util/uid-util.h
+++ b/be/src/util/uid-util.h
@@ -78,6 +78,10 @@ inline int32_t GetInstanceIdx(const TUniqueId& fragment_instance_id) {
return fragment_instance_id.lo & FRAGMENT_IDX_MASK;
}
+inline bool IsValidFInstanceId(const TUniqueId& fragment_instance_id) {
+ return fragment_instance_id.hi != 0L;
+}
+
inline TUniqueId CreateInstanceId(
const TUniqueId& query_id, int32_t instance_idx) {
DCHECK_EQ(GetInstanceIdx(query_id), 0); // well-formed query id
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/common/thrift/ExecStats.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ExecStats.thrift b/common/thrift/ExecStats.thrift
index 8e88b20..68a8fd8 100644
--- a/common/thrift/ExecStats.thrift
+++ b/common/thrift/ExecStats.thrift
@@ -43,6 +43,7 @@ struct TExecStats {
// Total CPU time spent across all threads. For operators that have an async
// component (e.g. multi-threaded) this will be >= latency_ns.
+ // TODO-MT: remove this or latency_ns
2: optional i64 cpu_time_ns
// Number of rows returned.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 433f3b4..b17aeec 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -280,8 +280,17 @@ struct TClientRequest {
3: optional string redacted_stmt
}
+// Debug options: perform some action in a particular phase of a particular node
+// TODO: find a better name
+struct TDebugOptions {
+ 1: optional Types.TPlanNodeId node_id
+ 2: optional PlanNodes.TExecNodePhase phase
+ 3: optional PlanNodes.TDebugAction action
+}
+
// Context of this query, including the client request, session state and
// global query parameters needed for consistent expr evaluation (e.g., now()).
+//
// TODO: Separate into FE/BE initialized vars.
struct TQueryCtx {
// Client request containing stmt to execute and query options.
@@ -326,8 +335,7 @@ struct TQueryCtx {
// This defaults to -1 when no timestamp is specified.
11: optional i64 snapshot_timestamp = -1;
- // Contains only the union of those descriptors referenced by list of fragments destined
- // for a single host. Optional for frontend tests.
+ // Optional for frontend tests.
12: optional Descriptors.TDescriptorTable desc_tbl
// Milliseconds since UNIX epoch at the start of query execution.
@@ -340,12 +348,31 @@ struct TQueryCtx {
// List of tables with scan ranges that map to blocks with missing disk IDs.
15: optional list<CatalogObjects.TTableName> tables_missing_diskids
+
+ // The pool to which this request has been submitted. Used to update pool statistics
+ // for admission control.
+ 16: optional string request_pool
+}
+
+// Specification of one output destination of a plan fragment
+struct TPlanFragmentDestination {
+ // the globally unique fragment instance id
+ 1: required Types.TUniqueId fragment_instance_id
+
+ // ... which is being executed on this server
+ 2: required Types.TNetworkAddress server
}
// Context to collect information, which is shared among all instances of that plan
// fragment.
struct TPlanFragmentCtx {
1: required Planner.TPlanFragment fragment
+
+ // Output destinations, one per output partition.
+ // The partitioning of the output is specified by
+ // TPlanFragment.output_sink.output_partition.
+ // The number of output partitions is destinations.size().
+ 2: list<TPlanFragmentDestination> destinations
}
// A scan range plus the parameters needed to execute that scan.
@@ -356,56 +383,37 @@ struct TScanRangeParams {
4: optional bool is_remote
}
-// Specification of one output destination of a plan fragment
-struct TPlanFragmentDestination {
- // the globally unique fragment instance id
- 1: required Types.TUniqueId fragment_instance_id
-
- // ... which is being executed on this server
- 2: required Types.TNetworkAddress server
-}
-
-// Execution parameters of a fragment instance, including its unique id, the total number
-// of fragment instances, the query context, the coordinator address, etc.
-// TODO: for range partitioning, we also need to specify the range boundaries
+// Execution parameters of a single fragment instance.
struct TPlanFragmentInstanceCtx {
+ // TPlanFragment.idx
+ 1: required Types.TFragmentIdx fragment_idx
+
// The globally unique fragment instance id.
// Format: query id + query-wide fragment instance index
- // The query-wide fragment instance index starts at 0, so that the query id
- // and the id of the first fragment instance are identical.
+ // The query-wide fragment instance index enumerates all fragment instances of a
+ // particular query. It starts at 0, so that the query id and the id of the first
+ // fragment instance are identical.
// If there is a coordinator instance, it is the first one, with index 0.
- 1: required Types.TUniqueId fragment_instance_id
+ // Range: [0, TExecQueryFInstancesParams.fragment_instance_ctxs.size()-1]
+ 2: required Types.TUniqueId fragment_instance_id
- // Index of this fragment instance accross all instances of its parent fragment,
- // range [0, TPlanFragmentCtx.num_fragment_instances).
- 2: required i32 per_fragment_instance_idx
+ // Index of this fragment instance across all instances of its parent fragment
+ // (TPlanFragment with idx = TPlanFragmentInstanceCtx.fragment_idx).
+ // Range: [0, <# of instances of parent fragment> - 1]
+ 3: required i32 per_fragment_instance_idx
// Initial scan ranges for each scan node in TPlanFragment.plan_tree
- 3: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges
+ 4: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges
// Number of senders for ExchangeNodes contained in TPlanFragment.plan_tree;
// needed to create a DataStreamRecvr
// TODO for per-query exec rpc: move these to TPlanFragmentCtx
- 4: required map<Types.TPlanNodeId, i32> per_exch_num_senders
+ 5: required map<Types.TPlanNodeId, i32> per_exch_num_senders
- // Output destinations, one per output partition.
- // The partitioning of the output is specified by
- // TPlanFragment.output_sink.output_partition.
- // The number of output partitions is destinations.size().
- // TODO for per-query exec rpc: move these to TPlanFragmentCtx
- 5: list<TPlanFragmentDestination> destinations
-
- // Debug options: perform some action in a particular phase of a particular node
- 6: optional Types.TPlanNodeId debug_node_id
- 7: optional PlanNodes.TExecNodePhase debug_phase
- 8: optional PlanNodes.TDebugAction debug_action
-
- // The pool to which this request has been submitted. Used to update pool statistics
- // for admission control.
- 9: optional string request_pool
+ // Id of this instance in its role as a sender.
+ 6: optional i32 sender_id
- // Id of this fragment in its role as a sender.
- 10: optional i32 sender_id
+ 7: optional TDebugOptions debug_options
}
@@ -415,29 +423,37 @@ enum ImpalaInternalServiceVersion {
V1
}
+// The following contains the per-rpc structs for the parameters and the result.
-// ExecPlanFragment
+// ExecQueryFInstances
-struct TExecPlanFragmentParams {
+struct TExecQueryFInstancesParams {
1: required ImpalaInternalServiceVersion protocol_version
- // Context of the query, which this fragment is part of.
- 2: optional TQueryCtx query_ctx
+ // this backend's index into Coordinator::backend_states_,
+ // needed for subsequent rpcs to the coordinator
+ // required in V1
+ 2: optional i32 coord_state_idx
- // Context of this fragment.
- 3: optional TPlanFragmentCtx fragment_ctx
+ // required in V1
+ 3: optional TQueryCtx query_ctx
- // Context of this fragment instance, including its instance id, the total number
- // fragment instances, the query context, etc.
- 4: optional TPlanFragmentInstanceCtx fragment_instance_ctx
+ // required in V1
+ 4: list<TPlanFragmentCtx> fragment_ctxs
+
+ // the order corresponds to the order of fragments in fragment_ctxs
+ // required in V1
+ 5: list<TPlanFragmentInstanceCtx> fragment_instance_ctxs
}
-struct TExecPlanFragmentResult {
+struct TExecQueryFInstancesResult {
// required in V1
1: optional Status.TStatus status
}
+
// ReportExecStatus
+
struct TParquetInsertStats {
// For each column, the on disk byte size
1: required map<string, i64> per_column_size
@@ -509,33 +525,43 @@ struct TErrorLogEntry {
2: list<string> messages
}
-struct TReportExecStatusParams {
- 1: required ImpalaInternalServiceVersion protocol_version
-
- // required in V1
- 2: optional Types.TUniqueId query_id
-
+struct TFragmentInstanceExecStatus {
// required in V1
- 3: optional Types.TUniqueId fragment_instance_id
+ 1: optional Types.TUniqueId fragment_instance_id
// Status of fragment execution; any error status means it's done.
// required in V1
- 4: optional Status.TStatus status
+ 2: optional Status.TStatus status
// If true, fragment finished executing.
// required in V1
- 5: optional bool done
+ 3: optional bool done
// cumulative profile
// required in V1
- 6: optional RuntimeProfile.TRuntimeProfileTree profile
+ 4: optional RuntimeProfile.TRuntimeProfileTree profile
+}
+
+struct TReportExecStatusParams {
+ 1: required ImpalaInternalServiceVersion protocol_version
- // Cumulative structural changes made by a table sink
+ // required in V1
+ 2: optional Types.TUniqueId query_id
+
+ // same as TExecQueryFInstancesParams.coord_state_idx
+ // required in V1
+ 3: optional i32 coord_state_idx
+
+ 4: list<TFragmentInstanceExecStatus> instance_exec_status
+
+ // Cumulative structural changes made by the table sink of any instance
+ // included in instance_exec_status
// optional in V1
- 7: optional TInsertExecStatus insert_exec_status;
+ 5: optional TInsertExecStatus insert_exec_status;
- // New errors that have not been reported to the coordinator
- 8: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log;
+ // New errors that have not been reported to the coordinator by any of the
+ // instances included in instance_exec_status
+ 6: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log;
}
struct TReportExecStatusResult {
@@ -544,16 +570,16 @@ struct TReportExecStatusResult {
}
-// CancelPlanFragment
+// CancelQueryFInstances
-struct TCancelPlanFragmentParams {
+struct TCancelQueryFInstancesParams {
1: required ImpalaInternalServiceVersion protocol_version
// required in V1
- 2: optional Types.TUniqueId fragment_instance_id
+ 2: optional Types.TUniqueId query_id
}
-struct TCancelPlanFragmentResult {
+struct TCancelQueryFInstancesResult {
// required in V1
1: optional Status.TStatus status
}
@@ -573,7 +599,7 @@ struct TTransmitDataParams {
// required in V1
4: optional Types.TPlanNodeId dest_node_id
- // required in V1
+ // optional in V1
5: optional Results.TRowBatch row_batch
// if set to true, indicates that no more row batches will be sent
@@ -587,6 +613,7 @@ struct TTransmitDataResult {
}
// Parameters for RequestPoolService.resolveRequestPool()
+// TODO: why is this here?
struct TResolveRequestPoolParams {
// User to resolve to a pool via the allocation placement policy and
// authorize for pool access.
@@ -611,6 +638,7 @@ struct TResolveRequestPoolResult {
}
// Parameters for RequestPoolService.getPoolConfig()
+// TODO: why is this here?
struct TPoolConfigParams {
// Pool name
1: required string pool
@@ -655,48 +683,68 @@ struct TBloomFilter {
4: required bool always_true
}
-struct TUpdateFilterResult {
-}
+// UpdateFilter
struct TUpdateFilterParams {
+ 1: required ImpalaInternalServiceVersion protocol_version
+
// Filter ID, unique within a query.
- 1: required i32 filter_id
+ // required in V1
+ 2: optional i32 filter_id
// Query that this filter is for.
- 2: required Types.TUniqueId query_id
+ // required in V1
+ 3: optional Types.TUniqueId query_id
- 3: required TBloomFilter bloom_filter
+ // required in V1
+ 4: optional TBloomFilter bloom_filter
}
-struct TPublishFilterResult {
-
+struct TUpdateFilterResult {
}
+
+// PublishFilter
+
struct TPublishFilterParams {
+ 1: required ImpalaInternalServiceVersion protocol_version
+
// Filter ID to update
- 1: required i32 filter_id
+ // required in V1
+ 2: optional i32 filter_id
- // ID of fragment to receive this filter
- 2: required Types.TUniqueId dst_instance_id
+ // required in V1
+ 3: optional Types.TUniqueId dst_query_id
+
+ // Index of fragment to receive this filter
+ // required in V1
+ 4: optional Types.TFragmentIdx dst_fragment_idx
// Actual bloom_filter payload
- 3: required TBloomFilter bloom_filter
+ // required in V1
+ 5: optional TBloomFilter bloom_filter
}
+struct TPublishFilterResult {
+}
+
+
service ImpalaInternalService {
- // Called by coord to start asynchronous execution of plan fragment in backend.
+ // Called by coord to start asynchronous execution of a query's fragment instances in
+ // backend.
// Returns as soon as all incoming data streams have been set up.
- TExecPlanFragmentResult ExecPlanFragment(1:TExecPlanFragmentParams params);
+ TExecQueryFInstancesResult ExecQueryFInstances(1:TExecQueryFInstancesParams params);
- // Periodically called by backend to report status of plan fragment execution
+ // Periodically called by backend to report status of fragment instance execution
// back to coord; also called when execution is finished, for whatever reason.
TReportExecStatusResult ReportExecStatus(1:TReportExecStatusParams params);
- // Called by coord to cancel execution of a single plan fragment, which this
- // coordinator initiated with a prior call to ExecPlanFragment.
+ // Called by coord to cancel execution of a single query's fragment instances, which
+ // the coordinator initiated with a prior call to ExecQueryFInstances.
// Cancellation is asynchronous.
- TCancelPlanFragmentResult CancelPlanFragment(1:TCancelPlanFragmentParams params);
+ TCancelQueryFInstancesResult CancelQueryFInstances(
+ 1:TCancelQueryFInstancesParams params);
// Called by sender to transmit single row batch. Returns error indication
// if params.fragmentId or params.destNodeId are unknown or if data couldn't be read.
@@ -706,7 +754,7 @@ service ImpalaInternalService {
// the coordinator for aggregation and broadcast.
TUpdateFilterResult UpdateFilter(1:TUpdateFilterParams params);
- // Called by the coordinator to deliver global runtime filters to fragment instances for
+ // Called by the coordinator to deliver global runtime filters to fragments for
// application at plan nodes.
TPublishFilterResult PublishFilter(1:TPublishFilterParams params);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/tests/common/test_result_verifier.py
----------------------------------------------------------------------
diff --git a/tests/common/test_result_verifier.py b/tests/common/test_result_verifier.py
index a816eaf..7e929f1 100644
--- a/tests/common/test_result_verifier.py
+++ b/tests/common/test_result_verifier.py
@@ -490,7 +490,6 @@ def compute_aggregation(function, field, runtime_profile):
field_regex_re = re.compile(field_regex)
inside_avg_fragment = False
avg_fragment_indent = None
- past_avg_fragment = False
match_list = []
for line in runtime_profile.splitlines():
# Detect the boundaries of the averaged fragment by looking at indentation.
@@ -498,18 +497,17 @@ def compute_aggregation(function, field, runtime_profile):
# its children are at a greater indent. When the indentation gets back to
# the level of the the averaged fragment start, then the averaged fragment
# is done.
+ if start_avg_fragment_re.match(line):
+ inside_avg_fragment = True
+ avg_fragment_indent = len(line) - len(line.lstrip())
+ continue
+
if inside_avg_fragment:
indentation = len(line) - len(line.lstrip())
if indentation > avg_fragment_indent:
continue
else:
inside_avg_fragment = False
- past_avg_fragment = True
-
- if not past_avg_fragment and start_avg_fragment_re.match(line):
- inside_avg_fragment = True
- avg_fragment_indent = len(line) - len(line.lstrip())
- continue
if (field_regex_re.search(line)):
match_list.extend(re.findall(field_regex, line))