You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/23 04:40:11 UTC

impala git commit: IMPALA-6482: add EXEC_TIME_LIMIT_S option

Repository: impala
Updated Branches:
  refs/heads/master 24b4ed0b2 -> ff3ddb51a


IMPALA-6482: add EXEC_TIME_LIMIT_S option

This is similar to the QUERY_TIMEOUT_S option and shares most of the
implementation. The difference is that the time limit is a hard deadline:
unlike the idle timeout, it is never reset by client activity.

The time limit is measured from the start of query execution,
after the query is admitted, so time spent in planning, scheduling and
admission control is not counted towards the time limit.

Also fix validation of the related QUERY_TIMEOUT_S option, which
previously could ignore invalid input.

Testing:
Added tests for various permutations:
* With and without query_timeout_s set
* With and without result fetching keeping the query active

Change-Id: Id81772ee223ffb64746e241027a5a734a811e1b8
Reviewed-on: http://gerrit.cloudera.org:8080/9227
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ff3ddb51
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ff3ddb51
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ff3ddb51

Branch: refs/heads/master
Commit: ff3ddb51a49dcd38da2e0d1a09a1cc9405649f40
Parents: 24b4ed0
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Feb 6 00:15:22 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 23 04:22:28 2018 +0000

----------------------------------------------------------------------
 be/src/service/client-request-state.cc        |  16 +-
 be/src/service/client-request-state.h         |  45 ++++--
 be/src/service/impala-server.cc               | 110 +++++++++-----
 be/src/service/impala-server.h                |  33 ++++-
 be/src/service/query-options-test.cc          |   4 +-
 be/src/service/query-options.cc               |  27 +++-
 be/src/service/query-options.h                |   3 +-
 common/thrift/ImpalaInternalService.thrift    |   7 +-
 common/thrift/ImpalaService.thrift            |   7 +-
 tests/custom_cluster/test_query_expiration.py | 163 +++++++++++++++++----
 10 files changed, 304 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ff3ddb51/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 64ab950..282f736 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -68,30 +68,16 @@ ClientRequestState::ClientRequestState(
     ImpalaServer* server, shared_ptr<ImpalaServer::SessionState> session)
   : query_ctx_(query_ctx),
     last_active_time_ms_(numeric_limits<int64_t>::max()),
-    ref_count_(0L),
     child_query_executor_(new ChildQueryExecutor),
     exec_env_(exec_env),
-    is_block_on_wait_joining_(false),
     session_(session),
-    schedule_(NULL),
-    coord_(NULL),
-    result_cache_max_size_(-1),
     // Profile is assigned name w/ id after planning
     profile_(RuntimeProfile::Create(&profile_pool_, "Query")),
     server_profile_(RuntimeProfile::Create(&profile_pool_, "ImpalaServer")),
     summary_profile_(RuntimeProfile::Create(&profile_pool_, "Summary")),
-    is_cancelled_(false),
-    eos_(false),
-    query_state_(beeswax::QueryState::CREATED),
-    user_has_profile_access_(true),
-    current_batch_(NULL),
-    current_batch_row_(0),
-    num_rows_fetched_(0),
-    fetched_rows_(false),
     frontend_(frontend),
     parent_server_(server),
