You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/08/27 10:51:31 UTC

[impala] 01/02: IMPALA-9225: Query option for retryable queries to spool all results before returning any to the client

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 61dcc805e536af0f160225cc928aa188aa861225
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Tue Aug 11 16:35:15 2020 +0800

    IMPALA-9225: Query option for retryable queries to spool all results before returning any to the client
    
    If we have returned any results to the client in the original query,
    query retry will be skipped to avoid incorrect results. This patch adds
    a query option, spool_all_results_for_retries, for retryable queries to
    spool all results before returning any to the client. It defaults to
    true. If the query results cannot all fit in the allocated result
    spooling space, results are made available to the client anyway, and
    query retry is skipped once any of them have been fetched.
    
    Setting spool_all_results_for_retries to false falls back to the
    original behavior: the client can fetch results as soon as any of them
    are ready. The retried query explicitly sets it to false, since a
    retried query is never retried again. For non-retryable queries or
    queries that don't enable result spooling, the
    spool_all_results_for_retries option has no effect.
    
    To implement this, this patch defers the time when results are ready to
    be fetched. By default, the “rows available” event happens when any
    results are ready. For a retryable query, when spool_query_results and
    spool_all_results_for_retries are both true, the “rows available” event
    happens after all results are spooled, or earlier if something prevents
    that, e.g. the batch queue becomes full, the query is cancelled, or a
    failure occurs. After waiting for the root fragment instance’s Open() to
    finish, the coordinator waits until the results of the
    BufferedPlanRootSink are ready. BufferedPlanRootSink sets the
    results-ready signal in its Send(), Close(), Cancel() and FlushFinal()
    methods.
    
    Tests:
    - Add a test to verify that a retryable query will spool all its results
      when results spooling and spool_all_results_for_retries are enabled.
    - Add a test to verify that query retry succeeds when a retryable query
      is still spooling its results (spool_all_results_for_retries=true).
    - Add a test to verify that the retried query won't spool all results
      even when results spooling and spool_all_results_for_retries are
      enabled in the original query.
    - Add a test to verify that the original query can be canceled
      correctly. We need this because the added logic for
      spool_all_results_for_retries interacts with the cancellation code
      path.
    - Add a test to verify that results are returned when they cannot all
      fit into the result spooling space, and that query retry is skipped.
    
    Change-Id: I462dbfef9ddab9060b30a6937fca9122484a24a5
    Reviewed-on: http://gerrit.cloudera.org:8080/16323
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/buffered-plan-root-sink.cc     |  12 +++
 be/src/exec/buffered-plan-root-sink.h      |  11 ++
 be/src/runtime/coordinator.cc              |  17 ++-
 be/src/runtime/query-driver.cc             |   7 ++
 be/src/runtime/spillable-row-batch-queue.h |   2 +-
 be/src/service/query-options.cc            |   4 +
 be/src/service/query-options.h             |   4 +-
 common/thrift/ImpalaInternalService.thrift |   3 +
 common/thrift/ImpalaService.thrift         |  10 ++
 tests/common/impala_test_suite.py          |  21 ++++
 tests/custom_cluster/test_query_retries.py | 161 ++++++++++++++++++++++++++++-
 11 files changed, 244 insertions(+), 8 deletions(-)

diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index 6dd69dd..19bcee2 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -78,6 +78,9 @@ Status BufferedPlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
     while (!state->is_cancelled() && batch_queue_->IsFull()) {
       SCOPED_TIMER(profile()->inactive_timer());
       SCOPED_TIMER(row_batches_send_wait_timer_);
+      if (!all_results_spooled_.IsSet()) {
+        discard_result(all_results_spooled_.Set(true));
+      }
       batch_queue_has_capacity_.Wait(l);
     }
     RETURN_IF_CANCELLED(state);
