You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2019/10/08 19:23:16 UTC

[impala] branch master updated: IMPALA-8962: FETCH_ROWS_TIMEOUT_MS should apply before rows are available

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new c47fca5  IMPALA-8962: FETCH_ROWS_TIMEOUT_MS should apply before rows are available
c47fca5 is described below

commit c47fca5960b5be1a8e2013c4c4ffe260e98a1bff
Author: stakiar <st...@cloudera.com>
AuthorDate: Mon Sep 23 11:48:16 2019 -0700

    IMPALA-8962: FETCH_ROWS_TIMEOUT_MS should apply before rows are available
    
    IMPALA-7312 added the query option FETCH_ROWS_TIMEOUT_MS, but it only
    applies to fetch requests against a query that has already transitioned
    to the 'FINISHED' state. This patch changes the timeout so that it
    applies to queries in the 'RUNNING' state as well. Before this patch,
    fetch requests issued while a query was 'RUNNING' blocked until the query
    transitioned to the 'FINISHED' state, and then it fetched results and
    returned them. After this patch, fetch requests against queries in the
    'RUNNING' state will block for 'FETCH_ROWS_TIMEOUT_MS' and then return.
    
    For HS2 clients, fetch requests that return while a query is 'RUNNING'
    set their TStatusCode to STILL_EXECUTING_STATUS. For Beeswax clients,
    fetch requests that return while a query is 'RUNNING' set the 'ready'
    flag to false. For both clients, hasMoreRows is set to true.
    
    If the following sequence of events occurs:
    * A fetch request is issued and blocks on a 'RUNNING' query
    * The query transitions to the 'FINISHED' state
    * The fetch request attempts to read multiple batches
    Then the time spent waiting for the query to finish is deducted from
    the timeout used when waiting for rows to be produced by the Coordinator
    fragment.
    
    Fixed a bug in the current usage of FETCH_ROWS_TIMEOUT_MS where the
    time units for FETCH_ROWS_TIMEOUT_MS and MonotonicStopWatch were not
    being converted properly.
    
    Tests:
    * Moved existing fetch timeout tests from hs2/test_fetch.py into a new
    test file hs2/test_fetch_timeout.py.
    * Added several new tests to hs2/test_fetch_timeout.py to validate that
    the timeout is applied to 'RUNNING' queries and that the timeout applies
    across a 'RUNNING' and 'FINISHED' query.
    * Added new tests to query_test/test_fetch.py to validate the timeout
    while using the Beeswax protocol.
    
    Change-Id: I2cba6bf062dcc1af19471d21857caa797c1ea4a4
    Reviewed-on: http://gerrit.cloudera.org:8080/14332
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/status.cc                 |  13 +-
 be/src/exec/blocking-plan-root-sink.cc  |  40 ++--
 be/src/exec/blocking-plan-root-sink.h   |   4 +-
 be/src/exec/buffered-plan-root-sink.cc  |  30 +--
 be/src/exec/buffered-plan-root-sink.h   |   4 +-
 be/src/exec/plan-root-sink.cc           |   4 +-
 be/src/exec/plan-root-sink.h            |  18 +-
 be/src/runtime/coordinator.cc           |  20 +-
 be/src/runtime/coordinator.h            |   8 +-
 be/src/service/client-request-state.cc  |  39 +++-
 be/src/service/client-request-state.h   |  29 ++-
 be/src/service/impala-beeswax-server.cc |  13 +-
 be/src/service/impala-hs2-server.cc     |  14 +-
 be/src/util/error-util.cc               |   2 -
 common/thrift/ImpalaService.thrift      |   3 +-
 tests/hs2/hs2_test_suite.py             |  27 ++-
 tests/hs2/test_fetch.py                 | 105 +---------
 tests/hs2/test_fetch_timeout.py         | 332 ++++++++++++++++++++++++++++++++
 tests/query_test/test_fetch.py          |  47 +++++
 19 files changed, 563 insertions(+), 189 deletions(-)

diff --git a/be/src/common/status.cc b/be/src/common/status.cc
index b2a617f..eca2104 100644
--- a/be/src/common/status.cc
+++ b/be/src/common/status.cc
@@ -25,6 +25,8 @@
 #include "gen-cpp/common.pb.h"
 #include "gen-cpp/ErrorCodes_types.h"
 