-    start_time_us_(UnixMicros()),
-    end_time_us_(0LL) {
+    start_time_us_(UnixMicros()) {
 #ifndef NDEBUG
   profile_->AddInfoString("DEBUG MODE WARNING", "Query profile created while running a "
       "DEBUG build of Impala. Use RELEASE builds to measure query performance.");

http://git-wip-us.apache.org/repos/asf/impala/blob/ff3ddb51/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index d93512e..0341ec5 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -211,6 +211,16 @@ class ClientRequestState {
     return ref_count_ > 0;
   }
 
+  bool is_expired() const {
+    boost::lock_guard<boost::mutex> l(expiration_data_lock_);
+    return is_expired_;
+  }
+
+  void set_expired() {
+    boost::lock_guard<boost::mutex> l(expiration_data_lock_);
+    is_expired_ = true;
+  }
+
   RuntimeProfile::EventSequence* query_events() const { return query_events_; }
   RuntimeProfile* summary_profile() { return summary_profile_; }
 
@@ -224,8 +234,8 @@ class ClientRequestState {
   /// See "Locking" in the class comment for lock acquisition order.
   boost::mutex fetch_rows_lock_;
 
-  /// Protects last_active_time_ms_ and ref_count_. Only held during short function calls
-  /// - no other locks should be acquired while holding this lock.
+  /// Protects last_active_time_ms_, ref_count_ and is_expired_. Only held during short
+  /// function calls - no other locks should be acquired while holding this lock.
   mutable boost::mutex expiration_data_lock_;
 
   /// Stores the last time that the query was actively doing work, in Unix milliseconds.
@@ -234,7 +244,10 @@ class ClientRequestState {
   /// ref_count_ > 0 if Impala is currently performing work on this query's behalf. Every
   /// time a client instructs Impala to do work on behalf of this query, the ref count is
   /// increased, and decreased once that work is completed.
-  uint32_t ref_count_;
+  uint32_t ref_count_ = 0;
+
+  /// True if the query expired by timing out.
+  bool is_expired_ = false;
 
   /// Executor for any child queries (e.g. compute stats subqueries). Always non-NULL.
   const boost::scoped_ptr<ChildQueryExecutor> child_query_executor_;
@@ -258,7 +271,7 @@ class ClientRequestState {
   ConditionVariable block_on_wait_cv_;
 
   /// Used in conjunction with block_on_wait_cv_ to make BlockOnWait() thread-safe.
-  bool is_block_on_wait_joining_;
+  bool is_block_on_wait_joining_ = false;
 
   /// Session that this query is from
   std::shared_ptr<ImpalaServer::SessionState> session_;
@@ -284,7 +297,7 @@ class ClientRequestState {
   boost::scoped_ptr<QueryResultSet> result_cache_;
 
   /// Max size of the result_cache_ in number of rows. A value <= 0 means no caching.
-  int64_t result_cache_max_size_;
+  int64_t result_cache_max_size_ = -1;
 
   ObjectPool profile_pool_;
 
@@ -320,25 +333,25 @@ class ClientRequestState {
 
   RuntimeProfile::EventSequence* query_events_;
 
-  bool is_cancelled_; // if true, Cancel() was called.
-  bool eos_;  // if true, there are no more rows to return
+  bool is_cancelled_ = false; // if true, Cancel() was called.
+  bool eos_ = false;  // if true, there are no more rows to return
   /// We enforce the invariant that query_status_ is not OK iff query_state_ is EXCEPTION,
   /// given that lock_ is held. query_state_ should only be updated using
   /// UpdateQueryState(), to ensure that the query profile is also updated.
-  beeswax::QueryState::type query_state_;
+  beeswax::QueryState::type query_state_ = beeswax::QueryState::CREATED;
   Status query_status_;
   TExecRequest exec_request_;
   /// If true, effective_user() has access to the runtime profile and execution
   /// summary.
-  bool user_has_profile_access_;
+  bool user_has_profile_access_ = true;
   TResultSetMetadata result_metadata_; // metadata for select query
-  RowBatch* current_batch_; // the current row batch; only applicable if coord is set
-  int current_batch_row_; // number of rows fetched within the current batch
-  int num_rows_fetched_; // number of rows fetched by client for the entire query
+  RowBatch* current_batch_ = nullptr; // the current row batch; only applicable if coord is set
+  int current_batch_row_ = 0 ; // number of rows fetched within the current batch
+  int num_rows_fetched_ = 0; // number of rows fetched by client for the entire query
 
   /// True if a fetch was attempted by a client, regardless of whether a result set
   /// (or error) was returned to the client.
-  bool fetched_rows_;
+  bool fetched_rows_ = false;
 
   /// To get access to UpdateCatalog, LOAD, and DDL methods. Not owned.
   Frontend* frontend_;
@@ -348,10 +361,10 @@ class ClientRequestState {
   ImpalaServer* parent_server_;
 
   /// Start/end time of the query, in Unix microseconds.
-  /// end_time_us_ is initialized to 0 in the constructor, which is used to indicate
-  /// that the query is not yet done. It is assinged the final value in
+  /// end_time_us_ is initialized to 0, which is used to indicate that the query is not
+  /// yet done. It is assinged the final value in
   /// ClientRequestState::Done().
-  int64_t start_time_us_, end_time_us_;
+  int64_t start_time_us_, end_time_us_ = 0;
 
   /// Executes a local catalog operation (an operation that does not need to execute
   /// against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN statements.

http://git-wip-us.apache.org/repos/asf/impala/blob/ff3ddb51/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index c43c850..4452205 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -966,21 +966,31 @@ Status ImpalaServer::SetQueryInflight(shared_ptr<SessionState> session_state,
   // Add query to the set that will be unregistered if sesssion is closed.
   session_state->inflight_queries.insert(query_id);
   ++session_state->total_queries;
-  // Set query expiration.
-  int32_t timeout_s = request_state->query_options().query_timeout_s;
-  if (FLAGS_idle_query_timeout > 0 && timeout_s > 0) {
-    timeout_s = min(FLAGS_idle_query_timeout, timeout_s);
+
+  // If the query has a timeout or time limit, schedule checks.
+  int32_t idle_timeout_s = request_state->query_options().query_timeout_s;
+  if (FLAGS_idle_query_timeout > 0 && idle_timeout_s > 0) {
+    idle_timeout_s = min(FLAGS_idle_query_timeout, idle_timeout_s);
   } else {
     // Use a non-zero timeout, if one exists
-    timeout_s = max(FLAGS_idle_query_timeout, timeout_s);
+    idle_timeout_s = max(FLAGS_idle_query_timeout, idle_timeout_s);
   }
-  if (timeout_s > 0) {
+  int32_t exec_time_limit_s = request_state->query_options().exec_time_limit_s;
+  if (idle_timeout_s > 0 || exec_time_limit_s > 0) {
     lock_guard<mutex> l2(query_expiration_lock_);
-    VLOG_QUERY << "Query " << PrintId(query_id) << " has timeout of "
-               << PrettyPrinter::Print(timeout_s * 1000L * 1000L * 1000L,
-                     TUnit::TIME_NS);
-    queries_by_timestamp_.insert(
-        make_pair(UnixMillis() + (1000L * timeout_s), query_id));
+    int64_t now = UnixMillis();
+    if (idle_timeout_s > 0) {
+      VLOG_QUERY << "Query " << PrintId(query_id) << " has idle timeout of "
+                 << PrettyPrinter::Print(idle_timeout_s, TUnit::TIME_S);
+      queries_by_timestamp_.emplace(ExpirationEvent{
+          now + (1000L * idle_timeout_s), query_id, ExpirationKind::IDLE_TIMEOUT});
+    }
+    if (exec_time_limit_s > 0) {
+      VLOG_QUERY << "Query " << PrintId(query_id) << " has execution time limit of "
+                 << PrettyPrinter::Print(exec_time_limit_s, TUnit::TIME_S);
+      queries_by_timestamp_.emplace(ExpirationEvent{
+          now + (1000L * exec_time_limit_s), query_id, ExpirationKind::EXEC_TIME_LIMIT});
+    }
   }
   return Status::OK();
 }
@@ -1847,56 +1857,71 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
       ExpirationQueue::iterator expiration_event = queries_by_timestamp_.begin();
       now = UnixMillis();
       while (expiration_event != queries_by_timestamp_.end()) {
-        // If the last-observed expiration time for this query is still in the future, we
-        // know that the true expiration time will be at least that far off. So we can
-        // break here and sleep.
-        if (expiration_event->first > now) break;
+        // 'queries_by_timestamp_' is stored in ascending order of deadline so we can
+        // break out of the loop and sleep as soon as we see a deadline in the future.
+        if (expiration_event->deadline > now) break;
         shared_ptr<ClientRequestState> query_state =
-            GetClientRequestState(expiration_event->second);
-        if (query_state.get() == nullptr) {
-          // Query was deleted some other way.
-          queries_by_timestamp_.erase(expiration_event++);
+            GetClientRequestState(expiration_event->query_id);
+        if (query_state == nullptr || query_state->is_expired()) {
+          // Query was deleted or expired already from a previous expiration event.
+          expiration_event = queries_by_timestamp_.erase(expiration_event);
           continue;
         }
-        // First, check the actual expiration time in case the query has updated it
-        // since the last time we looked.
-        int32_t timeout_s = query_state->query_options().query_timeout_s;
-        if (FLAGS_idle_query_timeout > 0 && timeout_s > 0) {
-          timeout_s = min(FLAGS_idle_query_timeout, timeout_s);
+
+        // If the query time limit expired, we must cancel the query.
+        if (expiration_event->kind == ExpirationKind::EXEC_TIME_LIMIT) {
+          int32_t exec_time_limit_s = query_state->query_options().exec_time_limit_s;
+          VLOG_QUERY << "Expiring query " << expiration_event->query_id
+                     << " due to execution time limit of " << exec_time_limit_s << "s.";
+          const string& err_msg = Substitute(
+              "Query $0 expired due to execution time limit of $1",
+              PrintId(expiration_event->query_id),
+              PrettyPrinter::Print(exec_time_limit_s, TUnit::TIME_S));
+          ExpireQuery(query_state.get(), Status(err_msg));
+          expiration_event = queries_by_timestamp_.erase(expiration_event);
+          continue;
+        }
+        DCHECK(expiration_event->kind == ExpirationKind::IDLE_TIMEOUT)
+            << static_cast<int>(expiration_event->kind);
+
+        // Now check to see if the idle timeout has expired. We must check the actual
+        // expiration time in case the query has updated 'last_active_ms' since the last
+        // time we looked.
+        int32_t idle_timeout_s = query_state->query_options().query_timeout_s;
+        if (FLAGS_idle_query_timeout > 0 && idle_timeout_s > 0) {
+          idle_timeout_s = min(FLAGS_idle_query_timeout, idle_timeout_s);
         } else {
           // Use a non-zero timeout, if one exists
-          timeout_s = max(FLAGS_idle_query_timeout, timeout_s);
+          idle_timeout_s = max(FLAGS_idle_query_timeout, idle_timeout_s);
         }
-        int64_t expiration = query_state->last_active_ms() + (timeout_s * 1000L);
+        int64_t expiration = query_state->last_active_ms() + (idle_timeout_s * 1000L);
         if (now < expiration) {
           // If the real expiration date is in the future we may need to re-insert the
           // query's expiration event at its correct location.
-          if (expiration == expiration_event->first) {
+          if (expiration == expiration_event->deadline) {
             // The query hasn't been updated since it was inserted, so we know (by the
             // fact that queries are inserted in-expiration-order initially) that it is
             // still the next query to expire. No need to re-insert it.
             break;
           } else {
             // Erase and re-insert with an updated expiration time.
-            TUniqueId query_id = expiration_event->second;
-            queries_by_timestamp_.erase(expiration_event++);
-            queries_by_timestamp_.insert(make_pair(expiration, query_id));
+            TUniqueId query_id = expiration_event->query_id;
+            expiration_event = queries_by_timestamp_.erase(expiration_event);
+            queries_by_timestamp_.emplace(ExpirationEvent{
+                expiration, query_id, ExpirationKind::IDLE_TIMEOUT});
           }
         } else if (!query_state->is_active()) {
           // Otherwise time to expire this query
           VLOG_QUERY
-              << "Expiring query due to client inactivity: " << expiration_event->second
-              << ", last activity was at: "
+              << "Expiring query due to client inactivity: "
+              << expiration_event->query_id << ", last activity was at: "
               << ToStringFromUnixMillis(query_state->last_active_ms());
           const string& err_msg = Substitute(
               "Query $0 expired due to client inactivity (timeout is $1)",
-              PrintId(expiration_event->second),
-              PrettyPrinter::Print(timeout_s * 1000000000L, TUnit::TIME_NS));
-
-          cancellation_thread_pool_->Offer(
-              CancellationWork(expiration_event->second, Status(err_msg), false));
-          queries_by_timestamp_.erase(expiration_event++);
-          ImpaladMetrics::NUM_QUERIES_EXPIRED->Increment(1L);
+              PrintId(expiration_event->query_id),
+              PrettyPrinter::Print(idle_timeout_s, TUnit::TIME_S));
+          ExpireQuery(query_state.get(), Status(err_msg));
+          expiration_event = queries_by_timestamp_.erase(expiration_event);
         } else {
           // Iterator is moved on in every other branch.
           ++expiration_event;
@@ -1911,6 +1936,13 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
   }
 }
 
+void ImpalaServer::ExpireQuery(ClientRequestState* crs, const Status& status) {
+  DCHECK(!status.ok());
+  cancellation_thread_pool_->Offer(CancellationWork(crs->query_id(), status, false));
+  ImpaladMetrics::NUM_QUERIES_EXPIRED->Increment(1L);
+  crs->set_expired();
+}
+
 Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
    int32_t hs2_port) {
   exec_env_->SetImpalaServer(this);

http://git-wip-us.apache.org/repos/asf/impala/blob/ff3ddb51/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index abe8694..23d4687 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -467,9 +467,10 @@ class ImpalaServer : public ImpalaServiceIf,
   };
 
  private:
+  struct ExpirationEvent;
   friend class ChildQuery;
   friend class ImpalaHttpHandler;
-  friend class SessionState;
+  friend struct SessionState;
 
   boost::scoped_ptr<ImpalaHttpHandler> http_handler_;
 
@@ -816,6 +817,9 @@ class ImpalaServer : public ImpalaServiceIf,
   /// FLAGS_idle_query_timeout seconds.
   [[noreturn]] void ExpireQueries();
 
+  /// Expire 'crs' and cancel it with status 'status'.
+  void ExpireQuery(ClientRequestState* crs, const Status& status);
+
   /// Guards query_log_ and query_log_index_
   boost::mutex query_log_lock_;
 
@@ -1026,16 +1030,31 @@ class ImpalaServer : public ImpalaServiceIf,
   /// acquisition order.
   boost::mutex query_expiration_lock_;
 
-  /// Describes a query expiration event (t, q) where t is the expiration deadline in
-  /// seconds, and q is the query ID.
-  typedef std::pair<int64_t, TUniqueId> ExpirationEvent;
+  enum class ExpirationKind {
+    // The query is cancelled if the query has been inactive this long. The event may
+    // cancel the query after checking the last active time.
+    IDLE_TIMEOUT,
+    // A hard time limit on query execution. The query is cancelled if this event occurs
+    // before the query finishes.
+    EXEC_TIME_LIMIT
+  };
+
+  // Describes a query expiration event where the query identified by 'query_id' is
+  // checked for expiration when UnixMillis() exceeds 'deadline'.
+  struct ExpirationEvent {
+    int64_t deadline;
+    TUniqueId query_id;
+    ExpirationKind kind;
+  };
 
   /// Comparator that breaks ties when two queries have identical expiration deadlines.
   struct ExpirationEventComparator {
     bool operator()(const ExpirationEvent& t1, const ExpirationEvent& t2) {
-      if (t1.first < t2.first) return true;
-      if (t2.first < t1.first) return false;
-      return t1.second < t2.second;
+      if (t1.deadline < t2.deadline) return true;
+      if (t2.deadline < t1.deadline) return false;
+      if (t1.query_id < t2.query_id) return true;
+      if (t2.query_id < t1.query_id) return false;
+      return t1.kind < t2.kind;
     }
   };
 

http://git-wip-us.apache.org/repos/asf/impala/blob/ff3ddb51/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 06be431..3352bd1 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -225,7 +225,9 @@ TEST(QueryOptions, SetIntOptions) {
       {MAKE_OPTIONDEF(mt_dop),                         {0, 64}},
       {MAKE_OPTIONDEF(disable_codegen_rows_threshold), {0, I32_MAX}},
       {MAKE_OPTIONDEF(max_num_runtime_filters),        {0, I32_MAX}},
-      {MAKE_OPTIONDEF(batch_size),                     {0, 65536}}
+      {MAKE_OPTIONDEF(batch_size),                     {0, 65536}},
+      {MAKE_OPTIONDEF(query_timeout_s),                {0, I32_MAX}},
+      {MAKE_OPTIONDEF(exec_time_limit_s),              {0, I32_MAX}},
   };
   for (const auto& test_case : case_set) {
     const OptionDef<int32_t>& option_def = test_case.first;

http://git-wip-us.apache.org/repos/asf/impala/blob/ff3ddb51/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 7c76b10..3c56f89 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -295,9 +295,18 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_disable_outermost_topn(
             iequals(value, "true") || iequals(value, "1"));
         break;
-      case TImpalaQueryOptions::QUERY_TIMEOUT_S:
-        query_options->__set_query_timeout_s(atoi(value.c_str()));
+      case TImpalaQueryOptions::QUERY_TIMEOUT_S: {
+        StringParser::ParseResult result;
+        const int32_t timeout_s =
+            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
+        if (result != StringParser::PARSE_SUCCESS || timeout_s < 0) {
+          return Status(
+              Substitute("Invalid query timeout: '$0'. "
+                         "Only non-negative numbers are allowed.", value));
+        }
+        query_options->__set_query_timeout_s(timeout_s);
         break;
+      }
       case TImpalaQueryOptions::BUFFER_POOL_LIMIT: {
         int64_t mem;
         RETURN_IF_ERROR(ParseMemValue(value, "buffer pool limit", &mem));
@@ -596,7 +605,7 @@ Status impala::SetQueryOption(const string& key, const string& value,
         if (result != StringParser::PARSE_SUCCESS || requested_timeout < 0) {
           return Status(
               Substitute("Invalid idle session timeout: '$0'. "
-                         "Only positive numbers are allowed.", value));
+                         "Only non-negative numbers are allowed.", value));
         }
         query_options->__set_idle_session_timeout(requested_timeout);
         break;
@@ -611,6 +620,18 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_compute_stats_min_sample_size(min_sample_size);
         break;
       }
+      case TImpalaQueryOptions::EXEC_TIME_LIMIT_S: {
+        StringParser::ParseResult result;
+        const int32_t time_limit =
+            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
+        if (result != StringParser::PARSE_SUCCESS || time_limit < 0) {
+          return Status(
+              Substitute("Invalid query time limit: '$0'. "
+                         "Only non-negative numbers are allowed.", value));
+        }
+        query_options->__set_exec_time_limit_s(time_limit);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";

http://git-wip-us.apache.org/repos/asf/impala/blob/ff3ddb51/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index bb07552..2280cff 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::COMPUTE_STATS_MIN_SAMPLE_SIZE + 1);\
+      TImpalaQueryOptions::EXEC_TIME_LIMIT_S + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS,\
@@ -128,6 +128,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(idle_session_timeout, IDLE_SESSION_TIMEOUT, TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(compute_stats_min_sample_size, COMPUTE_STATS_MIN_SAMPLE_SIZE,\
       TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(exec_time_limit_s, EXEC_TIME_LIMIT_S, TQueryOptionLevel::REGULAR)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.

http://git-wip-us.apache.org/repos/asf/impala/blob/ff3ddb51/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 613e4b0..dc37fc2 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -115,7 +115,7 @@ struct TQueryOptions {
   24: optional bool disable_outermost_topn = 0
 
   // Time, in s, before a query will be timed out if it is inactive. May not exceed
-  // --idle_query_timeout if that flag > 0.
+  // --idle_query_timeout if that flag > 0. If 0, falls back to --idle_query_timeout.
   26: optional i32 query_timeout_s = 0
 
   // test hook to cap max memory for spilling operators (to force them to spill).
@@ -267,6 +267,11 @@ struct TQueryOptions {
   // Minimum number of bytes that will be scanned in COMPUTE STATS TABLESAMPLE,
   // regardless of the user-supplied sampling percent. Default value: 1GB
   62: optional i64 compute_stats_min_sample_size = 1073741824;
+
+  // Time limit, in s, before a query will be timed out after it starts executing. Does
+  // not include time spent in planning, scheduling or admission control. A value of 0
+  // means no time limit.
+  63: optional i32 exec_time_limit_s = 0;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/impala/blob/ff3ddb51/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 1911fc6..356f5e5 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -143,7 +143,7 @@ enum TImpalaQueryOptions {
   RM_INITIAL_MEM, // Removed
 
   // Time, in s, before a query will be timed out if it is inactive. May not exceed
-  // --idle_query_timeout if that flag > 0.
+  // --idle_query_timeout if that flag > 0. If 0, falls back to --idle_query_timeout.
   QUERY_TIMEOUT_S,
 
   // Test hook for spill to disk operators
@@ -278,6 +278,11 @@ enum TImpalaQueryOptions {
   // Minimum number of bytes that will be scanned in COMPUTE STATS TABLESAMPLE,
   // regardless of the user-supplied sampling percent.
   COMPUTE_STATS_MIN_SAMPLE_SIZE,
+
+  // Time limit, in s, before a query will be timed out after it starts executing. Does
+  // not include time spent in planning, scheduling or admission control. A value of 0
+  // means no time limit.
+  EXEC_TIME_LIMIT_S,
 }
 
 // The summary of a DML statement.

http://git-wip-us.apache.org/repos/asf/impala/blob/ff3ddb51/tests/custom_cluster/test_query_expiration.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_query_expiration.py b/tests/custom_cluster/test_query_expiration.py
index df203b3..0e47ce2 100644
--- a/tests/custom_cluster/test_query_expiration.py
+++ b/tests/custom_cluster/test_query_expiration.py
@@ -18,6 +18,7 @@
 # Tests for query expiration.
 
 import pytest
+import re
 import threading
 from time import sleep, time
 
@@ -43,31 +44,69 @@ class TestQueryExpiration(CustomClusterTestSuite):
     impalad = self.cluster.get_first_impalad()
     client = impalad.service.create_beeswax_client()
     num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired')
-    handle = client.execute_async("SELECT SLEEP(1000000)")
+    handles = []
+
+    # This query will time out with the default idle timeout (6s).
+    query1 = "SELECT SLEEP(1000000)"
+    default_timeout_expire_handle = client.execute_async(query1)
+    handles.append(default_timeout_expire_handle)
+
+    # This query will hit a lower time limit.
+    client.execute("SET EXEC_TIME_LIMIT_S=1")
+    time_limit_expire_handle = client.execute_async(query1);
+    handles.append(time_limit_expire_handle)
+
+    # This query will hit a lower idle timeout instead of the default timeout or time
+    # limit.
+    client.execute("SET EXEC_TIME_LIMIT_S=5")
     client.execute("SET QUERY_TIMEOUT_S=1")
-    handle2 = client.execute_async("SELECT SLEEP(2000000)")
+    short_timeout_expire_handle = client.execute_async("SELECT SLEEP(2000000)")
+    handles.append(short_timeout_expire_handle)
+    client.execute("SET EXEC_TIME_LIMIT_S=0")
 
     # Set a huge timeout, to check that the server bounds it by --idle_query_timeout
     client.execute("SET QUERY_TIMEOUT_S=1000")
-    handle3 = client.execute_async("SELECT SLEEP(3000000)")
-    self._check_num_executing(impalad, 3)
+    default_timeout_expire_handle2 = client.execute_async("SELECT SLEEP(3000000)")
+    handles.append(default_timeout_expire_handle2)
+    self._check_num_executing(impalad, len(handles))
 
     before = time()
     sleep(4)
 
-    # Query with timeout of 1 should have expired, other query should still be running.
-    assert num_expired + 1 == impalad.service.get_metric_value(
+    # Queries with timeout or time limit of 1 should have expired, other queries should
+    # still be running.
+    assert num_expired + 2 == impalad.service.get_metric_value(
       'impala-server.num-queries-expired')
+    assert (client.get_state(short_timeout_expire_handle) ==
+            client.QUERY_STATES['EXCEPTION'])
+    assert (client.get_state(time_limit_expire_handle) ==
+            client.QUERY_STATES['EXCEPTION'])
+    assert (client.get_state(default_timeout_expire_handle) ==
+            client.QUERY_STATES['FINISHED'])
+    assert (client.get_state(default_timeout_expire_handle2) ==
+            client.QUERY_STATES['FINISHED'])
+    self.__expect_expired(client, query1, short_timeout_expire_handle,
+        "Query [0-9a-f]+:[0-9a-f]+ expired due to client inactivity \(timeout is 1s000ms\)")
+    self.__expect_expired(client, query1, time_limit_expire_handle,
+        "Query [0-9a-f]+:[0-9a-f]+ expired due to execution time limit of 1s000ms")
     self._check_num_executing(impalad, 2)
     self.assert_impalad_log_contains('INFO', "Expiring query due to client inactivity: "
         "[0-9a-f]+:[0-9a-f]+, last activity was at: \d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d")
+    self.assert_impalad_log_contains('INFO',
+        "Expiring query [0-9a-f]+:[0-9a-f]+ due to execution time limit of 1s")
+
+    # Wait until the remaining queries expire. The time limit query will have hit
+    # expirations but only one should be counted.
     impalad.service.wait_for_metric_value('impala-server.num-queries-expired',
-                                          num_expired + 3)
+                                          num_expired + len(handles))
+    assert (client.get_state(default_timeout_expire_handle) ==
+            client.QUERY_STATES['EXCEPTION'])
+    assert (client.get_state(default_timeout_expire_handle2) ==
+            client.QUERY_STATES['EXCEPTION'])
 
     # Check that we didn't wait too long to be expired (double the timeout is sufficiently
     # large to avoid most noise in measurement)
     assert time() - before < 12
-    assert client.get_state(handle) == client.QUERY_STATES['EXCEPTION']
 
     client.execute("SET QUERY_TIMEOUT_S=0")
     # Synchronous execution; calls fetch() and query should not time out.
@@ -77,37 +116,66 @@ class TestQueryExpiration(CustomClusterTestSuite):
 
     # Confirm that no extra expirations happened
     assert impalad.service.get_metric_value('impala-server.num-queries-expired') \
-        == num_expired + 3
+        == len(handles)
     self._check_num_executing(impalad, 0)
+    for handle in handles:
+      try:
+        client.close_query(handle)
+      except Exception, e:
+        # We fetched from some cancelled handles above, which unregistered the queries.
+        assert 'Invalid or unknown query handle' in str(e)
 
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--idle_query_timeout=0")
   def test_query_expiration_no_default(self, vector):
     """Confirm that single queries expire if no default is set, but a per-query
-    expiration is set"""
+    expiration or time limit is set"""
     impalad = self.cluster.get_any_impalad()
     client = impalad.service.create_beeswax_client()
     num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired')
+    query = "SELECT SLEEP(1000000)"
     client.execute("SET QUERY_TIMEOUT_S=1")
-    handle = client.execute_async("SELECT SLEEP(1000000)")
+    timeout_handle = client.execute_async(query)
+    client.execute("SET QUERY_TIMEOUT_S=0")
+
+    client.execute("SET EXEC_TIME_LIMIT_S=1")
+    time_limit_handle = client.execute_async(query)
+    client.execute("SET EXEC_TIME_LIMIT_S=0")
 
     # Set a huge timeout, server should not expire the query while this test is running
     client.execute("SET QUERY_TIMEOUT_S=1000")
-    handle3 = client.execute_async("SELECT SLEEP(2000000)")
+    no_timeout_handle = client.execute_async(query)
 
     before = time()
     sleep(4)
 
     # Query with timeout of 1 should have expired, other query should still be running.
-    assert num_expired + 1 == impalad.service.get_metric_value(
+    assert num_expired + 2 == impalad.service.get_metric_value(
       'impala-server.num-queries-expired')
 
+    assert client.get_state(timeout_handle) == client.QUERY_STATES['EXCEPTION']
+    assert client.get_state(time_limit_handle) == client.QUERY_STATES['EXCEPTION']
+    assert client.get_state(no_timeout_handle) == client.QUERY_STATES['FINISHED']
+    self.__expect_expired(client, query, timeout_handle,
+        "Query [0-9a-f]+:[0-9a-f]+ expired due to client inactivity \(timeout is 1s000ms\)")
+    self.__expect_expired(client, query, time_limit_handle,
+        "Query [0-9a-f]+:[0-9a-f]+ expired due to execution time limit of 1s000ms")
+
+  def __expect_expired(self, client, query, handle, exception_regex):
+    """Assert that fetching 'handle' fails with an error matching exception_regex."""
+    try:
+      client.fetch(query, handle)
+      assert False  # Caught by the handler below, so str(e) is '' in that case.
+    except Exception, e:
+      assert re.search(exception_regex, str(e))
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--idle_query_timeout=1")
   def test_concurrent_query_expiration(self, vector):
     """Confirm that multiple concurrent queries are correctly expired if not fetched"""
     class ExpiringQueryThread(threading.Thread):
+      """Thread that runs a query and does not fetch so it will time out."""
       def __init__(self, client):
         super(ExpiringQueryThread, self).__init__()
         self.client = client
@@ -117,6 +185,7 @@ class TestQueryExpiration(CustomClusterTestSuite):
         self.handle = self.client.execute_async("SELECT SLEEP(3000000)")
 
     class NonExpiringQueryThread(threading.Thread):
+      """Thread that runs a query that does not hit the idle timeout."""
       def __init__(self, client):
         super(NonExpiringQueryThread, self).__init__()
         self.client = client
@@ -126,6 +195,37 @@ class TestQueryExpiration(CustomClusterTestSuite):
         result = self.client.execute("SELECT SLEEP(2500)")
         self.success = result.success
 
+    class TimeLimitThread(threading.Thread):
+      """Thread that runs a query that hits a time limit and saves the error."""
+      def __init__(self, client):
+        super(TimeLimitThread, self).__init__()
+        self.client = client
+        self.success = False
+
+      def run(self):
+        # Query will not be idle but will hit time limit.
+        self.client.execute("SET EXEC_TIME_LIMIT_S=1")
+        try:
+          self.client.execute("SELECT SLEEP(2500)")
+          assert False, "Expected to hit time limit"  # Caught below and saved.
+        except Exception, e:
+          self.exception = e
+
+    class NonExpiringTimeLimitThread(threading.Thread):
+      """Thread whose query finishes before its time limit; saves success and data."""
+      def __init__(self, client):
+        super(NonExpiringTimeLimitThread, self).__init__()
+        self.client = client
+        self.success = False
+        self.data = None
+
+      def run(self):
+        # Query is expected to complete well within the 10 second limit set here.
+        self.client.execute("SET EXEC_TIME_LIMIT_S=10")
+        result = self.client.execute("SELECT count(*) FROM functional.alltypes")
+        self.success = result.success
+        self.data = result.data
+
     impalad = self.cluster.get_any_impalad()
     client = impalad.service.create_beeswax_client()
     num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired')
@@ -134,18 +234,27 @@ class TestQueryExpiration(CustomClusterTestSuite):
          for _ in xrange(5)]
     expiring_threads = [ExpiringQueryThread(impalad.service.create_beeswax_client())
                         for _ in xrange(5)]
-    all_threads = zip(non_expiring_threads, expiring_threads)
-    for n, e in all_threads:
-      n.start()
-      e.start()
-
-    for n, e in all_threads:
-      n.join()
-      e.join()
-
+    time_limit_threads = [TimeLimitThread(impalad.service.create_beeswax_client())
+                        for _ in xrange(5)]
+    non_expiring_time_limit_threads = [
+        NonExpiringTimeLimitThread(impalad.service.create_beeswax_client())
+        for _ in xrange(5)]
+    all_threads = non_expiring_threads + expiring_threads + time_limit_threads +\
+        non_expiring_time_limit_threads
+    for t in all_threads:
+      t.start()
+    for t in all_threads:
+      t.join()
     impalad.service.wait_for_metric_value('impala-server.num-queries-expired',
-                                          num_expired + 5)
-
-    for n, e in all_threads:
-      assert n.success
-      assert client.get_state(e.handle) == client.QUERY_STATES['EXCEPTION']
+                                          num_expired + 10)
+    for t in non_expiring_threads:
+      assert t.success
+    for t in expiring_threads:
+      assert client.get_state(t.handle) == client.QUERY_STATES['EXCEPTION']
+    for t in time_limit_threads:
+      assert re.search(
+          "Query [0-9a-f]+:[0-9a-f]+ expired due to execution time limit of 1s000ms",
+          str(t.exception))
+    for t in non_expiring_time_limit_threads:
+      assert t.success
+      assert t.data[0] == '7300' # Number of rows in alltypes.