You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by lv...@apache.org on 2017/05/09 15:55:57 UTC

[03/13] incubator-impala git commit: IMPALA-2550: Switch to per-query exec rpc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/query-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.h b/be/src/service/query-exec-state.h
deleted file mode 100644
index 8ac066e..0000000
--- a/be/src/service/query-exec-state.h
+++ /dev/null
@@ -1,408 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_SERVICE_QUERY_EXEC_STATE_H
-#define IMPALA_SERVICE_QUERY_EXEC_STATE_H
-
-#include "common/status.h"
-#include "exec/catalog-op-executor.h"
-#include "util/runtime-profile.h"
-#include "runtime/timestamp-value.h"
-#include "service/child-query.h"
-#include "scheduling/query-schedule.h"
-#include "gen-cpp/Frontend_types.h"
-#include "service/impala-server.h"
-#include "gen-cpp/Frontend_types.h"
-#include "util/auth-util.h"
-
-#include <boost/thread.hpp>
-#include <boost/unordered_set.hpp>
-#include <vector>
-
-namespace impala {
-
-class ExecEnv;
-class Coordinator;
-class RuntimeState;
-class RowBatch;
-class Expr;
-class TupleRow;
-class Frontend;
-class QueryExecStateCleaner;
-
-/// Execution state of a query. This captures everything necessary
-/// to convert row batches received by the coordinator into results
-/// we can return to the client. It also captures all state required for
-/// servicing query-related requests from the client.
-/// Thread safety: this class is generally not thread-safe, callers need to
-/// synchronize access explicitly via lock(). See the ImpalaServer class comment for
-/// the required lock acquisition order.
-///
-/// TODO: Consider renaming to RequestExecState for consistency.
-/// TODO: Compute stats is the only stmt that requires child queries. Once the
-/// CatalogService performs background stats gathering the concept of child queries
-/// will likely become obsolete. Remove all child-query related code from this class.
-class ImpalaServer::QueryExecState {
- public:
-  QueryExecState(const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend,
-      ImpalaServer* server, std::shared_ptr<ImpalaServer::SessionState> session);
-
-  ~QueryExecState();
-
-  /// Initiates execution of a exec_request.
-  /// Non-blocking.
-  /// Must *not* be called with lock_ held.
-  Status Exec(TExecRequest* exec_request);
-
-  /// Execute a HiveServer2 metadata operation
-  /// TODO: This is likely a superset of GetTableNames/GetDbs. Coalesce these different
-  /// code paths.
-  Status Exec(const TMetadataOpRequest& exec_request);
-
-  /// Call this to ensure that rows are ready when calling FetchRows(). Updates the
-  /// query_status_, and advances query_state_ to FINISHED or EXCEPTION. Must be preceded
-  /// by call to Exec(). Waits for all child queries to complete. Takes lock_.
-  void Wait();
-
-  /// Calls Wait() asynchronously in a thread and returns immediately.
-  void WaitAsync();
-
-  /// BlockOnWait() may be called after WaitAsync() has been called in order to wait
-  /// for the asynchronous thread (wait_thread_) to complete. It is safe to call this
-  /// from multiple threads (all threads will block until wait_thread_ has completed)
-  /// and multiple times (non-blocking once wait_thread_ has completed). Do not call
-  /// while holding lock_.
-  void BlockOnWait();
-
-  /// Return at most max_rows from the current batch. If the entire current batch has
-  /// been returned, fetch another batch first.
-  /// Caller needs to hold fetch_rows_lock_ and lock_.
-  /// Caller should verify that EOS has not be reached before calling.
-  /// Must be preceeded by call to Wait() (or WaitAsync()/BlockOnWait()).
-  /// Also updates query_state_/status_ in case of error.
-  Status FetchRows(const int32_t max_rows, QueryResultSet* fetched_rows);
-
-  /// Resets the state of this query such that the next fetch() returns results from the
-  /// beginning of the query result set (by using the using result_cache_).
-  /// It is valid to call this function for any type of statement that returns a result
-  /// set, including queries, show stmts, compute stats, etc.
-  /// Returns a recoverable error status if the restart is not possible, ok() otherwise.
-  /// The error is recoverable to allow clients to resume fetching.
-  /// The caller must hold fetch_rows_lock_ and lock_.
-  Status RestartFetch();
-
-  /// Update query state if the requested state isn't already obsolete. This is only for
-  /// non-error states - if the query encounters an error the query status needs to be set
-  /// with information about the error so UpdateQueryStatus must be used instead.
-  /// Takes lock_.
-  void UpdateNonErrorQueryState(beeswax::QueryState::type query_state);
-
-  /// Update the query status and the "Query Status" summary profile string.
-  /// If current status is already != ok, no update is made (we preserve the first error)
-  /// If called with a non-ok argument, the expectation is that the query will be aborted
-  /// quickly.
-  /// Returns the status argument (so we can write
-  /// RETURN_IF_ERROR(UpdateQueryStatus(SomeOperation())).
-  /// Does not take lock_, but requires it: caller must ensure lock_
-  /// is taken before calling UpdateQueryStatus
-  Status UpdateQueryStatus(const Status& status);
-
-  /// Cancels the child queries and the coordinator with the given cause.
-  /// If cause is NULL, assume this was deliberately cancelled by the user.
-  /// Otherwise, sets state to EXCEPTION.
-  /// Does nothing if the query has reached EOS or already cancelled.
-  ///
-  /// Only returns an error if 'check_inflight' is true and the query is not yet
-  /// in-flight. Otherwise, proceed and return Status::OK() even if the query isn't
-  /// in-flight (for cleaning up after an error on the query issuing path).
-  Status Cancel(bool check_inflight, const Status* cause);
-
-  /// This is called when the query is done (finished, cancelled, or failed).
-  /// Takes lock_: callers must not hold lock() before calling.
-  void Done();
-
-  /// Sets the API-specific (Beeswax, HS2) result cache and its size bound.
-  /// The given cache is owned by this query exec state, even if an error is returned.
-  /// Returns a non-ok status if max_size exceeds the per-impalad allowed maximum.
-  Status SetResultCache(QueryResultSet* cache, int64_t max_size);
-
-  ImpalaServer::SessionState* session() const { return session_.get(); }
-
-  /// Queries are run and authorized on behalf of the effective_user.
-  const std::string& effective_user() const {
-      return GetEffectiveUser(query_ctx_.session);
-  }
-  const std::string& connected_user() const { return query_ctx_.session.connected_user; }
-  const std::string& do_as_user() const { return query_ctx_.session.delegated_user; }
-  TSessionType::type session_type() const { return query_ctx_.session.session_type; }
-  const TUniqueId& session_id() const { return query_ctx_.session.session_id; }
-  const std::string& default_db() const { return query_ctx_.session.database; }
-  bool eos() const { return eos_; }
-  Coordinator* coord() const { return coord_.get(); }
-  QuerySchedule* schedule() { return schedule_.get(); }
-
-  /// Resource pool associated with this query, or an empty string if the schedule has not
-  /// been created and had the pool set yet, or this StmtType doesn't go through admission
-  /// control.
-  std::string request_pool() const {
-    return schedule_ == nullptr ? "" : schedule_->request_pool();
-  }
-  int num_rows_fetched() const { return num_rows_fetched_; }
-  void set_fetched_rows() { fetched_rows_ = true; }
-  bool fetched_rows() const { return fetched_rows_; }
-  bool returns_result_set() { return !result_metadata_.columns.empty(); }
-  const TResultSetMetadata* result_metadata() { return &result_metadata_; }
-  const TUniqueId& query_id() const { return query_ctx_.query_id; }
-  const TExecRequest& exec_request() const { return exec_request_; }
-  TStmtType::type stmt_type() const { return exec_request_.stmt_type; }
-  TCatalogOpType::type catalog_op_type() const {
-    return exec_request_.catalog_op_request.op_type;
-  }
-  TDdlType::type ddl_type() const {
-    return exec_request_.catalog_op_request.ddl_params.ddl_type;
-  }
-  boost::mutex* lock() { return &lock_; }
-  boost::mutex* fetch_rows_lock() { return &fetch_rows_lock_; }
-  beeswax::QueryState::type query_state() const { return query_state_; }
-  const Status& query_status() const { return query_status_; }
-  void set_result_metadata(const TResultSetMetadata& md) { result_metadata_ = md; }
-  const RuntimeProfile& profile() const { return profile_; }
-  const RuntimeProfile& summary_profile() const { return summary_profile_; }
-  const TimestampValue& start_time() const { return start_time_; }
-  const TimestampValue& end_time() const { return end_time_; }
-  const std::string& sql_stmt() const { return query_ctx_.client_request.stmt; }
-  const TQueryOptions& query_options() const {
-    return query_ctx_.client_request.query_options;
-  }
-  /// Returns 0:0 if this is a root query
-  TUniqueId parent_query_id() const { return query_ctx_.parent_query_id; }
-
-  const std::vector<std::string>& GetAnalysisWarnings() const {
-    return exec_request_.analysis_warnings;
-  }
-
-  inline int64_t last_active_ms() const {
-    boost::lock_guard<boost::mutex> l(expiration_data_lock_);
-    return last_active_time_ms_;
-  }
-
-  /// Returns true if Impala is actively processing this query.
-  inline bool is_active() const {
-    boost::lock_guard<boost::mutex> l(expiration_data_lock_);
-    return ref_count_ > 0;
-  }
-
-  RuntimeProfile::EventSequence* query_events() const { return query_events_; }
-  RuntimeProfile* summary_profile() { return &summary_profile_; }
-
- private:
-  const TQueryCtx query_ctx_;
-
-  /// Ensures single-threaded execution of FetchRows(). Callers of FetchRows() are
-  /// responsible for acquiring this lock. To avoid deadlocks, callers must not hold lock_
-  /// while acquiring this lock (since FetchRows() will release and re-acquire lock_ during
-  /// its execution).
-  /// See "Locking" in the class comment for lock acquisition order.
-  boost::mutex fetch_rows_lock_;
-
-  /// Protects last_active_time_ms_ and ref_count_. Only held during short function calls
-  /// - no other locks should be acquired while holding this lock.
-  mutable boost::mutex expiration_data_lock_;
-
-  /// Stores the last time that the query was actively doing work, in Unix milliseconds.
-  int64_t last_active_time_ms_;
-
-  /// ref_count_ > 0 if Impala is currently performing work on this query's behalf. Every
-  /// time a client instructs Impala to do work on behalf of this query, the ref count is
-  /// increased, and decreased once that work is completed.
-  uint32_t ref_count_;
-
-  /// Executor for any child queries (e.g. compute stats subqueries). Always non-NULL.
-  const boost::scoped_ptr<ChildQueryExecutor> child_query_executor_;
-
-  /// Protects all following fields. Acquirers should be careful not to hold it for too
-  /// long, e.g. during RPCs because this lock is required to make progress on various
-  /// ImpalaServer requests. If held for too long it can block progress of client
-  /// requests for this query, e.g. query status and cancellation. Furthermore, until
-  /// IMPALA-3882 is fixed, it can indirectly block progress on all other queries.
-  /// See "Locking" in the class comment for lock acquisition order.
-  boost::mutex lock_;
-
-  ExecEnv* exec_env_;
-
-  /// Thread for asynchronously running Wait().
-  boost::scoped_ptr<Thread> wait_thread_;
-
-  /// Condition variable to make BlockOnWait() thread-safe. One thread joins
-  /// wait_thread_, and all other threads block on this cv. Used with lock_.
-  boost::condition_variable block_on_wait_cv_;
-
-  /// Used in conjunction with block_on_wait_cv_ to make BlockOnWait() thread-safe.
-  bool is_block_on_wait_joining_;
-
-  /// Session that this query is from
-  std::shared_ptr<SessionState> session_;
-
-  /// Resource assignment determined by scheduler. Owned by obj_pool_.
-  boost::scoped_ptr<QuerySchedule> schedule_;
-
-  /// Not set for ddl queries.
-  boost::scoped_ptr<Coordinator> coord_;
-
-  /// Runs statements that query or modify the catalog via the CatalogService.
-  boost::scoped_ptr<CatalogOpExecutor> catalog_op_executor_;
-
-  /// Result set used for requests that return results and are not QUERY
-  /// statements. For example, EXPLAIN, LOAD, and SHOW use this.
-  boost::scoped_ptr<std::vector<TResultRow>> request_result_set_;
-
-  /// Cache of the first result_cache_max_size_ query results to allow clients to restart
-  /// fetching from the beginning of the result set. This cache is appended to in
-  /// FetchInternal(), and set to NULL if its bound is exceeded. If the bound is exceeded,
-  /// then clients cannot restart fetching because some results have been lost since the
-  /// last fetch. Only set if result_cache_max_size_ > 0.
-  boost::scoped_ptr<QueryResultSet> result_cache_;
-
-  /// Max size of the result_cache_ in number of rows. A value <= 0 means no caching.
-  int64_t result_cache_max_size_;
-
-  ObjectPool profile_pool_;
-
-  /// The QueryExecState builds three separate profiles.
-  /// * profile_ is the top-level profile which houses the other
-  ///   profiles, plus the query timeline
-  /// * summary_profile_ contains mostly static information about the
-  ///   query, including the query statement, the plan and the user who submitted it.
-  /// * server_profile_ tracks time spent inside the ImpalaServer,
-  ///   but not inside fragment execution, i.e. the time taken to
-  ///   register and set-up the query and for rows to be fetched.
-  //
-  /// There's a fourth profile which is not built here (but is a
-  /// child of profile_); the execution profile which tracks the
-  /// actual fragment execution.
-  RuntimeProfile profile_;
-  RuntimeProfile server_profile_;
-  RuntimeProfile summary_profile_;
-  RuntimeProfile::Counter* row_materialization_timer_;
-
-  /// Tracks how long we are idle waiting for a client to fetch rows.
-  RuntimeProfile::Counter* client_wait_timer_;
-  /// Timer to track idle time for the above counter.
-  MonotonicStopWatch client_wait_sw_;
-
-  RuntimeProfile::EventSequence* query_events_;
-
-  bool is_cancelled_; // if true, Cancel() was called.
-  bool eos_;  // if true, there are no more rows to return
-  // We enforce the invariant that query_status_ is not OK iff query_state_
-  // is EXCEPTION, given that lock_ is held.
-  beeswax::QueryState::type query_state_;
-  Status query_status_;
-  TExecRequest exec_request_;
-
-  TResultSetMetadata result_metadata_; // metadata for select query
-  RowBatch* current_batch_; // the current row batch; only applicable if coord is set
-  int current_batch_row_; // number of rows fetched within the current batch
-  int num_rows_fetched_; // number of rows fetched by client for the entire query
-
-  /// True if a fetch was attempted by a client, regardless of whether a result set
-  /// (or error) was returned to the client.
-  bool fetched_rows_;
-
-  /// To get access to UpdateCatalog, LOAD, and DDL methods. Not owned.
-  Frontend* frontend_;
-
-  /// The parent ImpalaServer; called to wait until the the impalad has processed a
-  /// catalog update request. Not owned.
-  ImpalaServer* parent_server_;
-
-  /// Start/end time of the query
-  TimestampValue start_time_, end_time_;
-
-  /// Executes a local catalog operation (an operation that does not need to execute
-  /// against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN statements.
-  Status ExecLocalCatalogOp(const TCatalogOpRequest& catalog_op);
-
-  /// Updates last_active_time_ms_ and ref_count_ to reflect that query is currently not
-  /// doing any work. Takes expiration_data_lock_.
-  void MarkInactive();
-
-  /// Updates last_active_time_ms_ and ref_count_ to reflect that query is currently being
-  /// actively processed. Takes expiration_data_lock_.
-  void MarkActive();
-
-  /// Core logic of initiating a query or dml execution request.
-  /// Initiates execution of plan fragments, if there are any, and sets
-  /// up the output exprs for subsequent calls to FetchRows().
-  /// 'coord_' is only valid after this method is called, and may be invalid if it
-  /// returns an error.
-  /// Also sets up profile and pre-execution counters.
-  /// Non-blocking.
-  Status ExecQueryOrDmlRequest(const TQueryExecRequest& query_exec_request);
-
-  /// Core logic of executing a ddl statement. May internally initiate execution of
-  /// queries (e.g., compute stats) or dml (e.g., create table as select)
-  Status ExecDdlRequest();
-
-  /// Executes a LOAD DATA
-  Status ExecLoadDataRequest();
-
-  /// Core logic of Wait(). Does not update query_state_/status_.
-  Status WaitInternal();
-
-  /// Core logic of FetchRows(). Does not update query_state_/status_.
-  /// Caller needs to hold fetch_rows_lock_ and lock_.
-  Status FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows);
-
-  /// Evaluates 'output_expr_ctxs_' against 'row' and output the evaluated row in
-  /// 'result'. The values' scales (# of digits after decimal) are stored in 'scales'.
-  /// result and scales must have been resized to the number of columns before call.
-  Status GetRowValue(TupleRow* row, std::vector<void*>* result, std::vector<int>* scales);
-
-  /// Gather and publish all required updates to the metastore
-  Status UpdateCatalog();
-
-  /// Copies results into request_result_set_
-  /// TODO: Have the FE return list<Data.TResultRow> so that this isn't necessary
-  void SetResultSet(const TDdlExecResponse* ddl_resp);
-  void SetResultSet(const std::vector<std::string>& results);
-  void SetResultSet(const std::vector<std::string>& col1,
-      const std::vector<std::string>& col2);
-  void SetResultSet(const std::vector<std::string>& col1,
-      const std::vector<std::string>& col2, const std::vector<std::string>& col3,
-      const std::vector<std::string>& col4);
-
-  /// Sets the result set for a CREATE TABLE AS SELECT statement. The results will not be
-  /// ready until all BEs complete execution. This can be called as part of Wait(),
-  /// at which point results will be avilable.
-  void SetCreateTableAsSelectResultSet();
-
-  /// Updates the metastore's table and column statistics based on the child-query results
-  /// of a compute stats command.
-  /// TODO: Unify the various ways that the Metastore is updated for DDL/DML.
-  /// For example, INSERT queries update partition metadata in UpdateCatalog() using a
-  /// TUpdateCatalogRequest, whereas our DDL uses a TCatalogOpRequest for very similar
-  /// purposes. Perhaps INSERT should use a TCatalogOpRequest as well.
-  Status UpdateTableAndColumnStats(const std::vector<ChildQuery*>& child_queries);
-
-  /// Sets result_cache_ to NULL and updates its associated metrics and mem consumption.
-  /// This function is a no-op if the cache has already been cleared.
-  void ClearResultCache();
-};
-
-}
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/testutil/desc-tbl-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/desc-tbl-builder.cc b/be/src/testutil/desc-tbl-builder.cc
index 77be724..b4b0a1f 100644
--- a/be/src/testutil/desc-tbl-builder.cc
+++ b/be/src/testutil/desc-tbl-builder.cc
@@ -61,7 +61,7 @@ DescriptorTbl* DescriptorTblBuilder::Build() {
   DCHECK(buildDescTblStatus.ok()) << buildDescTblStatus.GetDetail();
 
   DescriptorTbl* desc_tbl;
-  Status status = DescriptorTbl::Create(obj_pool_, thrift_desc_tbl_, &desc_tbl);
+  Status status = DescriptorTbl::Create(obj_pool_, thrift_desc_tbl_, nullptr, &desc_tbl);
   DCHECK(status.ok()) << status.GetDetail();
   return desc_tbl;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/testutil/fault-injection-util.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/fault-injection-util.h b/be/src/testutil/fault-injection-util.h
index c99001c..99816a4 100644
--- a/be/src/testutil/fault-injection-util.h
+++ b/be/src/testutil/fault-injection-util.h
@@ -30,8 +30,8 @@ namespace impala {
 #ifndef NDEBUG
   enum RpcCallType {
     RPC_NULL = 0,
-    RPC_EXECPLANFRAGMENT,
-    RPC_CANCELPLANFRAGMENT,
+    RPC_EXECQUERYFINSTANCES,
+    RPC_CANCELQUERYFINSTANCES,
     RPC_PUBLISHFILTER,
     RPC_UPDATEFILTER,
     RPC_TRANSMITDATA,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/util/error-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/error-util-test.cc b/be/src/util/error-util-test.cc
index 2390630..0c940cb 100644
--- a/be/src/util/error-util-test.cc
+++ b/be/src/util/error-util-test.cc
@@ -47,7 +47,7 @@ TEST(ErrorMsg, MergeMap) {
   right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.push_back("p");
   right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count = 3;
 
-  MergeErrorMaps(&left, right);
+  MergeErrorMaps(right, &left);
   ASSERT_EQ(2, left.size());
   ASSERT_EQ(2, left[TErrorCode::GENERAL].messages.size());
 
@@ -55,7 +55,7 @@ TEST(ErrorMsg, MergeMap) {
   right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.push_back("p");
   right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count = 3;
 
-  MergeErrorMaps(&left, right);
+  MergeErrorMaps(right, &left);
   ASSERT_EQ(2, left.size());
   ASSERT_EQ(2, left[TErrorCode::GENERAL].messages.size());
   ASSERT_EQ(6, left[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count);
@@ -74,7 +74,7 @@ TEST(ErrorMsg, MergeMap) {
   ASSERT_EQ(2, cleared.size());
   ASSERT_EQ(1, cleared.count(TErrorCode::RPC_RECV_TIMEOUT));
 
-  MergeErrorMaps(&dummy, cleared);
+  MergeErrorMaps(cleared, &dummy);
   ASSERT_EQ(3, dummy.size());
   ASSERT_EQ(3, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count);
   ASSERT_EQ(1, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.size());
@@ -84,7 +84,7 @@ TEST(ErrorMsg, MergeMap) {
   ASSERT_EQ(0, dummy[TErrorCode::GENERAL].count);
   ASSERT_EQ(1, dummy[TErrorCode::GENERAL].messages.size());
 
-  MergeErrorMaps(&cleared, dummy);
+  MergeErrorMaps(dummy, &cleared);
   ASSERT_EQ(3, cleared.size());
   ASSERT_EQ(3, cleared[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count);
   ASSERT_EQ(1, cleared[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.size());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/util/error-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/error-util.cc b/be/src/util/error-util.cc
index db9aef6..69a0355 100644
--- a/be/src/util/error-util.cc
+++ b/be/src/util/error-util.cc
@@ -159,9 +159,9 @@ string PrintErrorMapToString(const ErrorLogMap& errors) {
   return stream.str();
 }
 
-void MergeErrorMaps(ErrorLogMap* left, const ErrorLogMap& right) {
-  for (const ErrorLogMap::value_type& v: right) {
-    TErrorLogEntry& target = (*left)[v.first];
+void MergeErrorMaps(const ErrorLogMap& m1, ErrorLogMap* m2) {
+  for (const ErrorLogMap::value_type& v: m1) {
+    TErrorLogEntry& target = (*m2)[v.first];
     const TErrorLogEntry& source = v.second;
     // Append generic message, append specific codes or increment count if exists
     if (v.first == TErrorCode::GENERAL) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/util/error-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/error-util.h b/be/src/util/error-util.h
index f245c2c..c366ab2 100644
--- a/be/src/util/error-util.h
+++ b/be/src/util/error-util.h
@@ -139,11 +139,11 @@ private:
 /// Track log messages per error code.
 typedef std::map<TErrorCode::type, TErrorLogEntry> ErrorLogMap;
 
-/// Merge error maps. Merging of error maps occurs, when the errors from multiple backends
-/// are merged into a single error map.  General log messages are simply appended,
-/// specific errors are deduplicated by either appending a new instance or incrementing
-/// the count of an existing one.
-void MergeErrorMaps(ErrorLogMap* left, const ErrorLogMap& right);
+/// Merge error map m1 into m2. Merging of error maps occurs when the errors from
+/// multiple backends are merged into a single error map.  General log messages are
+/// simply appended, specific errors are deduplicated by either appending a new
+/// instance or incrementing the count of an existing one.
+void MergeErrorMaps(const ErrorLogMap& m1, ErrorLogMap* m2);
 
 /// Append an error to the error map. Performs the aggregation as follows: GENERAL errors
 /// are appended to the list of GENERAL errors, to keep one item each in the map, while

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/util/uid-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/uid-util.h b/be/src/util/uid-util.h
index 37f804a..1f57298 100644
--- a/be/src/util/uid-util.h
+++ b/be/src/util/uid-util.h
@@ -78,6 +78,10 @@ inline int32_t GetInstanceIdx(const TUniqueId& fragment_instance_id) {
   return fragment_instance_id.lo & FRAGMENT_IDX_MASK;
 }
 
+inline bool IsValidFInstanceId(const TUniqueId& fragment_instance_id) {
+  return fragment_instance_id.hi != 0L;
+}
+
 inline TUniqueId CreateInstanceId(
     const TUniqueId& query_id, int32_t instance_idx) {
   DCHECK_EQ(GetInstanceIdx(query_id), 0);  // well-formed query id

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/common/thrift/ExecStats.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ExecStats.thrift b/common/thrift/ExecStats.thrift
index 8e88b20..68a8fd8 100644
--- a/common/thrift/ExecStats.thrift
+++ b/common/thrift/ExecStats.thrift
@@ -43,6 +43,7 @@ struct TExecStats {
 
   // Total CPU time spent across all threads. For operators that have an async
   // component (e.g. multi-threaded) this will be >= latency_ns.
+  // TODO-MT: remove this or latency_ns
   2: optional i64 cpu_time_ns
 
   // Number of rows returned.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 433f3b4..b17aeec 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -280,8 +280,17 @@ struct TClientRequest {
   3: optional string redacted_stmt
 }
 
+// Debug options: perform some action in a particular phase of a particular node
+// TODO: find a better name
+struct TDebugOptions {
+  1: optional Types.TPlanNodeId node_id
+  2: optional PlanNodes.TExecNodePhase phase
+  3: optional PlanNodes.TDebugAction action
+}
+
 // Context of this query, including the client request, session state and
 // global query parameters needed for consistent expr evaluation (e.g., now()).
+//
 // TODO: Separate into FE/BE initialized vars.
 struct TQueryCtx {
   // Client request containing stmt to execute and query options.
@@ -326,8 +335,7 @@ struct TQueryCtx {
   // This defaults to -1 when no timestamp is specified.
   11: optional i64 snapshot_timestamp = -1;
 
-  // Contains only the union of those descriptors referenced by list of fragments destined
-  // for a single host. Optional for frontend tests.
+  // Optional for frontend tests.
   12: optional Descriptors.TDescriptorTable desc_tbl
 
   // Milliseconds since UNIX epoch at the start of query execution.
@@ -340,12 +348,31 @@ struct TQueryCtx {
 
   // List of tables with scan ranges that map to blocks with missing disk IDs.
   15: optional list<CatalogObjects.TTableName> tables_missing_diskids
+
+  // The pool to which this request has been submitted. Used to update pool statistics
+  // for admission control.
+  16: optional string request_pool
+}
+
+// Specification of one output destination of a plan fragment
+struct TPlanFragmentDestination {
+  // the globally unique fragment instance id
+  1: required Types.TUniqueId fragment_instance_id
+
+  // ... which is being executed on this server
+  2: required Types.TNetworkAddress server
 }
 
 // Context to collect information, which is shared among all instances of that plan
 // fragment.
 struct TPlanFragmentCtx {
   1: required Planner.TPlanFragment fragment
+
+  // Output destinations, one per output partition.
+  // The partitioning of the output is specified by
+  // TPlanFragment.output_sink.output_partition.
+  // The number of output partitions is destinations.size().
+  2: list<TPlanFragmentDestination> destinations
 }
 
 // A scan range plus the parameters needed to execute that scan.
@@ -356,56 +383,37 @@ struct TScanRangeParams {
   4: optional bool is_remote
 }
 
-// Specification of one output destination of a plan fragment
-struct TPlanFragmentDestination {
-  // the globally unique fragment instance id
-  1: required Types.TUniqueId fragment_instance_id
-
-  // ... which is being executed on this server
-  2: required Types.TNetworkAddress server
-}
-
-// Execution parameters of a fragment instance, including its unique id, the total number
-// of fragment instances, the query context, the coordinator address, etc.
-// TODO: for range partitioning, we also need to specify the range boundaries
+// Execution parameters of a single fragment instance.
 struct TPlanFragmentInstanceCtx {
+  // TPlanFragment.idx
+  1: required Types.TFragmentIdx fragment_idx
+
   // The globally unique fragment instance id.
   // Format: query id + query-wide fragment instance index
-  // The query-wide fragment instance index starts at 0, so that the query id
-  // and the id of the first fragment instance are identical.
+  // The query-wide fragment instance index enumerates all fragment instances of a
+  // particular query. It starts at 0, so that the query id and the id of the first
+  // fragment instance are identical.
   // If there is a coordinator instance, it is the first one, with index 0.
-  1: required Types.TUniqueId fragment_instance_id
+  // Range: [0, TExecQueryFInstancesParams.fragment_instance_ctxs.size()-1]
+  2: required Types.TUniqueId fragment_instance_id
 
-  // Index of this fragment instance accross all instances of its parent fragment,
-  // range [0, TPlanFragmentCtx.num_fragment_instances).
-  2: required i32 per_fragment_instance_idx
+  // Index of this fragment instance across all instances of its parent fragment
+  // (TPlanFragment with idx = TPlanFragmentInstanceCtx.fragment_idx).
+  // Range: [0, <# of instances of parent fragment> - 1]
+  3: required i32 per_fragment_instance_idx
 
   // Initial scan ranges for each scan node in TPlanFragment.plan_tree
-  3: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges
+  4: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges
 
   // Number of senders for ExchangeNodes contained in TPlanFragment.plan_tree;
   // needed to create a DataStreamRecvr
   // TODO for per-query exec rpc: move these to TPlanFragmentCtx
-  4: required map<Types.TPlanNodeId, i32> per_exch_num_senders
+  5: required map<Types.TPlanNodeId, i32> per_exch_num_senders
 
-  // Output destinations, one per output partition.
-  // The partitioning of the output is specified by
-  // TPlanFragment.output_sink.output_partition.
-  // The number of output partitions is destinations.size().
-  // TODO for per-query exec rpc: move these to TPlanFragmentCtx
-  5: list<TPlanFragmentDestination> destinations
-
-  // Debug options: perform some action in a particular phase of a particular node
-  6: optional Types.TPlanNodeId debug_node_id
-  7: optional PlanNodes.TExecNodePhase debug_phase
-  8: optional PlanNodes.TDebugAction debug_action
-
-  // The pool to which this request has been submitted. Used to update pool statistics
-  // for admission control.
-  9: optional string request_pool
+  // Id of this instance in its role as a sender.
+  6: optional i32 sender_id
 
-  // Id of this fragment in its role as a sender.
-  10: optional i32 sender_id
+  7: optional TDebugOptions debug_options
 }
 
 
@@ -415,29 +423,37 @@ enum ImpalaInternalServiceVersion {
   V1
 }
 
+// The following contains the per-rpc structs for the parameters and the result.
 
-// ExecPlanFragment
+// ExecQueryFInstances
 
-struct TExecPlanFragmentParams {
+struct TExecQueryFInstancesParams {
   1: required ImpalaInternalServiceVersion protocol_version
 
-  // Context of the query, which this fragment is part of.
-  2: optional TQueryCtx query_ctx
+  // this backend's index into Coordinator::backend_states_,
+  // needed for subsequent rpcs to the coordinator
+  // required in V1
+  2: optional i32 coord_state_idx
 
-  // Context of this fragment.
-  3: optional TPlanFragmentCtx fragment_ctx
+  // required in V1
+  3: optional TQueryCtx query_ctx
 
-  // Context of this fragment instance, including its instance id, the total number
-  // fragment instances, the query context, etc.
-  4: optional TPlanFragmentInstanceCtx fragment_instance_ctx
+  // required in V1
+  4: list<TPlanFragmentCtx> fragment_ctxs
+
+  // the order corresponds to the order of fragments in fragment_ctxs
+  // required in V1
+  5: list<TPlanFragmentInstanceCtx> fragment_instance_ctxs
 }
 
-struct TExecPlanFragmentResult {
+struct TExecQueryFInstancesResult {
   // required in V1
   1: optional Status.TStatus status
 }
 
+
 // ReportExecStatus
+
 struct TParquetInsertStats {
   // For each column, the on disk byte size
   1: required map<string, i64> per_column_size
@@ -509,33 +525,43 @@ struct TErrorLogEntry {
   2: list<string> messages
 }
 
-struct TReportExecStatusParams {
-  1: required ImpalaInternalServiceVersion protocol_version
-
-  // required in V1
-  2: optional Types.TUniqueId query_id
-
+struct TFragmentInstanceExecStatus {
   // required in V1
-  3: optional Types.TUniqueId fragment_instance_id
+  1: optional Types.TUniqueId fragment_instance_id
 
   // Status of fragment execution; any error status means it's done.
   // required in V1
-  4: optional Status.TStatus status
+  2: optional Status.TStatus status
 
   // If true, fragment finished executing.
   // required in V1
-  5: optional bool done
+  3: optional bool done
 
   // cumulative profile
   // required in V1
-  6: optional RuntimeProfile.TRuntimeProfileTree profile
+  4: optional RuntimeProfile.TRuntimeProfileTree profile
+}
+
+struct TReportExecStatusParams {
+  1: required ImpalaInternalServiceVersion protocol_version
 
-  // Cumulative structural changes made by a table sink
+  // required in V1
+  2: optional Types.TUniqueId query_id
+
+  // same as TExecQueryFInstancesParams.coord_state_idx
+  // required in V1
+  3: optional i32 coord_state_idx
+
+  4: list<TFragmentInstanceExecStatus> instance_exec_status
+
+  // Cumulative structural changes made by the table sink of any instance
+  // included in instance_exec_status
   // optional in V1
-  7: optional TInsertExecStatus insert_exec_status;
+  5: optional TInsertExecStatus insert_exec_status;
 
-  // New errors that have not been reported to the coordinator
-  8: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log;
+  // New errors that have not been reported to the coordinator by any of the
+  // instances included in instance_exec_status
+  6: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log;
 }
 
 struct TReportExecStatusResult {
@@ -544,16 +570,16 @@ struct TReportExecStatusResult {
 }
 
 
-// CancelPlanFragment
+// CancelQueryFInstances
 
-struct TCancelPlanFragmentParams {
+struct TCancelQueryFInstancesParams {
   1: required ImpalaInternalServiceVersion protocol_version
 
   // required in V1
-  2: optional Types.TUniqueId fragment_instance_id
+  2: optional Types.TUniqueId query_id
 }
 
-struct TCancelPlanFragmentResult {
+struct TCancelQueryFInstancesResult {
   // required in V1
   1: optional Status.TStatus status
 }
@@ -573,7 +599,7 @@ struct TTransmitDataParams {
   // required in V1
   4: optional Types.TPlanNodeId dest_node_id
 
-  // required in V1
+  // optional in V1
   5: optional Results.TRowBatch row_batch
 
   // if set to true, indicates that no more row batches will be sent
@@ -587,6 +613,7 @@ struct TTransmitDataResult {
 }
 
 // Parameters for RequestPoolService.resolveRequestPool()
+// TODO: why is this here?
 struct TResolveRequestPoolParams {
   // User to resolve to a pool via the allocation placement policy and
   // authorize for pool access.
@@ -611,6 +638,7 @@ struct TResolveRequestPoolResult {
 }
 
 // Parameters for RequestPoolService.getPoolConfig()
+// TODO: why is this here?
 struct TPoolConfigParams {
   // Pool name
   1: required string pool
@@ -655,48 +683,68 @@ struct TBloomFilter {
   4: required bool always_true
 }
 
-struct TUpdateFilterResult {
 
-}
+// UpdateFilter
 
 struct TUpdateFilterParams {
+  1: required ImpalaInternalServiceVersion protocol_version
+
   // Filter ID, unique within a query.
-  1: required i32 filter_id
+  // required in V1
+  2: optional i32 filter_id
 
   // Query that this filter is for.
-  2: required Types.TUniqueId query_id
+  // required in V1
+  3: optional Types.TUniqueId query_id
 
-  3: required TBloomFilter bloom_filter
+  // required in V1
+  4: optional TBloomFilter bloom_filter
 }
 
-struct TPublishFilterResult {
-
+struct TUpdateFilterResult {
 }
 
+
+// PublishFilter
+
 struct TPublishFilterParams {
+  1: required ImpalaInternalServiceVersion protocol_version
+
   // Filter ID to update
-  1: required i32 filter_id
+  // required in V1
+  2: optional i32 filter_id
 
-  // ID of fragment to receive this filter
-  2: required Types.TUniqueId dst_instance_id
+  // required in V1
+  3: optional Types.TUniqueId dst_query_id
+
+  // Index of fragment to receive this filter
+  // required in V1
+  4: optional Types.TFragmentIdx dst_fragment_idx
 
   // Actual bloom_filter payload
-  3: required TBloomFilter bloom_filter
+  // required in V1
+  5: optional TBloomFilter bloom_filter
 }
 
+struct TPublishFilterResult {
+}
+
+
 service ImpalaInternalService {
-  // Called by coord to start asynchronous execution of plan fragment in backend.
+  // Called by coord to start asynchronous execution of a query's fragment instances in
+  // backend.
   // Returns as soon as all incoming data streams have been set up.
-  TExecPlanFragmentResult ExecPlanFragment(1:TExecPlanFragmentParams params);
+  TExecQueryFInstancesResult ExecQueryFInstances(1:TExecQueryFInstancesParams params);
 
-  // Periodically called by backend to report status of plan fragment execution
+  // Periodically called by backend to report status of fragment instance execution
   // back to coord; also called when execution is finished, for whatever reason.
   TReportExecStatusResult ReportExecStatus(1:TReportExecStatusParams params);
 
-  // Called by coord to cancel execution of a single plan fragment, which this
-  // coordinator initiated with a prior call to ExecPlanFragment.
+  // Called by coord to cancel execution of a single query's fragment instances, which
+  // the coordinator initiated with a prior call to ExecQueryFInstances.
   // Cancellation is asynchronous.
-  TCancelPlanFragmentResult CancelPlanFragment(1:TCancelPlanFragmentParams params);
+  TCancelQueryFInstancesResult CancelQueryFInstances(
+      1:TCancelQueryFInstancesParams params);
 
   // Called by sender to transmit single row batch. Returns error indication
   // if params.fragmentId or params.destNodeId are unknown or if data couldn't be read.
@@ -706,7 +754,7 @@ service ImpalaInternalService {
   // the coordinator for aggregation and broadcast.
   TUpdateFilterResult UpdateFilter(1:TUpdateFilterParams params);
 
-  // Called by the coordinator to deliver global runtime filters to fragment instances for
+  // Called by the coordinator to deliver global runtime filters to fragments for
   // application at plan nodes.
   TPublishFilterResult PublishFilter(1:TPublishFilterParams params);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/tests/common/test_result_verifier.py
----------------------------------------------------------------------
diff --git a/tests/common/test_result_verifier.py b/tests/common/test_result_verifier.py
index a816eaf..7e929f1 100644
--- a/tests/common/test_result_verifier.py
+++ b/tests/common/test_result_verifier.py
@@ -490,7 +490,6 @@ def compute_aggregation(function, field, runtime_profile):
   field_regex_re = re.compile(field_regex)
   inside_avg_fragment = False
   avg_fragment_indent = None
-  past_avg_fragment = False
   match_list = []
   for line in runtime_profile.splitlines():
     # Detect the boundaries of the averaged fragment by looking at indentation.
@@ -498,18 +497,17 @@ def compute_aggregation(function, field, runtime_profile):
     # its children are at a greater indent. When the indentation gets back to
     # the level of the the averaged fragment start, then the averaged fragment
     # is done.
+    if start_avg_fragment_re.match(line):
+      inside_avg_fragment = True
+      avg_fragment_indent = len(line) - len(line.lstrip())
+      continue
+
     if inside_avg_fragment:
       indentation = len(line) - len(line.lstrip())
       if indentation > avg_fragment_indent:
         continue
       else:
         inside_avg_fragment = False
-        past_avg_fragment = True
-
-    if not past_avg_fragment and start_avg_fragment_re.match(line):
-      inside_avg_fragment = True
-      avg_fragment_indent = len(line) - len(line.lstrip())
-      continue
 
     if (field_regex_re.search(line)):
       match_list.extend(re.findall(field_regex, line))