+using namespace apache::hive::service::cli::thrift;
+
 namespace impala {
 
 const char* Status::LLVM_CLASS_NAME = "class.impala::Status";
@@ -156,12 +158,11 @@ Status& Status::operator=(const TStatus& status) {
 }
 
 Status::Status(const apache::hive::service::cli::thrift::TStatus& hs2_status)
-  : msg_(
-      hs2_status.statusCode
-        == apache::hive::service::cli::thrift::TStatusCode::SUCCESS_STATUS ? NULL
-          : new ErrorMsg(HS2TStatusCodeToTErrorCode(hs2_status.statusCode),
-              hs2_status.errorMessage)) {
-}
+  : msg_(hs2_status.statusCode == TStatusCode::SUCCESS_STATUS
+                || hs2_status.statusCode == TStatusCode::STILL_EXECUTING_STATUS ?
+            NULL :
+            new ErrorMsg(HS2TStatusCodeToTErrorCode(hs2_status.statusCode),
+                hs2_status.errorMessage)) {}
 
 Status Status::Expected(const ErrorMsg& error_msg) {
   return Status(error_msg, true);
diff --git a/be/src/exec/blocking-plan-root-sink.cc b/be/src/exec/blocking-plan-root-sink.cc
index d839415..b81d290 100644
--- a/be/src/exec/blocking-plan-root-sink.cc
+++ b/be/src/exec/blocking-plan-root-sink.cc
@@ -107,10 +107,11 @@ void BlockingPlanRootSink::Cancel(RuntimeState* state) {
   consumer_cv_.NotifyAll();
 }
 
-Status BlockingPlanRootSink::GetNext(
-    RuntimeState* state, QueryResultSet* results, int num_results, bool* eos) {
+Status BlockingPlanRootSink::GetNext(RuntimeState* state, QueryResultSet* results,
+    int num_results, bool* eos, int64_t timeout_us) {
   // Used to track how long the consumer waits for RowBatches to be produced and
   // materialized.
+  DCHECK_GE(timeout_us, 0);
   MonotonicStopWatch wait_timeout_timer;
   wait_timeout_timer.Start();
 
@@ -130,21 +131,26 @@ Status BlockingPlanRootSink::GetNext(
   // result set.
   while (sender_state_ == SenderState::ROWS_PENDING && results_ != nullptr
       && !state->is_cancelled() && !timed_out) {
-    // It is possible for the timeout to expire, and for the QueryResultSet to still have
-    // some rows appended to it. This can happen if the producer acquires the lock, the
-    // timeout expires, and then the producer appends rows to the QueryResultSet. This
-    // does not affect correctness because the producer always sets 'results_' to nullptr
-    // if it appends any rows to the QueryResultSet and it always appends either an entire
-    // RowBatch, or as many rows as requested.
-    uint64_t wait_duration = max(static_cast<uint64_t>(1),
-        PlanRootSink::fetch_rows_timeout_us() - wait_timeout_timer.ElapsedTime());
-    if (!consumer_cv_.WaitFor(l, wait_duration)) {
-      timed_out = true;
-
-      // If the consumer timed out, make sure results_ is set to nullptr because the
-      // consumer will destroy the current QueryResultSet and create a new one for the
-      // next fetch request.
-      results_ = nullptr;
+    if (timeout_us == 0) {
+      consumer_cv_.Wait(l);
+    } else {
+      // It is possible for the timeout to expire, and for the QueryResultSet to still
+      // have some rows appended to it. This can happen if the producer acquires the lock,
+      // the timeout expires, and then the producer appends rows to the QueryResultSet.
+      // This does not affect correctness because the producer always sets 'results_' to
+      // nullptr if it appends any rows to the QueryResultSet and it always appends either
+      // an entire RowBatch, or as many rows as requested.
+      int64_t wait_duration_us = max(static_cast<int64_t>(1),
+          timeout_us - static_cast<int64_t>(
+                           round(wait_timeout_timer.ElapsedTime() / NANOS_PER_MICRO)));
+      if (!consumer_cv_.WaitFor(l, wait_duration_us)) {
+        timed_out = true;
+
+        // If the consumer timed out, make sure results_ is set to nullptr because the
+        // consumer will destroy the current QueryResultSet and create a new one for the
+        // next fetch request.
+        results_ = nullptr;
+      }
     }
   }
 
diff --git a/be/src/exec/blocking-plan-root-sink.h b/be/src/exec/blocking-plan-root-sink.h
index b6d6c23..38e4439 100644
--- a/be/src/exec/blocking-plan-root-sink.h
+++ b/be/src/exec/blocking-plan-root-sink.h
@@ -68,8 +68,8 @@ class BlockingPlanRootSink : public PlanRootSink {
 
   /// Only a single RowBatch is passed from the producer at a time, so QueryResultSet will
   /// only be filled up to 'min(num_rows, batch->num_rows())'.
-  virtual Status GetNext(
-      RuntimeState* state, QueryResultSet* result_set, int num_rows, bool* eos) override;
+  virtual Status GetNext(RuntimeState* state, QueryResultSet* result_set, int num_rows,
+      bool* eos, int64_t timeout_us) override;
 
   /// Notifies both consumer and producer threads so they can check the cancellation
   /// status.
diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index 9fbb82c..6427bf6 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -143,11 +143,12 @@ void BufferedPlanRootSink::Cancel(RuntimeState* state) {
   batch_queue_has_capacity_.NotifyAll();
 }
 
-Status BufferedPlanRootSink::GetNext(
-    RuntimeState* state, QueryResultSet* results, int num_results, bool* eos) {
+Status BufferedPlanRootSink::GetNext(RuntimeState* state, QueryResultSet* results,
+    int num_results, bool* eos, int64_t timeout_us) {
   {
     // Used to track how long the consumer waits for RowBatches to be produced and
     // materialized.
+    DCHECK_GE(timeout_us, 0);
     MonotonicStopWatch wait_timeout_timer;
     wait_timeout_timer.Start();
 
@@ -175,14 +176,19 @@ Status BufferedPlanRootSink::GetNext(
       // Wait for the queue to have rows in it.
       while (!IsCancelledOrClosed(state) && IsQueueEmpty(state)
           && sender_state_ == SenderState::ROWS_PENDING && !timed_out) {
-        // Wait fetch_rows_timeout_us_ - row_batches_get_wait_timer_ microseconds for
-        // rows to become available before returning to the client. Subtracting
-        // wait_timeout_timer ensures the client only ever waits up to
-        // fetch_rows_timeout_us_ microseconds before returning.
-        uint64_t wait_duration = max(static_cast<uint64_t>(1),
-            PlanRootSink::fetch_rows_timeout_us() - wait_timeout_timer.ElapsedTime());
-        SCOPED_TIMER(row_batches_get_wait_timer_);
-        timed_out = !rows_available_.WaitFor(l, wait_duration);
+        if (timeout_us == 0) {
+          rows_available_.Wait(l);
+        } else {
+          // Wait 'timeout_us' - 'wait_timeout_timer' microseconds for rows to become
+          // available before returning to the client. Subtracting the elapsed time of
+          // 'wait_timeout_timer' ensures the client only ever waits up to 'timeout_us'
+          // microseconds before returning.
+          int64_t wait_duration_us = max(static_cast<int64_t>(1),
+              timeout_us - static_cast<int64_t>(round(
+                               wait_timeout_timer.ElapsedTime() / NANOS_PER_MICRO)));
+          SCOPED_TIMER(row_batches_get_wait_timer_);
+          timed_out = !rows_available_.WaitFor(l, wait_duration_us);
+        }
       }
 
       // If the query was cancelled while the sink was waiting for rows to become
@@ -227,8 +233,8 @@ Status BufferedPlanRootSink::GetNext(
         // Prevent expr result allocations from accumulating.
         expr_results_pool_->Clear();
       }
-      timed_out = timed_out
-          || wait_timeout_timer.ElapsedTime() >= PlanRootSink::fetch_rows_timeout_us();
+      timed_out =
+          timed_out || wait_timeout_timer.ElapsedTime() / NANOS_PER_MICRO >= timeout_us;
       // If we have read all rows, then break out of the while loop.
       *eos = IsGetNextEos(state);
     }
diff --git a/be/src/exec/buffered-plan-root-sink.h b/be/src/exec/buffered-plan-root-sink.h
index 8261e95..2c46e4f 100644
--- a/be/src/exec/buffered-plan-root-sink.h
+++ b/be/src/exec/buffered-plan-root-sink.h
@@ -70,8 +70,8 @@ class BufferedPlanRootSink : public PlanRootSink {
   /// Blocks until rows are available for consumption. GetNext() always returns 'num_rows'
   /// rows unless (1) there are not enough rows left in the result set to return
   /// 'num_rows' rows, or (2) the value of 'num_rows' exceeds MAX_FETCH_SIZE.
-  virtual Status GetNext(
-      RuntimeState* state, QueryResultSet* result_set, int num_rows, bool* eos) override;
+  virtual Status GetNext(RuntimeState* state, QueryResultSet* result_set, int num_rows,
+      bool* eos, int64_t timeout_us) override;
 
   /// Notifies both consumer and producer threads so they can check the cancellation
   /// status.
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index efa2b5d..45368f9 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -36,9 +36,7 @@ namespace impala {
 PlanRootSink::PlanRootSink(
     TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state)
   : DataSink(sink_id, row_desc, "PLAN_ROOT_SINK", state),
-    num_rows_produced_limit_(state->query_options().num_rows_produced_limit),
-    fetch_rows_timeout_us_(
-        MICROS_PER_MILLI * state->query_options().fetch_rows_timeout_ms) {}
+    num_rows_produced_limit_(state->query_options().num_rows_produced_limit) {}
 
 PlanRootSink::~PlanRootSink() {}
 
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index 15b1940..ad4dc3e 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -77,8 +77,14 @@ class PlanRootSink : public DataSink {
   /// there are no more rows to consume. If Cancel() or Close() are called concurrently,
   /// GetNext() will return and may not populate 'result_set'. All subsequent calls
   /// after Cancel() or Close() set eos and then return the current query status.
-  virtual Status GetNext(
-      RuntimeState* state, QueryResultSet* result_set, int num_rows, bool* eos) = 0;
+  /// 'timeout_us' is the amount of time (in microseconds) this method should wait for
+  /// enough rows to become available before returning (e.g. how long the consumer thread
+  /// waits for the producer thread to produce RowBatches). If the timeout is hit,
+  /// GetNext() can return before adding 'num_rows' rows to 'result_set'. It is possible
+  /// 0 rows are added to the 'result_set' if the producer thread does not produce rows
+  /// within the timeout. A timeout of 0 causes this method to wait indefinitely.
+  virtual Status GetNext(RuntimeState* state, QueryResultSet* result_set, int num_rows,
+      bool* eos, int64_t timeout_us) = 0;
 
   /// Notifies both the consumer and sender that the query has been cancelled so they can
   /// check the cancellation flag in the RuntimeState. The cancellation flag should be set
@@ -106,9 +112,6 @@ class PlanRootSink : public DataSink {
   enum class SenderState { ROWS_PENDING, EOS, CLOSED_NOT_EOS };
   SenderState sender_state_ = SenderState::ROWS_PENDING;
 
-  /// Returns the FETCH_ROWS_TIMEOUT_MS value for this query (converted to microseconds).
-  uint64_t fetch_rows_timeout_us() const { return fetch_rows_timeout_us_; }
-
   /// The number of rows sent to this PlanRootSink via Send(). Initialized in
   /// Prepare().
   RuntimeProfile::Counter* rows_sent_counter_ = nullptr;
@@ -121,11 +124,6 @@ class PlanRootSink : public DataSink {
   /// Limit on the number of rows produced by this query, initialized by the constructor.
   const int64_t num_rows_produced_limit_;
 
-  /// Timeout, in microseconds, when waiting for rows to become available. How long the
-  /// consumer thread waits for the producer thread to produce RowBatches. Derived from
-  /// query option FETCH_ROWS_TIMEOUT_MS.
-  const uint64_t fetch_rows_timeout_us_;
-
   /// Updated by CheckRowsProducedLimit() to indicate the total number of rows produced
   /// by query execution.
   int64_t num_rows_produced_ = 0;
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 9fc79cb..2b5a13c 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -666,7 +666,8 @@ Status Coordinator::Wait() {
   return Status::OK();
 }
 
-Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
+Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos,
+    int64_t block_on_wait_time_us) {
   VLOG_ROW << "GetNext() query_id=" << PrintId(query_id());
   DCHECK(has_called_wait_);
   SCOPED_TIMER(query_profile_->total_time_counter());
@@ -680,7 +681,22 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
   DCHECK(coord_sink_ != nullptr)     << "Exec() should be called first";
   RuntimeState* runtime_state = coord_instance_->runtime_state();
 
-  Status status = coord_sink_->GetNext(runtime_state, results, max_rows, eos);
+  // If FETCH_ROWS_TIMEOUT_MS is 0, then the timeout passed to PlanRootSink::GetNext()
+  // should be 0 as well so that the method waits for rows indefinitely.
+  // If the first row has been fetched, then set the timeout to FETCH_ROWS_TIMEOUT_MS. If
+  // the first row has not been fetched, then it is possible the client spent time
+  // waiting for the query to 'finish' before issuing a GetNext() request.
+  int64_t timeout_us;
+  if (parent_request_state_->fetch_rows_timeout_us() == 0) {
+    timeout_us = 0;
+  } else {
+    timeout_us = !first_row_fetched_ ?
+        max(static_cast<int64_t>(1),
+            parent_request_state_->fetch_rows_timeout_us() - block_on_wait_time_us) :
+        parent_request_state_->fetch_rows_timeout_us();
+  }
+
+  Status status = coord_sink_->GetNext(runtime_state, results, max_rows, eos, timeout_us);
   if (!first_row_fetched_ && results->size() > 0) {
     query_events_->MarkEvent("First row fetched");
     first_row_fetched_ = true;
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index e984927..5650403 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -120,11 +120,15 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// rows, but will not return more. If *eos is true, all rows have been returned.
   /// Returns a non-OK status if an error was encountered either locally or by any of
   /// the executing backends, or if the query was cancelled via Cancel().  After *eos
-  /// is true, subsequent calls to GetNext() will be a no-op.
+  /// is true, subsequent calls to GetNext() will be a no-op. 'block_on_wait_time_us' is
+  /// the amount of time the client spent (in microseconds) waiting in BlockOnWait(). It
+  /// is used to set the correct timeout value for
+  /// PlanRootSink::GetNext(..., int64_t timeout_us).
   ///
   /// GetNext() is not thread-safe: multiple threads must not make concurrent GetNext()
   /// calls.
-  Status GetNext(QueryResultSet* results, int max_rows, bool* eos) WARN_UNUSED_RESULT;
+  Status GetNext(QueryResultSet* results, int max_rows, bool* eos,
+      int64_t block_on_wait_time_us) WARN_UNUSED_RESULT;
 
   /// Cancel execution of query and sets the overall query status to CANCELLED if the
   /// query is still executing. Idempotent.
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 9328ddd..9f66a1b 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -103,7 +103,8 @@ ClientRequestState::ClientRequestState(
     summary_profile_(RuntimeProfile::Create(&profile_pool_, "Summary")),
     frontend_(frontend),
     parent_server_(server),
-    start_time_us_(UnixMicros()) {
+    start_time_us_(UnixMicros()),
+    fetch_rows_timeout_us_(MICROS_PER_MILLI * query_options().fetch_rows_timeout_ms) {
 #ifndef NDEBUG
   profile_->AddInfoString("DEBUG MODE WARNING", "Query profile created while running a "
       "DEBUG build of Impala. Use RELEASE builds to measure query performance.");
@@ -721,7 +722,9 @@ void ClientRequestState::Done() {
   MarkActive();
   // Make sure we join on wait_thread_ before we finish (and especially before this object
   // is destroyed).
-  BlockOnWait();
+  int64_t block_on_wait_time_us = 0;
+  BlockOnWait(0, &block_on_wait_time_us);
+  DCHECK_EQ(block_on_wait_time_us, 0);
 
   // Update latest observed Kudu timestamp stored in the session from the coordinator.
   // Needs to take the session_ lock which must not be taken while holding lock_, so this
@@ -782,12 +785,28 @@ Status ClientRequestState::WaitAsync() {
       &ClientRequestState::Wait, this, &wait_thread_, false);
 }
 
-void ClientRequestState::BlockOnWait() {
+bool ClientRequestState::BlockOnWait(int64_t timeout_us, int64_t* block_on_wait_time_us) {
+  DCHECK_GE(timeout_us, 0);
   unique_lock<mutex> l(lock_);
+  *block_on_wait_time_us = 0;
   // Some metadata operations like GET_COLUMNS do not rely on WaitAsync() to launch
   // the wait thread. In such cases this method is expected to be a no-op.
-  if (wait_thread_.get() == nullptr) return;
-  while (!is_wait_done_) block_on_wait_cv_.Wait(l);
+  if (wait_thread_.get() == nullptr) return true;
+  while (!is_wait_done_) {
+    if (timeout_us == 0) {
+      block_on_wait_cv_.Wait(l);
+      return true;
+    } else {
+      MonotonicStopWatch wait_timeout_timer;
+      wait_timeout_timer.Start();
+      bool notified = block_on_wait_cv_.WaitFor(l, timeout_us);
+      if (notified) {
+        *block_on_wait_time_us = wait_timeout_timer.ElapsedTime() / NANOS_PER_MICRO;
+      }
+      return notified;
+    }
+  }
+  return true;
 }
 
 void ClientRequestState::Wait() {
@@ -868,12 +887,13 @@ Status ClientRequestState::WaitInternal() {
 }
 
 Status ClientRequestState::FetchRows(const int32_t max_rows,
-    QueryResultSet* fetched_rows) {
+    QueryResultSet* fetched_rows, int64_t block_on_wait_time_us) {
   // Pause the wait timer, since the client has instructed us to do work on its behalf.
   MarkActive();
 
   // ImpalaServer::FetchInternal has already taken our lock_
-  discard_result(UpdateQueryStatus(FetchRowsInternal(max_rows, fetched_rows)));
+  discard_result(UpdateQueryStatus(
+      FetchRowsInternal(max_rows, fetched_rows, block_on_wait_time_us)));
 
   MarkInactive();
   return query_status_;
@@ -931,7 +951,7 @@ Status ClientRequestState::UpdateQueryStatus(const Status& status) {
 }
 
 Status ClientRequestState::FetchRowsInternal(const int32_t max_rows,
-    QueryResultSet* fetched_rows) {
+    QueryResultSet* fetched_rows, int64_t block_on_wait_time_us) {
   // Wait() guarantees that we've transitioned at least to FINISHED_STATE (and any
   // state beyond that should have a non-OK query_status_ set).
   DCHECK(operation_state_ == TOperationState::FINISHED_STATE);
@@ -982,7 +1002,8 @@ Status ClientRequestState::FetchRowsInternal(const int32_t max_rows,
     // concurrently.
     // TODO: Simplify this.
     lock_.unlock();
-    Status status = coordinator->GetNext(fetched_rows, max_coord_rows, &eos_);
+    Status status =
+        coordinator->GetNext(fetched_rows, max_coord_rows, &eos_, block_on_wait_time_us);
     lock_.lock();
     int num_fetched = fetched_rows->size() - before;
     DCHECK(max_coord_rows <= 0 || num_fetched <= max_coord_rows) << Substitute(
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index f1b05c2..97ea18c 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -105,8 +105,12 @@ class ClientRequestState {
   /// for the asynchronous thread (wait_thread_) to signal block_on_wait_cv_. It is
   /// thread-safe and all the caller threads will block until wait_thread_ has
   /// completed) and multiple times (non-blocking once wait_thread_ has completed).
-  /// Do not call while holding lock_.
-  void BlockOnWait();
+  /// Do not call while holding lock_. 'timeout' is the amount of time (in microseconds)
+  /// that the thread waits for WaitAsync() to complete before returning. If WaitAsync()
+  /// completed within the timeout, this method returns true, false otherwise. A value of
+  /// 0 causes this method to wait indefinitely. 'block_on_wait_time_us' is the amount of
+  /// time the client spent (in microseconds) waiting in BlockOnWait().
+  bool BlockOnWait(int64_t timeout_us, int64_t* block_on_wait_time_us);
 
   /// Return at most max_rows from the current batch. If the entire current batch has
   /// been returned, fetch another batch first.
@@ -114,8 +118,10 @@ class ClientRequestState {
   /// Caller should verify that EOS has not be reached before calling.
   /// Must be preceeded by call to Wait() (or WaitAsync()/BlockOnWait()).
   /// Also updates operation_state_/query_status_ in case of error.
-  Status FetchRows(const int32_t max_rows, QueryResultSet* fetched_rows)
-      WARN_UNUSED_RESULT;
+  /// 'block_on_wait_time_us' is the amount of time spent waiting in BlockOnWait(). It
+  /// should be 0 if BlockOnWait() was never called.
+  Status FetchRows(const int32_t max_rows, QueryResultSet* fetched_rows,
+      int64_t block_on_wait_time_us) WARN_UNUSED_RESULT;
 
   /// Resets the state of this query such that the next fetch() returns results from the
   /// beginning of the query result set (by using the using result_cache_).
@@ -277,6 +283,10 @@ class ClientRequestState {
   const TDdlExecResponse* ddl_exec_response() const {
     return catalog_op_executor_->ddl_exec_response();
   }
+
+  /// Returns the FETCH_ROWS_TIMEOUT_MS value for this query (converted to microseconds).
+  int64_t fetch_rows_timeout_us() const { return fetch_rows_timeout_us_; }
+
 protected:
   /// Updates the end_time_us_ of this query if it isn't set. The end time is determined
   /// when this function is called for the first time, calling it multiple times does not
@@ -482,6 +492,10 @@ protected:
   /// coordinator relases its admission control resources.
   AtomicInt64 end_time_us_{0};
 
+  /// Timeout, in microseconds, when waiting for rows to become available. Derived from
+  /// the query option FETCH_ROWS_TIMEOUT_MS.
+  const int64_t fetch_rows_timeout_us_;
+
   /// Executes a local catalog operation (an operation that does not need to execute
   /// against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN statements.
   Status ExecLocalCatalogOp(const TCatalogOpRequest& catalog_op) WARN_UNUSED_RESULT;
@@ -518,9 +532,10 @@ protected:
   Status WaitInternal() WARN_UNUSED_RESULT;
 
   /// Core logic of FetchRows(). Does not update operation_state_/query_status_.
-  /// Caller needs to hold fetch_rows_lock_ and lock_.
-  Status FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows)
-      WARN_UNUSED_RESULT;
+  /// Caller needs to hold fetch_rows_lock_ and lock_. 'block_on_wait_time_us' is the
+  /// amount of time the client spent (in microseconds) waiting in BlockOnWait().
+  Status FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows,
+      int64_t block_on_wait_time_us) WARN_UNUSED_RESULT;
 
   /// Gather and publish all required updates to the metastore.
   /// For transactional queries:
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 2515394..4304ce3 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -537,7 +537,15 @@ Status ImpalaServer::FetchInternal(ClientRequestState* request_state,
   // ensures that rows are ready to be fetched (e.g., Wait() opens
   // ClientRequestState::output_exprs_, which are evaluated in
   // ClientRequestState::FetchRows() below).
-  request_state->BlockOnWait();
+  int64_t block_on_wait_time_us = 0;
+  if (!request_state->BlockOnWait(
+          request_state->fetch_rows_timeout_us(), &block_on_wait_time_us)) {
+    query_results->__set_ready(false);
+    query_results->__set_has_more(true);
+    query_results->__isset.columns = false;
+    query_results->__isset.data = false;
+    return Status::OK();
+  }
 
   lock_guard<mutex> frl(*request_state->fetch_rows_lock());
   lock_guard<mutex> l(*request_state->lock());
@@ -578,7 +586,8 @@ Status ImpalaServer::FetchInternal(ClientRequestState* request_state,
   if (!request_state->eos()) {
     scoped_ptr<QueryResultSet> result_set(QueryResultSet::CreateAsciiQueryResultSet(
         *request_state->result_metadata(), &query_results->data));
-    fetch_rows_status = request_state->FetchRows(fetch_size, result_set.get());
+    fetch_rows_status =
+        request_state->FetchRows(fetch_size, result_set.get(), block_on_wait_time_us);
   }
   query_results->__set_has_more(!request_state->eos());
   query_results->__isset.data = true;
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 56be626..31b5fa9 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -186,7 +186,14 @@ Status ImpalaServer::FetchInternal(ClientRequestState* request_state,
   // ensures that rows are ready to be fetched (e.g., Wait() opens
   // ClientRequestState::output_exprs_, which are evaluated in
   // ClientRequestState::FetchRows() below).
-  request_state->BlockOnWait();
+  int64_t block_on_wait_time_us = 0;
+  if (!request_state->BlockOnWait(
+          request_state->fetch_rows_timeout_us(), &block_on_wait_time_us)) {
+    fetch_results->status.__set_statusCode(thrift::TStatusCode::STILL_EXECUTING_STATUS);
+    fetch_results->__set_hasMoreRows(true);
+    fetch_results->__isset.results = false;
+    return Status::OK();
+  }
 
   lock_guard<mutex> frl(*request_state->fetch_rows_lock());
   lock_guard<mutex> l(*request_state->lock());
@@ -209,7 +216,8 @@ Status ImpalaServer::FetchInternal(ClientRequestState* request_state,
       TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1 : session->hs2_version;
   scoped_ptr<QueryResultSet> result_set(QueryResultSet::CreateHS2ResultSet(
       version, *(request_state->result_metadata()), &(fetch_results->results)));
-  RETURN_IF_ERROR(request_state->FetchRows(fetch_size, result_set.get()));
+  RETURN_IF_ERROR(
+      request_state->FetchRows(fetch_size, result_set.get(), block_on_wait_time_us));
   fetch_results->__isset.results = true;
   fetch_results->__set_hasMoreRows(!request_state->eos());
   return Status::OK();
@@ -852,8 +860,8 @@ void ImpalaServer::FetchResults(TFetchResultsResp& return_val,
       discard_result(UnregisterQuery(query_id, false, &status));
     }
     HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
+    return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
   }
-  return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
 }
 
 void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) {
diff --git a/be/src/util/error-util.cc b/be/src/util/error-util.cc
index 34f0356..27fde1f 100644
--- a/be/src/util/error-util.cc
+++ b/be/src/util/error-util.cc
@@ -231,8 +231,6 @@ TErrorCode::type HS2TStatusCodeToTErrorCode(const TStatusCode::type& hs2Code) {
   // So we return a "GENERAL" error type for ERROR_STATUS code. This lets the callers
   // pick their own error message for substitution.
   switch (hs2Code) {
-    case TStatusCode::SUCCESS_STATUS:
-      return TErrorCode::OK;
     case TStatusCode::ERROR_STATUS:
       return TErrorCode::GENERAL;
     default:
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 2006f97..a80eb45 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -458,7 +458,8 @@ enum TImpalaQueryOptions {
   // become available and materialize). When result spooling is enabled, a fetch request
   // to may read multiple RowBatches, in which case, the timeout controls how long the
   // client waits for all returned RowBatches to be produced. If the timeout is hit, the
-  // client returns whatever rows it has already read. Defaults to 10000 milliseconds.
+  // client returns whatever rows it has already read. Defaults to 10000 milliseconds. A
+  // value of 0 causes fetch requests to wait indefinitely.
   FETCH_ROWS_TIMEOUT_MS = 93
 
   // For testing purposes only. This can provide a datetime string to use as now() for
diff --git a/tests/hs2/hs2_test_suite.py b/tests/hs2/hs2_test_suite.py
index f89aaac..ef3875a 100644
--- a/tests/hs2/hs2_test_suite.py
+++ b/tests/hs2/hs2_test_suite.py
@@ -168,7 +168,8 @@ class HS2TestSuite(ImpalaTestSuite):
     close_op_resp = self.hs2_client.CloseOperation(close_op_req)
     assert close_op_resp.status.statusCode == TCLIService.TStatusCode.SUCCESS_STATUS
 
-  def get_num_rows(self, result_set):
+  @staticmethod
+  def get_num_rows(result_set):
     # rows will always be set, so the only way to tell if we should use it is to see if
     # any columns are set
     if result_set.columns is None or len(result_set.columns) == 0:
@@ -182,6 +183,21 @@ class HS2TestSuite(ImpalaTestSuite):
 
     assert False
 
+  def fetch(self, fetch_results_req):
+    """Wrapper around ImpalaHiveServer2Service.FetchResults(fetch_results_req) that
+    issues the given fetch request until the TCLIService.TStatusCode transitions from
+    STILL_EXECUTING_STATUS to SUCCESS_STATUS. If a fetch response contains the
+    STILL_EXECUTING_STATUS then rows are not yet available for consumption (e.g. the
+    query is still running and has not produced any rows yet). This status may be
+    returned to the client if the FETCH_ROWS_TIMEOUT_MS is hit."""
+    fetch_results_resp = None
+    while fetch_results_resp is None or \
+        fetch_results_resp.status.statusCode == \
+          TCLIService.TStatusCode.STILL_EXECUTING_STATUS:
+      fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req)
+    HS2TestSuite.check_response(fetch_results_resp)
+    return fetch_results_resp
+
   def fetch_at_most(self, handle, orientation, size, expected_num_rows = None):
     """Fetches at most size number of rows from the query identified by the given
     operation handle. Uses the given fetch orientation. Asserts that the fetch returns a
@@ -194,8 +210,7 @@ class HS2TestSuite(ImpalaTestSuite):
     fetch_results_req.operationHandle = handle
     fetch_results_req.orientation = orientation
     fetch_results_req.maxRows = size
-    fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req)
-    HS2TestSuite.check_response(fetch_results_resp)
+    fetch_results_resp = self.fetch(fetch_results_req)
     if expected_num_rows is not None:
       assert self.get_num_rows(fetch_results_resp.results) == expected_num_rows
     return fetch_results_resp
@@ -212,16 +227,14 @@ class HS2TestSuite(ImpalaTestSuite):
     fetch_results_req.operationHandle = handle
     fetch_results_req.orientation = orientation
     fetch_results_req.maxRows = size
-    fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req)
-    HS2TestSuite.check_response(fetch_results_resp)
+    fetch_results_resp = self.fetch(fetch_results_req)
     num_rows_fetched = self.get_num_rows(fetch_results_resp.results)
     if expected_num_rows is None: expected_num_rows = size
     while num_rows_fetched < expected_num_rows:
       # Always try to fetch at most 'size'
       fetch_results_req.maxRows = size - num_rows_fetched
       fetch_results_req.orientation = TCLIService.TFetchOrientation.FETCH_NEXT
-      fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req)
-      HS2TestSuite.check_response(fetch_results_resp)
+      fetch_results_resp = self.fetch(fetch_results_req)
       last_fetch_size = self.get_num_rows(fetch_results_resp.results)
       assert last_fetch_size > 0
       num_rows_fetched += last_fetch_size
diff --git a/tests/hs2/test_fetch.py b/tests/hs2/test_fetch.py
index 392aa11..747adfb 100644
--- a/tests/hs2/test_fetch.py
+++ b/tests/hs2/test_fetch.py
@@ -143,8 +143,7 @@ class TestFetch(HS2TestSuite):
     fetch_results_req = TCLIService.TFetchResultsReq()
     fetch_results_req.operationHandle = execute_statement_resp.operationHandle
     fetch_results_req.maxRows = 1024
-    fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req)
-    HS2TestSuite.check_response(fetch_results_resp)
+    fetch_results_resp = self.fetch(fetch_results_req)
 
     return fetch_results_resp
 
@@ -219,8 +218,7 @@ class TestFetch(HS2TestSuite):
     fetch_results_req = TCLIService.TFetchResultsReq()
     fetch_results_req.operationHandle = execute_statement_resp.operationHandle
     fetch_results_req.maxRows = 100
-    fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req)
-    HS2TestSuite.check_response(fetch_results_resp)
+    fetch_results_resp = self.fetch(fetch_results_req)
 
     assert len(fetch_results_resp.results.rows) == 1
     assert fetch_results_resp.results.startRowOffset == 0
@@ -253,8 +251,7 @@ class TestFetch(HS2TestSuite):
     fetch_results_req = TCLIService.TFetchResultsReq()
     fetch_results_req.operationHandle = execute_statement_resp.operationHandle
     fetch_results_req.maxRows = 1
-    fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req)
-    HS2TestSuite.check_response(fetch_results_resp)
+    fetch_results_resp = self.fetch(fetch_results_req)
     assert fetch_results_resp.results.columns[0].boolVal is not None
 
     assert self.column_results_to_string(
@@ -296,99 +293,3 @@ class TestFetch(HS2TestSuite):
         TCLIService.TGetResultSetMetadataReq(operationHandle=good_handle)))
     HS2TestSuite.check_response(self.hs2_client.CloseOperation(
         TCLIService.TCloseOperationReq(operationHandle=good_handle)))
-
-  @needs_session()
-  def test_fetch_timeout(self):
-    """Test FETCH_ROWS_TIMEOUT_MS with default configs."""
-    self.__test_fetch_timeout()
-    self.__test_fetch_materialization_timeout()
-
-  @needs_session(conf_overlay={'spool_query_results': 'true'})
-  def test_fetch_result_spooling_timeout(self):
-    """Test FETCH_ROWS_TIMEOUT_MS with result spooling enabled, and test that the timeout
-    applies when reading multiple RowBatches."""
-    self.__test_fetch_timeout()
-    self.__test_fetch_materialization_timeout()
-
-    # Validate that the timeout applies when reading multiple RowBatches.
-    num_rows = 100
-    statement = "select id from functional.alltypes limit {0}".format(num_rows)
-    execute_statement_resp = self.execute_statement(statement,
-        conf_overlay={'batch_size': '10',
-                      'debug_action': '0:GETNEXT:DELAY',
-                      'fetch_rows_timeout_ms': '500'})
-    HS2TestSuite.check_response(execute_statement_resp)
-
-    # Issue a fetch request to read all rows, and validate that only a subset of the rows
-    # are returned.
-    fetch_results_resp = self.hs2_client.FetchResults(TCLIService.TFetchResultsReq(
-        operationHandle=execute_statement_resp.operationHandle, maxRows=num_rows))
-    HS2TestSuite.check_response(fetch_results_resp)
-    num_rows_fetched = self.get_num_rows(fetch_results_resp.results)
-    assert num_rows_fetched > 0 and num_rows_fetched < num_rows
-    assert fetch_results_resp.hasMoreRows
-
-    self.__fetch_remaining(execute_statement_resp.operationHandle,
-        num_rows - num_rows_fetched, statement)
-
-  def __test_fetch_timeout(self):
-    """Test the query option FETCH_ROWS_TIMEOUT_MS by running a query with a DELAY
-    DEBUG_ACTION and a low value for the fetch timeout. Validates that when the timeout
-    is hit, 0 rows are returned."""
-    num_rows = 1
-    statement = "select id from functional.alltypes limit {0}".format(num_rows)
-    execute_statement_resp = self.execute_statement(statement,
-        conf_overlay={'debug_action': '0:GETNEXT:DELAY', 'fetch_rows_timeout_ms': '1'})
-    HS2TestSuite.check_response(execute_statement_resp)
-
-    # Assert that the first fetch request returns 0 rows.
-    fetch_results_resp = self.hs2_client.FetchResults(TCLIService.TFetchResultsReq(
-        operationHandle=execute_statement_resp.operationHandle, maxRows=1024))
-    HS2TestSuite.check_response(fetch_results_resp)
-    assert self.get_num_rows(fetch_results_resp.results) == 0
-    assert fetch_results_resp.hasMoreRows
-
-    # Assert that all remaining rows can be fetched.
-    self.__fetch_remaining(execute_statement_resp.operationHandle, num_rows, statement)
-
-  def __test_fetch_materialization_timeout(self):
-    """Test the query option FETCH_ROWS_TIMEOUT_MS applies to the time taken to
-    materialize rows. Runs a query with a sleep() which is evaluated during
-    materialization and validates the timeout is applied appropriately."""
-    num_rows = 2
-    statement = "select sleep(2500) from functional.alltypes limit {0}".format(num_rows)
-    execute_statement_resp = self.execute_statement(statement,
-        conf_overlay={'batch_size': '1', 'fetch_rows_timeout_ms': '3750'})
-    HS2TestSuite.check_response(execute_statement_resp)
-
-    # Only one row should be returned because the timeout should be hit after
-    # materializing the first row, but before materializing the second one.
-    fetch_results_resp = self.hs2_client.FetchResults(TCLIService.TFetchResultsReq(
-        operationHandle=execute_statement_resp.operationHandle, maxRows=2))
-    HS2TestSuite.check_response(fetch_results_resp)
-    assert self.get_num_rows(fetch_results_resp.results) == 1
-
-    # Assert that all remaining rows can be fetched.
-    self.__fetch_remaining(execute_statement_resp.operationHandle, num_rows - 1,
-        statement)
-
-  def __fetch_remaining(self, op_handle, num_rows, statement):
-    """Fetch the remaining rows in the given op_handle and validate that the number of
-    rows returned matches the expected number of rows. If the op_handle does not return
-    the expected number of rows within a timeout, an error is thrown."""
-    # The timeout to wait for fetch requests to fetch all rows.
-    timeout = 10
-
-    start_time = time()
-    num_fetched = 0
-
-    # Fetch results until either the timeout is hit or all rows have been fetched.
-    while num_fetched != num_rows and time() - start_time < timeout:
-      sleep(0.5)
-      fetch_results_resp = self.hs2_client.FetchResults(TCLIService.TFetchResultsReq(
-          operationHandle=op_handle, maxRows=1024))
-      HS2TestSuite.check_response(fetch_results_resp)
-      num_fetched += self.get_num_rows(fetch_results_resp.results)
-    if num_fetched != num_rows:
-      raise Timeout("Query {0} did not fetch all results within the timeout {1}"
-                    .format(statement, timeout))
diff --git a/tests/hs2/test_fetch_timeout.py b/tests/hs2/test_fetch_timeout.py
new file mode 100644
index 0000000..01faac7
--- /dev/null
+++ b/tests/hs2/test_fetch_timeout.py
@@ -0,0 +1,332 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from time import time
+from tests.common.errors import Timeout
+from tests.hs2.hs2_test_suite import (HS2TestSuite, needs_session)
+from TCLIService import TCLIService
+
+
+# Tests for the query option FETCH_ROWS_TIMEOUT_MS, which is the maximum amount of
+# time, in milliseconds, a fetch rows request (TFetchResultsReq) from the client should
+# spend fetching results (including waiting for results to become available and
+# materialize).
+
+class TestFetchTimeout(HS2TestSuite):
+  """This class contains all fetch timeout tests applicable when result spooling is
+  enabled and disabled. Since the fetch timeout code changes based on whether result
+  spooling is enabled, some tests are run both when result spooling is enabled and
+  disabled."""
+
+  @needs_session()
+  def test_fetch_timeout(self):
+    """Delegates to self.__test_fetch_timeout()."""
+    self.__test_fetch_timeout()
+
+  @needs_session(conf_overlay={'spool_query_results': 'true'})
+  def test_fetch_timeout_with_result_spooling(self):
+    """Delegates to self.__test_fetch_timeout()."""
+    self.__test_fetch_timeout()
+
+  def __test_fetch_timeout(self):
+    """Tests FETCH_ROWS_TIMEOUT_MS by running a query that produces RowBatches with a
+    large delay. The test waits for the query to 'finish' and then fetches the first
+    RowBatch, which should always be available since a query is only considered
+    'finished' if rows are available. Subsequent fetches should time out because
+    RowBatch production has been delayed."""
+    # Construct a query where there is a large delay between RowBatch production.
+    num_rows = 2
+    statement = "select bool_col, avg(id) from functional.alltypes group by bool_col " \
+                "having avg(id) != sleep(5000)"
+    execute_statement_resp = self.execute_statement(statement,
+        conf_overlay={'fetch_rows_timeout_ms': '1', 'batch_size': '1', 'num_nodes': '1'})
+    HS2TestSuite.check_response(execute_statement_resp)
+
+    # Wait for rows to be available for fetch.
+    get_operation_status_resp = self.wait_for_operation_state(
+        execute_statement_resp.operationHandle,
+        TCLIService.TOperationState.FINISHED_STATE, timeout=30)
+    HS2TestSuite.check_response(get_operation_status_resp)
+
+    # Assert that exactly 1 row can be fetched.
+    FetchTimeoutUtils.fetch_num_rows(self.hs2_client,
+        execute_statement_resp.operationHandle, 1, statement)
+
+    # Assert that the next fetch request times out while waiting for a RowBatch to be
+    # produced.
+    fetch_results_resp = self.hs2_client.FetchResults(
+        TCLIService.TFetchResultsReq(
+            operationHandle=execute_statement_resp.operationHandle, maxRows=num_rows))
+    HS2TestSuite.check_response(fetch_results_resp)
+    num_rows_fetched = HS2TestSuite.get_num_rows(fetch_results_resp.results)
+    assert num_rows_fetched == 0
+    assert fetch_results_resp.hasMoreRows
+    FetchTimeoutUtils.fetch_num_rows(self.hs2_client,
+        execute_statement_resp.operationHandle, 1, statement)
+
+  @needs_session()
+  def test_fetch_materialization_timeout(self):
+    """Delegates to self.__test_fetch_materialization_timeout()."""
+    self.__test_fetch_materialization_timeout()
+
+  @needs_session(conf_overlay={'spool_query_results': 'true'})
+  def test_fetch_materialization_timeout_with_result_spooling(self):
+    """Delegates to self.__test_fetch_materialization_timeout()."""
+    self.__test_fetch_materialization_timeout()
+
+  def __test_fetch_materialization_timeout(self):
+    """Test the query option FETCH_ROWS_TIMEOUT_MS applies to the time taken to
+    materialize rows. Runs a query with a sleep() which is evaluated during
+    materialization and validates the timeout is applied appropriately."""
+    num_rows = 2
+    statement = "select sleep(5000) from functional.alltypes limit {0}".format(num_rows)
+    execute_statement_resp = self.execute_statement(statement,
+        conf_overlay={'batch_size': '1', 'fetch_rows_timeout_ms': '2500'})
+    HS2TestSuite.check_response(execute_statement_resp)
+
+    # Wait for rows to be available for fetch.
+    get_operation_status_resp = self.wait_for_operation_state(
+        execute_statement_resp.operationHandle,
+        TCLIService.TOperationState.FINISHED_STATE)
+    HS2TestSuite.check_response(get_operation_status_resp)
+
+    # Only one row should be returned because the timeout should be hit after
+    # materializing the first row, but before materializing the second one.
+    fetch_results_resp = self.hs2_client.FetchResults(
+        TCLIService.TFetchResultsReq(
+            operationHandle=execute_statement_resp.operationHandle, maxRows=2))
+    HS2TestSuite.check_response(fetch_results_resp)
+    assert HS2TestSuite.get_num_rows(fetch_results_resp.results) == 1
+
+    # Assert that all remaining rows can be fetched.
+    FetchTimeoutUtils.fetch_num_rows(self.hs2_client,
+        execute_statement_resp.operationHandle, num_rows - 1, statement)
+
+  @needs_session()
+  def test_fetch_before_finished_timeout(self):
+    """Delegates to self.__test_fetch_before_finished_timeout()."""
+    self.__test_fetch_before_finished_timeout()
+
+  @needs_session(conf_overlay={'spool_query_results': 'true'})
+  def test_fetch_before_finished_timeout_with_result_spooling(self):
+    """Delegates to self.__test_fetch_before_finished_timeout()."""
+    self.__test_fetch_before_finished_timeout()
+
+  def __test_fetch_before_finished_timeout(self):
+    """Tests the query option FETCH_ROWS_TIMEOUT_MS applies to fetch requests before the
+    query has 'finished'. Fetch requests issued before the query has finished, should
+    wait FETCH_ROWS_TIMEOUT_MS before returning. This test runs a query with a DELAY
+    DEBUG_ACTION before Coordinator starts. Fetch requests during this delay should
+    return with 0 rows."""
+    num_rows = 10
+    statement = "select * from functional.alltypes limit {0}".format(num_rows)
+    execute_statement_resp = self.execute_statement(statement,
+        conf_overlay={'debug_action': 'CRS_BEFORE_COORD_STARTS:SLEEP@5000',
+                      'fetch_rows_timeout_ms': '1000'})
+    HS2TestSuite.check_response(execute_statement_resp)
+
+    # Assert that the first fetch request returns 0 rows.
+    fetch_results_resp = self.hs2_client.FetchResults(TCLIService.TFetchResultsReq(
+        operationHandle=execute_statement_resp.operationHandle, maxRows=1024))
+    HS2TestSuite.check_response(fetch_results_resp,
+        expected_status_code=TCLIService.TStatusCode.STILL_EXECUTING_STATUS)
+    assert fetch_results_resp.hasMoreRows
+    assert not fetch_results_resp.results
+
+    get_operation_status_resp = self.wait_for_operation_state(
+        execute_statement_resp.operationHandle,
+        TCLIService.TOperationState.FINISHED_STATE)
+    HS2TestSuite.check_response(get_operation_status_resp)
+
+    # Assert that all remaining rows can be fetched.
+    FetchTimeoutUtils.fetch_num_rows(self.hs2_client,
+        execute_statement_resp.operationHandle, num_rows, statement)
+
+  @needs_session()
+  def test_fetch_finished_timeout(self):
+    self.__test_fetch_finished_timeout()
+
+  @needs_session(conf_overlay={'spool_query_results': 'true'})
+  def test_fetch_finished_timeout_with_result_spooling(self):
+    self.__test_fetch_finished_timeout()
+
+  def __test_fetch_finished_timeout(self):
+    """Tests the query option FETCH_ROWS_TIMEOUT_MS applies to both the time spent
+    waiting for a query to finish and the time spent waiting for RowBatches to be sent,
+    and that the timeout is not reset for in-progress fetch requests when queries
+    transition to the 'finished' state."""
+    num_rows = 20
+    statement = "select sleep(500) from functional.alltypes limit {0}".format(num_rows)
+    execute_statement_resp = self.execute_statement(statement,
+            conf_overlay={'debug_action': 'CRS_BEFORE_COORD_STARTS:SLEEP@5000',
+                'batch_size': '10', 'fetch_rows_timeout_ms': '7500'})
+    HS2TestSuite.check_response(execute_statement_resp)
+
+    # Assert that the first fetch request returns 10 rows and that more rows remain.
+    fetch_results_resp = self.hs2_client.FetchResults(TCLIService.TFetchResultsReq(
+        operationHandle=execute_statement_resp.operationHandle, maxRows=1024))
+    HS2TestSuite.check_response(fetch_results_resp)
+    assert fetch_results_resp.hasMoreRows
+    assert HS2TestSuite.get_num_rows(fetch_results_resp.results) == 10
+
+    # Wait for rows to be available for fetch.
+    get_operation_status_resp = self.wait_for_operation_state(
+        execute_statement_resp.operationHandle,
+        TCLIService.TOperationState.FINISHED_STATE)
+    HS2TestSuite.check_response(get_operation_status_resp)
+
+    # Assert that all remaining rows can be fetched.
+    FetchTimeoutUtils.fetch_num_rows(self.hs2_client,
+        execute_statement_resp.operationHandle, num_rows - 10, statement)
+
+  @needs_session()
+  def test_fetch_no_timeout(self):
+    """Delegates to self.__test_fetch_no_timeout()."""
+    self.__test_fetch_no_timeout()
+
+  @needs_session(conf_overlay={'spool_query_results': 'true'})
+  def test_fetch_no_timeout_with_result_spooling(self):
+    """Delegates to self.__test_fetch_no_timeout()."""
+    self.__test_fetch_no_timeout()
+
+  def __test_fetch_no_timeout(self):
+    """Tests setting FETCH_ROWS_TIMEOUT_MS to 0, and validates that fetch requests wait
+    indefinitely when the timeout is 0."""
+    num_rows = 10
+    statement = "select * from functional.alltypes limit {0}".format(num_rows)
+    execute_statement_resp = self.execute_statement(statement,
+        conf_overlay={'debug_action': 'CRS_BEFORE_COORD_STARTS:SLEEP@5000',
+                      'fetch_rows_timeout_ms': '0'})
+    HS2TestSuite.check_response(execute_statement_resp)
+
+    # Assert that the first fetch request returns all rows.
+    fetch_results_resp = self.hs2_client.FetchResults(TCLIService.TFetchResultsReq(
+        operationHandle=execute_statement_resp.operationHandle, maxRows=1024))
+    HS2TestSuite.check_response(fetch_results_resp)
+    assert self.get_num_rows(fetch_results_resp.results) == num_rows
+
+
+class TestFetchTimeoutWithResultSpooling(HS2TestSuite):
+  """Tests for FETCH_ROWS_TIMEOUT_MS that are specific to result spooling. Most of these
+  tests rely on the fact that when result spooling is enabled, multiple RowBatches can be
+  fetched at once."""
+
+  @needs_session(conf_overlay={'spool_query_results': 'true'})
+  def test_fetch_multiple_batches_timeout(self):
+    """Validate that FETCH_ROWS_TIMEOUT_MS applies when reading multiple RowBatches.
+    This test runs a query that produces multiple RowBatches with a fixed delay, and
+    asserts that a fetch request to read all rows only reads a subset of the rows (since
+    the timeout should ensure that a single fetch request cannot read all RowBatches)."""
+    num_rows = 500
+    statement = "select id from functional.alltypes limit {0}".format(num_rows)
+    execute_statement_resp = self.execute_statement(statement,
+        conf_overlay={'batch_size': '10',
+                      'debug_action': '0:GETNEXT:DELAY',
+                      'fetch_rows_timeout_ms': '500'})
+    HS2TestSuite.check_response(execute_statement_resp)
+
+    # Wait for rows to be available for fetch.
+    get_operation_status_resp = self.wait_for_operation_state(
+        execute_statement_resp.operationHandle,
+        TCLIService.TOperationState.FINISHED_STATE)
+    HS2TestSuite.check_response(get_operation_status_resp)
+
+    # Issue a fetch request to read all rows, and validate that only a subset of the rows
+    # are returned.
+    fetch_results_resp = self.hs2_client.FetchResults(TCLIService.TFetchResultsReq(
+        operationHandle=execute_statement_resp.operationHandle, maxRows=num_rows))
+    HS2TestSuite.check_response(fetch_results_resp)
+    num_rows_fetched = HS2TestSuite.get_num_rows(fetch_results_resp.results)
+    assert num_rows_fetched > 0 and num_rows_fetched < num_rows
+    assert fetch_results_resp.hasMoreRows
+
+    # Assert that all remaining rows can be fetched.
+    FetchTimeoutUtils.fetch_num_rows(self.hs2_client,
+        execute_statement_resp.operationHandle, num_rows - num_rows_fetched, statement)
+
+  @needs_session(conf_overlay={'spool_query_results': 'true'})
+  def test_multiple_fetch_multiple_batches_timeout(self):
+    """Test the query option FETCH_ROWS_TIMEOUT_MS by running a query with a DELAY
+    DEBUG_ACTION and a low value for the fetch timeout. This test issues fetch requests
+    in a loop until all results have been returned, and validates that some of the fetch
+    requests timed out. It is similar to test_fetch_multiple_batches_timeout except it
+    issues multiple fetch requests that are expected to timeout."""
+    num_rows = 100
+    statement = "select * from functional.alltypes limit {0}".format(num_rows)
+    execute_statement_resp = self.execute_statement(statement,
+        conf_overlay={'batch_size': '1', 'debug_action': '0:GETNEXT:DELAY',
+                      'fetch_rows_timeout_ms': '1'})
+    HS2TestSuite.check_response(execute_statement_resp)
+
+    # Wait for rows to be available for fetch.
+    get_operation_status_resp = self.wait_for_operation_state(
+        execute_statement_resp.operationHandle,
+        TCLIService.TOperationState.FINISHED_STATE, timeout=30)
+    HS2TestSuite.check_response(get_operation_status_resp)
+
+    # The timeout to wait for fetch requests to fetch all rows.
+    timeout = 30
+
+    start_time = time()
+    num_fetched = 0
+    num_fetch_requests = 0
+
+    # Fetch results until either the timeout is hit or all rows have been fetched.
+    while num_fetched != num_rows and time() - start_time < timeout:
+      sleep(0.5)
+      fetch_results_resp = self.hs2_client.FetchResults(TCLIService.TFetchResultsReq(
+          operationHandle=execute_statement_resp.operationHandle, maxRows=num_rows))
+      HS2TestSuite.check_response(fetch_results_resp)
+      num_fetched += HS2TestSuite.get_num_rows(fetch_results_resp.results)
+      num_fetch_requests += 1
+    if num_fetched != num_rows:
+      raise Timeout("Query {0} did not fetch all results within the timeout {1}"
+                    .format(statement, timeout))
+    # The query produces 100 RowBatches, each batch was delayed 100ms before it was sent
+    # to the PlanRootSink. Each fetch request requested all 100 rows, but since the
+    # timeout is set to such a low value, multiple fetch requests should be necessary to
+    # read all rows.
+    assert num_fetch_requests >= 5
+
+
+class FetchTimeoutUtils():
+  """This class contains all common code used when testing fetch timeouts."""
+
+  @staticmethod
+  def fetch_num_rows(hs2_client, op_handle, num_rows, statement):
+    """Fetch the specified number of rows in the given op_handle and validate that the
+    number of rows returned matches the expected number of rows. If the op_handle does
+    not return the expected number of rows within a timeout, an error is thrown."""
+    # The timeout to wait for fetch requests to fetch all rows.
+    timeout = 30
+
+    start_time = time()
+    num_fetched = 0
+
+    # Fetch results until either the timeout is hit or all rows have been fetched.
+    while num_fetched != num_rows and time() - start_time < timeout:
+      sleep(0.5)
+      fetch_results_resp = hs2_client.FetchResults(
+          TCLIService.TFetchResultsReq(operationHandle=op_handle,
+              maxRows=num_rows - num_fetched))
+      HS2TestSuite.check_response(fetch_results_resp)
+      num_fetched += HS2TestSuite.get_num_rows(fetch_results_resp.results)
+    if num_fetched != num_rows:
+      raise Timeout("Query {0} did not fetch all results within the timeout {1}"
+                    .format(statement, timeout))
+    assert num_fetched == num_rows
diff --git a/tests/query_test/test_fetch.py b/tests/query_test/test_fetch.py
index 755d219..90db9e8 100644
--- a/tests/query_test/test_fetch.py
+++ b/tests/query_test/test_fetch.py
@@ -99,3 +99,50 @@ class TestFetchAndSpooling(ImpalaTestSuite):
     rows_sent_rate = re.search("RowsSentRate: (\d*\.?\d*)", result.runtime_profile)
     assert rows_sent_rate
     assert float(rows_sent_rate.group(1)) > 0
+
+
+class TestFetchTimeout(ImpalaTestSuite):
+  """A few basic tests for FETCH_ROWS_TIMEOUT_MS that are not specific to the HS2 protocol
+  (e.g. in contrast to the tests in tests/hs2/test_fetch_timeout.py). These tests are
+  necessary because part of the FETCH_ROWS_TIMEOUT_MS code is HS2/Beeswax specific.
+  Unlike the tests in hs2/test_fetch_timeout.py, these tests do not validate that
+  individual RPC calls timeout, instead they set a low value for the timeout and assert
+  that the query works end-to-end."""
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestFetchTimeout, cls).add_test_dimensions()
+    # Result fetching should be independent of file format, so only test against
+    # Parquet files.
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'parquet')
+    extend_exec_option_dimension(cls, 'spool_query_results', 'true')
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  def test_fetch_timeout(self, vector):
+    """A simple test that runs a query with a low timeout and introduces delays in
+    RowBatch production. Asserts that the query succeeds and returns the expected number
+    of rows."""
+    num_rows = 100
+    query = "select * from functional.alltypes limit {0}".format(num_rows)
+    vector.get_value('exec_option')['batch_size'] = 1
+    vector.get_value('exec_option')['fetch_rows_timeout_ms'] = 1
+    vector.get_value('exec_option')['debug_action'] = '0:GETNEXT:DELAY'
+    results = self.execute_query(query, vector.get_value('exec_option'))
+    assert results.success
+    assert len(results.data) == num_rows
+
+  def test_fetch_before_finished_timeout(self, vector):
+    """Tests that the FETCH_ROWS_TIMEOUT_MS timeout applies to queries that are not in
+    the 'finished' state. Similar to the test tests/hs2/test_fetch_timeout.py::
+    TestFetchTimeout::test_fetch_before_finished_timeout(_with_result_spooling)."""
+    num_rows = 10
+    query = "select * from functional.alltypes limit {0}".format(num_rows)
+    vector.get_value('exec_option')['debug_action'] = 'CRS_BEFORE_COORD_STARTS:SLEEP@5000'
+    vector.get_value('exec_option')['fetch_rows_timeout_ms'] = '1000'
+    results = self.execute_query(query, vector.get_value('exec_option'))
+    assert results.success
+    assert len(results.data) == num_rows