@@ -102,6 +105,9 @@ Status BufferedPlanRootSink::FlushFinal(RuntimeState* state) {
   DCHECK(!closed_);
   unique_lock<mutex> l(lock_);
   sender_state_ = SenderState::EOS;
+  if (!all_results_spooled_.IsSet()) {
+    discard_result(all_results_spooled_.Set(false));
+  }
   // If no batches are ever added, wake up the consumer thread so it can check the
   // SenderState and return appropriately.
   rows_available_.NotifyAll();
@@ -123,6 +129,9 @@ void BufferedPlanRootSink::Close(RuntimeState* state) {
   if (sender_state_ == SenderState::ROWS_PENDING) {
     sender_state_ = SenderState::CLOSED_NOT_EOS;
   }
+  if (!all_results_spooled_.IsSet()) {
+    discard_result(all_results_spooled_.Set(false));
+  }
   if (current_batch_row_ != 0) {
     current_batch_->Reset();
   }
@@ -152,6 +161,9 @@ void BufferedPlanRootSink::Cancel(RuntimeState* state) {
   rows_available_.NotifyAll();
   consumer_eos_.NotifyAll();
   batch_queue_has_capacity_.NotifyAll();
+  if (!all_results_spooled_.IsSet()) {
+    discard_result(all_results_spooled_.Set(false));
+  }
 }
 
 Status BufferedPlanRootSink::GetNext(RuntimeState* state, QueryResultSet* results,
diff --git a/be/src/exec/buffered-plan-root-sink.h b/be/src/exec/buffered-plan-root-sink.h
index 586423d..d62939a 100644
--- a/be/src/exec/buffered-plan-root-sink.h
+++ b/be/src/exec/buffered-plan-root-sink.h
@@ -76,6 +76,13 @@ class BufferedPlanRootSink : public PlanRootSink {
   /// status.
   virtual void Cancel(RuntimeState* state) override;
 
+  /// Blocks until all results are spooled or we fail to do this due to batch_queue_ is
+  /// full, cancellation or any errors. Returns if we fail to do this due to batch_queue_
+  /// is full.
+  bool WaitForAllResultsSpooled() {
+    return all_results_spooled_.Get();
+  }
+
  private:
   /// The maximum number of rows that can be fetched at a time. Set to 100x the
   /// DEFAULT_BATCH_SIZE. Limiting the fetch size is necessary so that the resulting
@@ -139,6 +146,10 @@ class BufferedPlanRootSink : public PlanRootSink {
   /// 'GetNext'. If 'current_batch_' is nullptr, the value of 'current_batch_row_' is 0.
   int current_batch_row_ = 0;
 
+  /// Set when all results are spooled or we fail to do this due to batch_queue_ full or
+  /// any errors.
+  Promise<bool> all_results_spooled_;
+
   /// Returns true if the 'queue' (not the 'batch_queue_') is empty. 'queue' refers to
   /// the logical queue of RowBatches and thus includes any RowBatch that
   /// 'current_batch_' points to. Must be called while holding 'lock_'. Cannot be called
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 0ceae83..b4f5178 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -29,6 +29,7 @@
 #include <gutil/strings/substitute.h>
 
 #include "common/hdfs.h"
+#include "exec/buffered-plan-root-sink.h"
 #include "exec/data-sink.h"
 #include "exec/plan-root-sink.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
@@ -799,8 +800,20 @@ Status Coordinator::Wait() {
 
   if (stmt_type_ == TStmtType::QUERY) {
     DCHECK(coord_instance_ != nullptr);
-    return UpdateExecState(coord_instance_->WaitForOpen(),
-        &coord_instance_->runtime_state()->fragment_instance_id(), FLAGS_hostname);
+    RETURN_IF_ERROR(UpdateExecState(coord_instance_->WaitForOpen(),
+        &coord_instance_->runtime_state()->fragment_instance_id(), FLAGS_hostname));
+    if (query_state_->query_options().retry_failed_queries
+        && query_state_->query_options().spool_query_results
+        && query_state_->query_options().spool_all_results_for_retries) {
+      // Wait until the BufferedPlanRootSink spooled all results or any errors stopping
+      // it, e.g. batch queue full, cancellation or failures.
+      auto sink = static_cast<BufferedPlanRootSink*>(coord_sink_);
+      if (sink->WaitForAllResultsSpooled()) {
+        VLOG_QUERY << "Cannot spool all results in the allocated result spooling space."
+            " Query retry will be skipped if any results have been returned.";
+      }
+    }
+    return Status::OK();
   }
   DCHECK_EQ(stmt_type_, TStmtType::DML);
   // DML finalization can only happen when all backends have completed all side-effects
diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc
index e34626c..53b35d0 100644
--- a/be/src/runtime/query-driver.cc
+++ b/be/src/runtime/query-driver.cc
@@ -309,6 +309,13 @@ void QueryDriver::CreateRetriedClientRequestState(ClientRequestState* request_st
   // TExecRequest object.
   retry_exec_request_ = make_unique<TExecRequest>(*exec_request_);
   TQueryCtx query_ctx = retry_exec_request_->query_exec_request.query_ctx;
+  if (query_ctx.client_request.query_options.spool_all_results_for_retries) {
+    // Reset this flag in the retry query since we won't retry again, so results can be
+    // returned immediately.
+    query_ctx.client_request.query_options.__set_spool_all_results_for_retries(false);
+    VLOG_QUERY << "Unset SPOOL_ALL_RESULTS_FOR_RETRIES when retrying query "
+        << PrintId(client_request_state_->query_id());
+  }
   parent_server_->PrepareQueryContext(&query_ctx);
   retry_exec_request_->query_exec_request.__set_query_ctx(query_ctx);
 
diff --git a/be/src/runtime/spillable-row-batch-queue.h b/be/src/runtime/spillable-row-batch-queue.h
index a7da32c..b1d8d17 100644
--- a/be/src/runtime/spillable-row-batch-queue.h
+++ b/be/src/runtime/spillable-row-batch-queue.h
@@ -130,7 +130,7 @@ class SpillableRowBatchQueue {
   const TDebugOptions& debug_options_;
 
   /// The max number of bytes that can be unpinned in the BufferedTupleStream. Set by the
-  /// query option MAX_UNPINNED_RESULT_SPOOLING_MEMORY.
+  /// query option MAX_SPILLED_RESULT_SPOOLING_MEM.
   const int64_t max_unpinned_bytes_;
 
   /// True if the queue has been closed, false otherwise.
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 0b3a728..181a4dd 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -938,6 +938,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_refresh_updated_hms_partitions(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::SPOOL_ALL_RESULTS_FOR_RETRIES: {
+        query_options->__set_spool_all_results_for_retries(IsTrue(value));
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 4d7bc60..8147aff 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::REFRESH_UPDATED_HMS_PARTITIONS + 1);\
+      TImpalaQueryOptions::SPOOL_ALL_RESULTS_FOR_RETRIES + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -209,6 +209,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(max_fs_writers, MAX_FS_WRITERS, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(refresh_updated_hms_partitions,\
       REFRESH_UPDATED_HMS_PARTITIONS, TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(spool_all_results_for_retries, SPOOL_ALL_RESULTS_FOR_RETRIES,\
+      TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 2f0c52d..fe27b07 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -441,6 +441,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   109: optional bool refresh_updated_hms_partitions = false;
+
+  // See comment in ImpalaService.thrift
+  110: optional bool spool_all_results_for_retries = true;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 1d5688c..1fbf5f9 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -551,12 +551,22 @@ enum TImpalaQueryOptions {
   // Sets an upper limit on the number of fs writer instances to be scheduled during
   // insert. Currently this limit only applies to HDFS inserts.
   MAX_FS_WRITERS = 107
+
   // When this query option is set, a refresh table statement will detect existing
   // partitions which have been changed in metastore and refresh them. By default, this
   // option is disabled since there is additional performance hit to fetch all the
   // partitions and detect if they are not same as ones in the catalogd. Currently, this
   // option is only applicable for refresh table statement.
   REFRESH_UPDATED_HMS_PARTITIONS = 108
+
+  // If RETRY_FAILED_QUERIES and SPOOL_QUERY_RESULTS are enabled and this is true,
+  // retryable queries will try to spool all results before returning any to the client.
+  // If the result set is too large to fit into the spooling memory (including the spill
+  // mem), results will be returned and the query will not be retryable. This may have
+  // some performance impact. Set it to false then clients can fetch results immediately
+  // when any of them are ready. Note that in this case, query retry will be skipped if
+  // the client has fetched some results.
+  SPOOL_ALL_RESULTS_FOR_RETRIES = 109
 }
 
 // The summary of a DML statement.
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 4ee54d5..ac11ed3 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -1070,6 +1070,27 @@ class ImpalaTestSuite(BaseTestSuite):
                     actual_state))
     return actual_state
 
+  def wait_for_progress(self, handle, expected_progress, timeout, client=None):
+    """Waits for the given query handle to reach expected progress rate"""
+    if client is None: client = self.client
+    start_time = time.time()
+    summary = client.get_exec_summary(handle)
+    while time.time() - start_time < timeout and \
+        self.__get_query_progress_rate(summary.progress) <= expected_progress:
+      summary = client.get_exec_summary(handle)
+      time.sleep(0.5)
+    actual_progress = self.__get_query_progress_rate(summary.progress)
+    if actual_progress <= expected_progress:
+      raise Timeout("query {0} did not reach the expected progress {1}, "
+                    "current progress {2}".format(handle.get_handle().id,
+                    expected_progress, actual_progress))
+    return actual_progress
+
+  def __get_query_progress_rate(self, progress):
+    if progress is None:
+      return 0
+    return float(progress.num_completed_scan_ranges) / progress.total_scan_ranges
+
   def wait_for_db_to_appear(self, db_name, timeout_s):
     """Wait until the database with 'db_name' is present in the impalad's local catalog.
     Fail after timeout_s if the doesn't appear."""
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index 54f2334..e41544c 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -55,6 +55,13 @@ class TestQueryRetries(CustomClusterTestSuite):
       "\tegular courts above the\t1\t15635\t638\t6\t32.00\t49620.16\t0.07\t0.02\tN\tO" \
       "\t1996-01-30\t1996-02-07\t1996-02-03\tDELIVER IN PERSON\tMAIL\tarefully slyly ex"
 
+  # The following query has two union operands. The first operand executes quickly
+  # and the second one executes slowly. So we can kill one impalad when some results
+  # are ready and the query is still running and has more results.
+  _union_query = """select count(*) from functional.alltypestiny
+        union all
+        select count(*) from functional.alltypes where bool_col = sleep(50)"""
+
   @classmethod
   def get_workload(cls):
     return 'functional-query'
@@ -327,7 +334,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     running the query, and the validate that another fetch request fails."""
     query = "select * from functional.alltypes where bool_col = sleep(500)"
     handle = self.execute_query_async(query,
-            query_options={'retry_failed_queries': 'true', 'batch_size': '1'})
+        query_options={'retry_failed_queries': 'true', 'batch_size': '1'})
     self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
 
     self.client.fetch(query, handle, max_rows=1)
@@ -335,12 +342,158 @@ class TestQueryRetries(CustomClusterTestSuite):
     self.cluster.impalads[1].kill()
     time.sleep(5)
 
-    # Assert than attempt to fetch from the query handle fails.
+    # Assert that attempt to fetch from the query handle fails.
     try:
       self.client.fetch(query, handle)
       assert False
-    except Exception, e:
-        assert "Failed due to unreachable impalad" in str(e)
+    except Exception as e:
+      assert "Failed due to unreachable impalad" in str(e)
+      assert "Skipping retry of query_id=%s because the client has already " \
+             "fetched some rows" % handle.get_handle().id in str(e)
+
+  @pytest.mark.execute_serially
+  def test_spooling_all_results_for_retries(self):
+    """Test retryable queries with spool_all_results_for_retries=true will spool all
+    results when results spooling is enabled."""
+    handle = self.execute_query_async(self._union_query, query_options={
+        'retry_failed_queries': 'true', 'spool_query_results': 'true',
+        'spool_all_results_for_retries': 'true'})
+
+    # Fetch one row first.
+    results = self.client.fetch(self._union_query, handle, max_rows=1)
+    assert len(results.data) == 1
+    assert int(results.data[0]) == 8
+
+    # All results are spooled since we are able to fetch some results.
+    # Killing an impalad should not trigger query retry.
+    self.__kill_random_impalad()
+    time.sleep(5)
+
+    # We are still able to fetch the remaining results.
+    results = self.client.fetch(self._union_query, handle)
+    assert len(results.data) == 1
+    assert int(results.data[0]) == 3650
+
+    # Verify no retry happens
+    retried_query_id = self.__get_retried_query_id_from_summary(handle)
+    assert retried_query_id is None
+    runtime_profile = self.client.get_runtime_profile(handle)
+    assert self.__get_query_id_from_profile(runtime_profile) == handle.get_handle().id
+
+    self.client.close_query(handle)
+
+  @pytest.mark.execute_serially
+  def test_query_retry_in_spooling(self):
+    """Test retryable queries with results spooling enabled and
+    spool_all_results_for_retries=true can be safely retried for failures that happen when
+    it's still spooling the results"""
+    handle = self.execute_query_async(self._union_query, query_options={
+      'retry_failed_queries': 'true', 'spool_query_results': 'true',
+      'spool_all_results_for_retries': 'true'})
+    # Wait until the first union operand finishes, so some results are spooled.
+    self.wait_for_progress(handle, 0.1, 60)
+
+    self.__kill_random_impalad()
+
+    # Still able to fetch the correct result since the query is retried.
+    results = self.client.fetch(self._union_query, handle)
+    assert len(results.data) == 2
+    assert int(results.data[0]) == 8
+    assert int(results.data[1]) == 3650
+
+    # Verify the query has been retried
+    retried_query_id = self.__get_retried_query_id_from_summary(handle)
+    assert retried_query_id is not None
+
+    self.client.close_query(handle)
+
+  @pytest.mark.execute_serially
+  def test_retried_query_not_spooling_all_results(self):
+    """Test retried query can return results immediately even when results spooling and
+    spool_all_results_for_retries are enabled in the original query."""
+    handle = self.execute_query_async(self._union_query, query_options={
+      'retry_failed_queries': 'true', 'spool_query_results': 'true',
+      'spool_all_results_for_retries': 'true'})
+    # Wait until the first union operand finishes and then kill one impalad.
+    self.wait_for_progress(handle, 0.1, 60)
+
+    # Kill one impalad so the query will be retried.
+    self.__kill_random_impalad()
+    time.sleep(5)
+
+    # Verify that we are able to fetch results of the first union operand while the query
+    # is still executing the second union operand.
+    results = self.client.fetch(self._union_query, handle, max_rows=1)
+    assert len(results.data) == 1
+    assert int(results.data[0]) == 8
+
+    # Assert that the query is still executing the second union operand.
+    summary = self.client.get_exec_summary(handle)
+    assert summary.progress.num_completed_scan_ranges < summary.progress.total_scan_ranges
+
+    self.client.close_query(handle)
+
+  @pytest.mark.execute_serially
+  def test_query_retry_reaches_spool_limit(self):
+    """Test retryable queries with results spooling enabled and
+    spool_all_results_for_retries=true that reach spooling mem limit will return rows and
+    skip retry"""
+    query = "select * from functional.alltypes where bool_col = sleep(500)"
+    # Set lower values for spill-to-disk configs to force the above query to spill
+    # spooled results and hit result queue limit.
+    handle = self.execute_query_async(query, query_options={
+        'batch_size': 1,
+        'spool_query_results': True,
+        'retry_failed_queries': True,
+        'spool_all_results_for_retries': True,
+        'min_spillable_buffer_size': 8 * 1024,
+        'default_spillable_buffer_size': 8 * 1024,
+        'max_result_spooling_mem': 8 * 1024,
+        'max_spilled_result_spooling_mem': 8 * 1024})
+
+    # Wait until we can fetch some results
+    results = self.client.fetch(query, handle, max_rows=1)
+    assert len(results.data) == 1
+
+    # Assert that the query is still executing
+    summary = self.client.get_exec_summary(handle)
+    assert summary.progress.num_completed_scan_ranges < summary.progress.total_scan_ranges
+
+    self.assert_impalad_log_contains('INFO', 'Cannot spool all results in the allocated'
+        ' result spooling space. Query retry will be skipped if any results have been '
+        'returned.', expected_count=1)
+
+    # Kill one impalad and assert that the query is not retried.
+    self.__kill_random_impalad()
+
+    try:
+      self.client.fetch(query, handle)
+      assert False, "fetch should fail"
+    except ImpalaBeeswaxException as e:
+      assert "Failed due to unreachable impalad" in str(e)
+      assert "Skipping retry of query_id=%s because the client has already " \
+             "fetched some rows" % handle.get_handle().id in str(e)
+
+  @pytest.mark.execute_serially
+  def test_original_query_cancel(self):
+    """Test canceling a retryable query with spool_all_results_for_retries=true. Make sure
+    Coordinator::Wait() won't block in cancellation."""
+    for state in ['RUNNING', 'FINISHED']:
+      handle = self.execute_query_async(self._union_query, query_options={
+        'retry_failed_queries': 'true', 'spool_query_results': 'true',
+        'spool_all_results_for_retries': 'true'})
+      self.wait_for_state(handle, self.client.QUERY_STATES[state], 60)
+
+      # Cancel the query.
+      self.client.cancel(handle)
+
+      # Assert that attempt to fetch from the query handle fails with a cancellation
+      # error
+      try:
+        self.client.fetch(self._union_query, handle)
+        assert False
+      except Exception as e:
+        assert "Cancelled" in str(e)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(