Posted to commits@impala.apache.org by ar...@apache.org on 2019/08/29 19:42:05 UTC

[impala] branch master updated: IMPALA-8819: BufferedPlanRootSink should handle non-default fetch sizes

This is an automated email from the ASF dual-hosted git repository.

arodoni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 6308915  IMPALA-8819: BufferedPlanRootSink should handle non-default fetch sizes
6308915 is described below

commit 6308915a66d837b5545b601ef7f97caa5703c30f
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Mon Aug 19 17:45:30 2019 -0700

    IMPALA-8819: BufferedPlanRootSink should handle non-default fetch sizes
    
    Adds support for non-default fetch sizes when result spooling is enabled
    (the default is to return BATCH_SIZE rows for each fetch request). When
    result spooling is disabled, Impala can only return up to BATCH_SIZE
    rows because it only buffers a single RowBatch at a time. When result
    spooling is enabled, each fetch request returns exactly the number of
    rows requested, assuming that many rows remain in the result set.
    There is also an upper limit on the fetch size to prevent the resulting
    QueryResultSet from getting too big.
    
    Unlike the behavior when result spooling is disabled, fetches do not
    break on RowBatch boundaries. For example, when result spooling is
    disabled, if the fetch size is 10 and the batch size is 15, the second
    fetch will return 5 rows. However, when result spooling is enabled, the
    second fetch will return 10 rows (assuming there is another RowBatch to
    read).
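    
    For illustration, a client-side fetch loop with result spooling
    enabled might look like the following sketch (the 'client', 'query',
    and 'handle' names are hypothetical, modeled on the test suite's API):
    
        rows_fetched = 0
        while rows_fetched < num_rows:
            # Each fetch returns exactly 10 rows until the result set is
            # exhausted, regardless of BATCH_SIZE.
            result = client.fetch(query, handle, max_rows=10)
            rows_fetched += len(result.data)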
    
    Testing:
    * Ran core tests
    * Added new tests to test_result_spooling.py
    * Added new tests to buffered-tuple-stream-test to validate writing to a
    BufferedTupleStream before releasing row batches with 'attach_on_read'
    set to true.
    
    Change-Id: I8dd4b397ab6457a4f85e635f239b2c67130fcce4
    Reviewed-on: http://gerrit.cloudera.org:8080/14129
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/buffered-plan-root-sink.cc       | 113 ++++++++++++++---------
 be/src/exec/buffered-plan-root-sink.h        |  29 +++++-
 be/src/runtime/buffered-tuple-stream-test.cc |  86 ++++++++++++++++++
 be/src/runtime/query-state.h                 |   5 +-
 tests/query_test/test_result_spooling.py     | 129 ++++++++++++++++++++++++---
 5 files changed, 306 insertions(+), 56 deletions(-)

diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index 512cb8e..7b14794 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -22,6 +22,12 @@
 
 namespace impala {
 
+const int BufferedPlanRootSink::MAX_FETCH_SIZE;
+
+/// The default number of RowBatches to return in one call to 'GetNext' when the
+/// fetch size is <= 0.
+const int FETCH_NUM_BATCHES = 10;
+
 BufferedPlanRootSink::BufferedPlanRootSink(TDataSinkId sink_id,
     const RowDescriptor* row_desc, RuntimeState* state,
     const TBackendResourceProfile& resource_profile, const TDebugOptions& debug_options)
@@ -39,6 +45,8 @@ Status BufferedPlanRootSink::Prepare(
 
 Status BufferedPlanRootSink::Open(RuntimeState* state) {
   RETURN_IF_ERROR(DataSink::Open(state));
+  current_batch_ =
+      make_unique<RowBatch>(row_desc_, state->batch_size(), mem_tracker());
   batch_queue_.reset(new SpillableRowBatchQueue(name_,
       state->query_options().max_spilled_result_spooling_mem, state, mem_tracker(),
       profile(), row_desc_, resource_profile_, debug_options_));
@@ -105,6 +113,10 @@ void BufferedPlanRootSink::Close(RuntimeState* state) {
   if (sender_state_ == SenderState::ROWS_PENDING) {
     sender_state_ = SenderState::CLOSED_NOT_EOS;
   }
+  if (current_batch_row_ != 0) {
+    current_batch_->Reset();
+  }
+  current_batch_.reset();
   if (batch_queue_ != nullptr) batch_queue_->Close();
   // While it should be safe to call NotifyOne() here, prefer to use NotifyAll() to
   // ensure that all sleeping threads are awoken. The call to NotifyAll() is not on the
@@ -128,50 +140,71 @@ Status BufferedPlanRootSink::GetNext(
     RuntimeState* state, QueryResultSet* results, int num_results, bool* eos) {
   {
     unique_lock<mutex> l(lock_);
-    while (batch_queue_->IsEmpty() && sender_state_ == SenderState::ROWS_PENDING
-        && !state->is_cancelled()) {
-      SCOPED_TIMER(row_batches_get_wait_timer_);
-      rows_available_.Wait(l);
-    }
+    *eos = false;
+
+    // Cap the maximum number of results fetched by this call to GetNext so that the
+    // resulting QueryResultSet does not consume excessive amounts of memory.
+    num_results = min(num_results, MAX_FETCH_SIZE);
+
+    // Track the number of rows read from the queue and the number of rows to read.
+    int num_rows_read = 0;
+    // If 'num_results' <= 0 then by default fetch FETCH_NUM_BATCHES batches.
+    int num_rows_to_read =
+        num_results <= 0 ? FETCH_NUM_BATCHES * state->batch_size() : num_results;
+
+    // Read from the queue until all requested rows have been read, or eos is hit.
+    while (!*eos && num_rows_read < num_rows_to_read) {
+      // Wait for the queue to have rows in it.
+      while (IsQueueEmpty() && sender_state_ == SenderState::ROWS_PENDING
+          && !state->is_cancelled()) {
+        SCOPED_TIMER(row_batches_get_wait_timer_);
+        rows_available_.Wait(l);
+      }
 
-    // If the query was cancelled while the sink was waiting for rows to become available,
-    // or if the query was cancelled before the current call to GetNext, set eos and then
-    // return. The queue could be empty if the sink was closed while waiting for rows to
-    // become available, or if the sink was closed before the current call to GetNext.
-    if (!state->is_cancelled() && !batch_queue_->IsEmpty()) {
-      unique_ptr<RowBatch> batch =
-          make_unique<RowBatch>(row_desc_, state->batch_size(), mem_tracker());
-      RETURN_IF_ERROR(batch_queue_->GetBatch(batch.get()));
-      // TODO for now, if num_results < batch->num_rows(), we terminate returning results
-      // early until we can properly handle fetch requests where
-      // num_results < batch->num_rows().
-      if (num_results > 0 && num_results < batch->num_rows()) {
-        *eos = true;
-        batch_queue_has_capacity_.NotifyOne();
-        consumer_eos_.NotifyOne();
-        batch->Reset();
-        return Status::Expected(TErrorCode::NOT_IMPLEMENTED_ERROR,
-            "BufferedPlanRootSink does not support setting num_results < BATCH_SIZE");
+      // If the query was cancelled while the sink was waiting for rows to become
+      // available, or if the query was cancelled before the current call to GetNext, set
+      // eos and then return. The queue could be empty if the sink was closed while
+      // waiting for rows to become available, or if the sink was closed before the
+      // current call to GetNext.
+      if (!state->is_cancelled() && !IsQueueEmpty()) {
+        // If current_batch_ is empty, then read directly from the queue.
+        if (current_batch_row_ == 0) {
+          RETURN_IF_ERROR(batch_queue_->GetBatch(current_batch_.get()));
+
+          // After reading a RowBatch from the queue, the queue has additional
+          // capacity, so notify the producer that it can add more RowBatches. Even
+          // though the lock is still held when batch_queue_has_capacity_ is notified,
+          // the lock may be released if the current thread waits on rows_available_.
+          batch_queue_has_capacity_.NotifyOne();
+        }
+
+        // Set the number of rows to be fetched from 'current_batch_'. Either read all
+        // remaining rows in the batch, or read up to the 'num_rows_to_read' limit.
+        int num_rows_to_fetch = min(current_batch_->num_rows() - current_batch_row_,
+            num_rows_to_read - num_rows_read);
+        DCHECK_GE(num_rows_to_fetch, 0);
+
+        // Read rows from 'current_batch_' and add them to 'results'.
+        RETURN_IF_ERROR(results->AddRows(output_expr_evals_, current_batch_.get(),
+            current_batch_row_, num_rows_to_fetch));
+        num_rows_read += num_rows_to_fetch;
+        current_batch_row_ += num_rows_to_fetch;
+
+        // If all rows have been read from 'current_batch_' then reset the batch and its
+        // index.
+        DCHECK_LE(current_batch_row_, current_batch_->num_rows());
+        if (current_batch_row_ == current_batch_->num_rows()) {
+          current_batch_row_ = 0;
+          current_batch_->Reset();
+        }
+
+        // Prevent expr result allocations from accumulating.
+        expr_results_pool_->Clear();
       }
-      RETURN_IF_ERROR(
-          results->AddRows(output_expr_evals_, batch.get(), 0, batch->num_rows()));
-      // Prevent expr result allocations from accumulating.
-      expr_results_pool_->Clear();
-      batch->Reset();
+      *eos = IsQueueEmpty() && sender_state_ == SenderState::EOS;
     }
-    *eos = batch_queue_->IsEmpty() && sender_state_ == SenderState::EOS;
     if (*eos) consumer_eos_.NotifyOne();
   }
-  // Release the lock before calling notify so the consumer thread can immediately
-  // acquire the lock. It is safe to call notify batch_queue_has_capacity_ regardless of
-  // whether a RowBatch is read. Either (1) a RowBatch is read and the queue is no longer
-  // full, so notify the consumer thread or (2) a Rowbatch was not read, which means
-  // either FlushFinal was called or the query was cancelled. If FlushFinal was called
-  // then the consumer thread has completed. If the query is cancelled, then we wake up
-  // the consumer thread so it can check the cancellation status and return. Releasing
-  // the lock is safe because the consumer always loops until the queue actually has
-  // space.
-  batch_queue_has_capacity_.NotifyOne();
   return state->GetQueryStatus();
 }
-}
+} // namespace impala
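
In outline, the rewritten GetNext drains a "logical queue" that includes the
partially consumed current_batch_, which is why fetches no longer break on
RowBatch boundaries. Below is a minimal, single-threaded Python sketch of that
drain logic (names are illustrative, not the actual implementation; the real
code additionally handles waiting on condition variables, cancellation, and
the producer's EOS state):

    from collections import deque

    MAX_FETCH_SIZE = 1024 * 100  # mirrors BufferedPlanRootSink::MAX_FETCH_SIZE
    FETCH_NUM_BATCHES = 10       # mirrors FETCH_NUM_BATCHES above
    BATCH_SIZE = 15              # example batch size

    class SinkSketch(object):
        """Models batch_queue_, current_batch_, and current_batch_row_."""
        def __init__(self, batches):
            self.batch_queue = deque(batches)  # fully buffered RowBatches
            self.current_batch = []            # partially consumed batch
            self.current_batch_row = 0         # next row to hand out

        def is_queue_empty(self):
            # The logical queue also covers rows left in current_batch.
            return not self.batch_queue and self.current_batch_row == 0

        def get_next(self, num_results):
            results = []
            to_read = (FETCH_NUM_BATCHES * BATCH_SIZE if num_results <= 0
                       else min(num_results, MAX_FETCH_SIZE))
            while not self.is_queue_empty() and len(results) < to_read:
                if self.current_batch_row == 0:
                    self.current_batch = self.batch_queue.popleft()
                n = min(len(self.current_batch) - self.current_batch_row,
                        to_read - len(results))
                results.extend(self.current_batch[
                    self.current_batch_row:self.current_batch_row + n])
                self.current_batch_row += n
                if self.current_batch_row == len(self.current_batch):
                    self.current_batch, self.current_batch_row = [], 0
            # The real code also requires sender EOS before reporting eos.
            return results, self.is_queue_empty()

    # Two 15-row batches, fetch size 10: the second fetch returns 10 rows,
    # 5 from the first batch and 5 from the second, crossing the boundary.
    sink = SinkSketch([list(range(15)), list(range(15, 30))])
    assert len(sink.get_next(10)[0]) == 10
    assert len(sink.get_next(10)[0]) == 10
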
diff --git a/be/src/exec/buffered-plan-root-sink.h b/be/src/exec/buffered-plan-root-sink.h
index d437b02..a8f4b70 100644
--- a/be/src/exec/buffered-plan-root-sink.h
+++ b/be/src/exec/buffered-plan-root-sink.h
@@ -19,6 +19,7 @@
 
 #include "exec/plan-root-sink.h"
 #include "runtime/spillable-row-batch-queue.h"
+#include "runtime/query-state.h"
 #include "util/condition-variable.h"
 
 namespace impala {
@@ -66,7 +67,9 @@ class BufferedPlanRootSink : public PlanRootSink {
   /// Releases resources and unblocks the consumer thread.
   virtual void Close(RuntimeState* state) override;
 
-  /// Blocks until rows are available for consumption.
+  /// Blocks until rows are available for consumption. GetNext() always returns 'num_rows'
+  /// rows unless (1) there are not enough rows left in the result set to return
+  /// 'num_rows' rows, or (2) the value of 'num_rows' exceeds MAX_FETCH_SIZE.
   virtual Status GetNext(
       RuntimeState* state, QueryResultSet* result_set, int num_rows, bool* eos) override;
 
@@ -75,6 +78,13 @@ class BufferedPlanRootSink : public PlanRootSink {
   virtual void Cancel(RuntimeState* state) override;
 
  private:
+  /// The maximum number of rows that can be fetched at a time. Set to 100x the
+  /// DEFAULT_BATCH_SIZE. Limiting the fetch size is necessary so that the resulting
+  /// QueryResultSet does not take up too much memory. Memory used by a QueryResultSet
+  /// is not tracked or reserved, so creating QueryResultSets that are too big can throw
+  /// off admission control.
+  static const int MAX_FETCH_SIZE = QueryState::DEFAULT_BATCH_SIZE * 100;
+
   /// Protects the RowBatchQueue and all ConditionVariables.
   boost::mutex lock_;
 
@@ -119,5 +129,22 @@ class BufferedPlanRootSink : public PlanRootSink {
   /// them. Specifically, this counter measures the amount of time spent waiting on
   /// 'rows_available_' in the 'GetNext' method.
   RuntimeProfile::Counter* row_batches_get_wait_timer_ = nullptr;
+
+  /// The RowBatch currently being read by 'GetNext'. Necessary for calls to 'GetNext'
+  /// that only read part of a RowBatch from the queue. If the batch is empty
+  /// ('current_batch_row_' == 0), 'GetNext' reads the next RowBatch from the queue.
+  /// The batch is reset whenever 'GetNext' has finished reading all of its rows.
+  std::unique_ptr<RowBatch> current_batch_;
+
+  /// The index of the next row to be read from 'current_batch_' in the next call to
+  /// 'GetNext'. A value of 0 means 'current_batch_' has no buffered rows to return.
+  int current_batch_row_ = 0;
+
+  /// Returns true if the logical queue of RowBatches is empty. The logical queue
+  /// includes 'batch_queue_' as well as any rows still buffered in
+  /// 'current_batch_'. Must be called while holding 'lock_'.
+  bool IsQueueEmpty() const {
+    return batch_queue_->IsEmpty() && current_batch_row_ == 0;
+  }
 };
 }
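
For reference: with QueryState::DEFAULT_BATCH_SIZE = 1024 (see the query-state.h
change below), MAX_FETCH_SIZE works out to 1024 * 100 = 102,400 rows per fetch,
and GetNext silently caps larger requests (num_results = min(num_results,
MAX_FETCH_SIZE)).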
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 6b097dd..ef0c189 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -1272,6 +1272,92 @@ TEST_F(SimpleTupleStreamTest, UnpinReadPage) {
     ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
     stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
+  write_batch->Reset();
+}
+
+// Test writing to a stream (AddRow and UnpinStream) while pages attached to earlier
+// output batches have not yet been released.
+TEST_F(SimpleTupleStreamTest, WriteAfterReadAttached) {
+  int buffer_size = 4 * 1024;
+  Init(100 * buffer_size);
+
+  bool eos;
+  bool got_reservation;
+  Status status;
+
+  vector<int> results;
+  RowBatch read_batch(int_desc_, 1, &tracker_);
+  RowBatch* write_batch = CreateIntBatch(0, 1, false);
+  ASSERT_EQ(write_batch->num_rows(), 1);
+
+  // Test adding a row to the stream before releasing an output batch returned by
+  // GetNext.
+  BufferedTupleStream stream(
+      runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
+  ASSERT_OK(stream.Init("SimpleTupleStreamTest::WriteAfterReadAttached", true));
+  ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
+  ASSERT_TRUE(got_reservation);
+
+  // Add a row to the stream.
+  EXPECT_TRUE(stream.AddRow(write_batch->GetRow(0), &status));
+  ASSERT_OK(status);
+
+  // Read a row from the stream, but do not reset the read_batch.
+  ASSERT_OK(stream.GetNext(&read_batch, &eos));
+
+  // Add a row to the stream.
+  EXPECT_TRUE(stream.AddRow(write_batch->GetRow(0), &status));
+  ASSERT_OK(status);
+
+  // Validate the contents of read_batch.
+  AppendRowTuples(read_batch.GetRow(0), int_desc_, &results);
+  VerifyResults<int>(*int_desc_, results, 1, false);
+  results.clear();
+
+  // Validate the data just added to the stream.
+  ReadValues(&stream, int_desc_, &results);
+  VerifyResults<int>(*int_desc_, results, 1, false);
+  results.clear();
+
+  // Reset the read_batch and close the stream.
+  read_batch.Reset();
+  stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+
+  // Read a batch from the stream, unpin the stream, add a row, and then validate the
+  // contents of the read batch.
+  BufferedTupleStream unpin_stream(
+      runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
+  ASSERT_OK(unpin_stream.Init("SimpleTupleStreamTest::WriteAfterReadAttached", true));
+  ASSERT_OK(unpin_stream.PrepareForReadWrite(true, &got_reservation));
+  ASSERT_TRUE(got_reservation);
+
+  // Add a row to the stream.
+  EXPECT_TRUE(unpin_stream.AddRow(write_batch->GetRow(0), &status));
+  ASSERT_OK(status);
+
+  // Read a row from the stream, but do not reset the read_batch.
+  ASSERT_OK(unpin_stream.GetNext(&read_batch, &eos));
+
+  // Unpin the stream.
+  ASSERT_OK(unpin_stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+
+  // Add a row to the stream.
+  EXPECT_TRUE(unpin_stream.AddRow(write_batch->GetRow(0), &status));
+  ASSERT_OK(status);
+
+  // Validate the contents of read_batch.
+  AppendRowTuples(read_batch.GetRow(0), int_desc_, &results);
+  VerifyResults<int>(*int_desc_, results, 1, false);
+  results.clear();
+
+  // Validate the data just added to the stream.
+  ReadValues(&unpin_stream, int_desc_, &results);
+  VerifyResults<int>(*int_desc_, results, 1, false);
+  results.clear();
+
+  // Reset the read_batch and close the stream.
+  read_batch.Reset();
+  unpin_stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 
   write_batch->Reset();
 }
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 6ed38da..4e497aa 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -239,6 +239,9 @@ class QueryState {
   /// already. Also notifies anyone waiting on WaitForFinishOrTimeout().
   void ErrorDuringExecute(const Status& status, const TUniqueId& finst_id);
 
+  /// The default BATCH_SIZE.
+  static const int DEFAULT_BATCH_SIZE = 1024;
+
  private:
   friend class QueryExecMgr;
 
@@ -246,8 +249,6 @@ class QueryState {
   friend class RuntimeState;
   friend class TestEnv;
 
-  static const int DEFAULT_BATCH_SIZE = 1024;
-
   /// Blocks until all fragment instances have finished executing or until one of them
   /// hits an error, or until 'timeout_ms' milliseconds has elapsed. Returns 'true' if
   /// all fragment instances finished or one of them hits an error. Return 'false' on
diff --git a/tests/query_test/test_result_spooling.py b/tests/query_test/test_result_spooling.py
index e06ec96..feb0041 100644
--- a/tests/query_test/test_result_spooling.py
+++ b/tests/query_test/test_result_spooling.py
@@ -22,18 +22,10 @@ import threading
 from time import sleep
 from tests.common.errors import Timeout
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.cancel_util import cancel_query_and_validate_state
 
-# Queries to execute, use the TPC-H dataset because tables are large so queries take some
-# time to execute.
-CANCELLATION_QUERIES = ["select l_returnflag from tpch_parquet.lineitem",
-                        "select * from tpch_parquet.lineitem limit 50",
-                        "select * from tpch_parquet.lineitem order by l_orderkey"]
-
-# Time to sleep between issuing query and canceling.
-CANCEL_DELAY_IN_SECONDS = [0, 0.01, 0.1, 1, 4]
-
 
 class TestResultSpooling(ImpalaTestSuite):
   @classmethod
@@ -111,11 +103,23 @@ class TestResultSpooling(ImpalaTestSuite):
       self.client.close_query(handle)
 
   def test_full_queue(self, vector):
+    """Delegates to _test_full_queue."""
+    query = "select * from functional.alltypes order by id limit 1500"
+    self._test_full_queue(vector, query)
+
+  def test_full_queue_large_fetch(self, vector):
+    """Delegates to _test_full_queue, but specifies a fetch size equal to the number of
+    rows returned by the query. This tests that clients can fetch all rows from a full
+    queue."""
+    num_rows = 1500
+    query = "select * from functional.alltypes order by id limit {0}".format(num_rows)
+    self._test_full_queue(vector, query, fetch_size=num_rows)
+
+  def _test_full_queue(self, vector, query, fetch_size=-1):
     """Tests result spooling when there is no more space to buffer query results (the
     queue is full), and the client hasn't fetched any results. Validates that
     RowBatchSendWaitTime (amount of time Impala blocks waiting for the client to read
     buffered results and clear up space in the queue) is updated properly."""
-    query = "select * from functional.alltypes order by id limit 1500"
     exec_options = vector.get_value('exec_option')
 
     # Set lower values for spill-to-disk and result spooling configs so that the queue
@@ -140,7 +144,7 @@ class TestResultSpooling(ImpalaTestSuite):
       self.wait_for_any_state(handle, [self.client.QUERY_STATES['RUNNING'],
           self.client.QUERY_STATES['FINISHED']], timeout)
       time.sleep(5)
-      self.client.fetch(query, handle)
+      self.client.fetch(query, handle, max_rows=fetch_size)
       assert re.search(send_wait_time_regex, self.client.get_runtime_profile(handle)) \
           is not None
     finally:
@@ -193,6 +197,96 @@ class TestResultSpooling(ImpalaTestSuite):
                                      "spooling was enabled".format(query)
 
 
+class TestResultSpoolingFetchSize(ImpalaTestSuite):
+  """Tests fetching logic when result spooling is enabled. When result spooling is
+  disabled, Impala only supports fetching up to BATCH_SIZE rows at a time (since only
+  one RowBatch is ever buffered). When result spooling is enabled, clients can specify
+  any fetch size (up to a limit) and Impala will return exactly that number of rows
+  (assuming there are that many rows left to fetch). This class validates the
+aforementioned result spooling fetch logic using different fetch and batch sizes."""
+
+  # The different values of BATCH_SIZE that 'test_fetch' will be parameterized by.
+  _batch_sizes = [100, 1024, 2048]
+
+  # The different numbers of rows to fetch at a time from the query handle.
+  _fetch_sizes = [7, 23, 321, 512, 2048, 4321, 5000, 10000]
+
+  # The number of rows in functional_parquet.alltypes.
+  _num_rows = 7300
+
+  # The query that 'test_fetch' will run.
+  _query = "select id from functional_parquet.alltypes order by id"
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestResultSpoolingFetchSize, cls).add_test_dimensions()
+    # Create a test matrix with three different dimensions: BATCH_SIZE, the number of
+    # rows to fetch at a time, and whether the tests should wait for all results to be
+    # spooled before fetching any rows.
+    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
+        batch_sizes=cls._batch_sizes))
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('fetch_size',
+        *cls._fetch_sizes))
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('wait_for_finished',
+        *[True, False]))
+
+    # Result spooling should be independent of file format, so only testing for
+    # table_format=parquet/none in order to avoid a test dimension explosion.
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'parquet' and
+        v.get_value('table_format').compression_codec == 'none')
+
+  @classmethod
+  def setup_class(cls):
+    super(TestResultSpoolingFetchSize, cls).setup_class()
+    # All tests only ever run a single query, so rather than re-run this query for every
+    # test, run it once and store the results.
+    base_result = cls.client.execute(cls._query)
+    assert base_result.success, "Failed to run {0} when result spooling is " \
+                                "enabled".format(cls._query)
+    cls._base_data = base_result.data
+
+  def test_fetch(self, vector):
+    """Run '_query' with result spooling enabled and with the specified BATCH_SIZE. Use
+    the 'fetch_size' parameter to determine how many rows to fetch from the query handle
+    at a time. Fetch all results and then validate they match '_base_data'."""
+    exec_options = vector.get_value('exec_option')
+    exec_options['spool_query_results'] = 'true'
+    fetch_size = vector.get_value('fetch_size')
+
+    # Amount of time to wait for the query to reach a running state before throwing
+    # a Timeout exception.
+    timeout = 10
+
+    results = []
+    handle = self.execute_query_async(self._query, exec_options)
+    try:
+      # If 'wait_for_finished' is True, wait for the query to reach the FINISHED state.
+      # When it reaches this state all results should be successfully spooled.
+      if vector.get_value('wait_for_finished'):
+        self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], timeout)
+      rows_fetched = 0
+
+      # Call 'fetch' on the query handle enough times to read all rows.
+      while rows_fetched < self._num_rows:
+        result_data = self.client.fetch(self._query, handle, fetch_size).data
+        # Assert that each fetch request returns exactly the number of rows requested,
+        # unless less than that many rows were left in the result set.
+        assert len(result_data) == min(fetch_size, self._num_rows - rows_fetched)
+        rows_fetched += len(result_data)
+        results.extend(result_data)
+    finally:
+      self.client.close_query(handle)
+
+    # Assert that the fetched results match the '_base_data'.
+    assert self._num_rows == rows_fetched
+    assert self._base_data == results
+
+
 class TestResultSpoolingCancellation(ImpalaTestSuite):
   """Test cancellation of queries when result spooling is enabled. This class heavily
   borrows from the cancellation tests in test_cancellation.py. It uses the following test
@@ -201,6 +295,15 @@ class TestResultSpoolingCancellation(ImpalaTestSuite):
   before being cancelled.
   """
 
+  # Queries to execute. Use the TPC-H dataset because its tables are large, so
+  # queries take some time to execute.
+  _cancellation_queries = ["select l_returnflag from tpch_parquet.lineitem",
+                           "select * from tpch_parquet.lineitem limit 50",
+                           "select * from tpch_parquet.lineitem order by l_orderkey"]
+
+  # Time to sleep between issuing query and canceling.
+  _cancel_delay_in_seconds = [0, 0.01, 0.1, 1, 4]
+
   @classmethod
   def get_workload(cls):
     return 'tpch'
@@ -209,9 +312,9 @@ class TestResultSpoolingCancellation(ImpalaTestSuite):
   def add_test_dimensions(cls):
     super(TestResultSpoolingCancellation, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('query',
-        *CANCELLATION_QUERIES))
+        *cls._cancellation_queries))
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('cancel_delay',
-        *CANCEL_DELAY_IN_SECONDS))
+        *cls._cancel_delay_in_seconds))
 
     # Result spooling should be independent of file format, so only testing for
     # table_format=parquet/none in order to avoid a test dimension explosion.
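
For reference, the TestResultSpoolingFetchSize matrix above exercises 3 batch
sizes x 8 fetch sizes x 2 'wait_for_finished' values = 48 parameter combinations
(before the parquet/none table-format constraint is applied).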