You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/09/10 06:00:21 UTC

[impala] branch master updated (4327cc3 -> 1518351)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 4327cc3  IMPALA-8931: Fix fe trigger for lineage events
     new 19cb8dc  IMPALA-8904: retry statestore RegisterSubscriber() RPC
     new 954e810  IMPALA-8932: shell shouldn't retry kerberos over http
     new 1518351  IMPALA-7312: Non-blocking mode for Fetch() RPC

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/blocking-plan-root-sink.cc             |  28 +++++-
 be/src/exec/buffered-plan-root-sink.cc             |  23 ++++-
 be/src/exec/plan-root-sink.cc                      |   4 +-
 be/src/exec/plan-root-sink.h                       |   8 ++
 be/src/exprs/expr-test.cc                          |   5 +
 be/src/runtime/client-cache.h                      |  27 ++++--
 be/src/runtime/coordinator.cc                      |   4 +
 be/src/runtime/coordinator.h                       |   3 +
 be/src/service/impala-beeswax-server.cc            |   1 -
 be/src/service/impala-hs2-server.cc                |   1 -
 be/src/service/query-options.cc                    |  12 +++
 be/src/service/query-options.h                     |   4 +-
 be/src/statestore/statestore-subscriber.cc         |  22 +++--
 common/thrift/ImpalaInternalService.thrift         |   3 +
 common/thrift/ImpalaService.thrift                 |   8 ++
 shell/impala_client.py                             |   3 +
 shell/impala_shell.py                              |   3 +-
 ...pc_options.py => test_statestore_rpc_errors.py} |  28 +++---
 tests/hs2/test_fetch.py                            | 101 +++++++++++++++++++++
 19 files changed, 245 insertions(+), 43 deletions(-)
 copy tests/custom_cluster/{test_krpc_options.py => test_statestore_rpc_errors.py} (59%)


[impala] 03/03: IMPALA-7312: Non-blocking mode for Fetch() RPC

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 151835116a7972b15a646f8eae6bd8a593bb3564
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Mon Aug 26 19:13:59 2019 -0700

    IMPALA-7312: Non-blocking mode for Fetch() RPC
    
    Adds the query option FETCH_ROWS_TIMEOUT_MS to control the client
    timeout when fetching rows. Set to 10 seconds by default to avoid
    unnecessary fetch requests. Timeout applies when result spooling is
    enabled or disabled.
    
    When result spooling is disabled, the timeout controls how long the
    client thread will wait for a single RowBatch to be produced by the
    coordinator fragment. When result spooling is enabled, a client can
    fetch multiple RowBatches at a time, so the timeout controls the total
    time spent waiting for RowBatches to be produced.
    
    The timeout applies to both waiting for rows to be sent by the fragment
    instance thread, and waiting for rows to be materialized (e.g. the time
    measured by RowMaterializationTimer).
    
    Testing:
    * Added new tests to test_fetch.py
    * Ran core tests
    
    Change-Id: I331acaba23a65dab43cca48e9dc0dc957b9c632d
    Reviewed-on: http://gerrit.cloudera.org:8080/14157
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/blocking-plan-root-sink.cc     |  28 +++++++-
 be/src/exec/buffered-plan-root-sink.cc     |  23 ++++++-
 be/src/exec/plan-root-sink.cc              |   4 +-
 be/src/exec/plan-root-sink.h               |   8 +++
 be/src/exprs/expr-test.cc                  |   5 ++
 be/src/runtime/coordinator.cc              |   4 ++
 be/src/runtime/coordinator.h               |   3 +
 be/src/service/impala-beeswax-server.cc    |   1 -
 be/src/service/impala-hs2-server.cc        |   1 -
 be/src/service/query-options.cc            |  12 ++++
 be/src/service/query-options.h             |   4 +-
 common/thrift/ImpalaInternalService.thrift |   3 +
 common/thrift/ImpalaService.thrift         |   8 +++
 tests/hs2/test_fetch.py                    | 101 +++++++++++++++++++++++++++++
 14 files changed, 196 insertions(+), 9 deletions(-)

diff --git a/be/src/exec/blocking-plan-root-sink.cc b/be/src/exec/blocking-plan-root-sink.cc
index 4396db0..2089192 100644
--- a/be/src/exec/blocking-plan-root-sink.cc
+++ b/be/src/exec/blocking-plan-root-sink.cc
@@ -106,6 +106,11 @@ void BlockingPlanRootSink::Cancel(RuntimeState* state) {
 
 Status BlockingPlanRootSink::GetNext(
     RuntimeState* state, QueryResultSet* results, int num_results, bool* eos) {
+  // Used to track how long the consumer waits for RowBatches to be produced and
+  // materialized.
+  MonotonicStopWatch wait_timeout_timer;
+  wait_timeout_timer.Start();
+
   unique_lock<mutex> l(lock_);
 
   // Set the shared QueryResultSet pointer 'results_' to the given 'results' object and
@@ -114,11 +119,30 @@ Status BlockingPlanRootSink::GetNext(
   num_rows_requested_ = num_results;
   sender_cv_.NotifyAll();
 
+  // True if the consumer timed out waiting for the producer to send rows, false
+  // otherwise.
+  bool timed_out = false;
+
   // Wait while the sender is still producing rows and hasn't filled in the current
   // result set.
   while (sender_state_ == SenderState::ROWS_PENDING && results_ != nullptr
-      && !state->is_cancelled()) {
-    consumer_cv_.Wait(l);
+      && !state->is_cancelled() && !timed_out) {
+    // It is possible for the timeout to expire, and for the QueryResultSet to still have
+    // some rows appended to it. This can happen if the producer acquires the lock, the
+    // timeout expires, and then the producer appends rows to the QueryResultSet. This
+    // does not affect correctness because the producer always sets 'results_' to nullptr
+    // if it appends any rows to the QueryResultSet and it always appends either an entire
+    // RowBatch, or as many rows as requested.
+    uint64_t wait_duration = max(static_cast<uint64_t>(1),
+        PlanRootSink::fetch_rows_timeout_us() - wait_timeout_timer.ElapsedTime());
+    if (!consumer_cv_.WaitFor(l, wait_duration)) {
+      timed_out = true;
+
+      // If the consumer timed out, make sure results_ is set to nullptr because the
+      // consumer will destroy the current QueryResultSet and create a new one for the
+      // next fetch request.
+      results_ = nullptr;
+    }
   }
 
   *eos = sender_state_ == SenderState::EOS;
diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index 7b14794..fdcc5a6 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -139,6 +139,11 @@ void BufferedPlanRootSink::Cancel(RuntimeState* state) {
 Status BufferedPlanRootSink::GetNext(
     RuntimeState* state, QueryResultSet* results, int num_results, bool* eos) {
   {
+    // Used to track how long the consumer waits for RowBatches to be produced and
+    // materialized.
+    MonotonicStopWatch wait_timeout_timer;
+    wait_timeout_timer.Start();
+
     unique_lock<mutex> l(lock_);
     *eos = false;
 
@@ -152,13 +157,23 @@ Status BufferedPlanRootSink::GetNext(
     int num_rows_to_read =
         num_results <= 0 ? FETCH_NUM_BATCHES * state->batch_size() : num_results;
 
+    // True if the consumer timed out waiting for the producer to send rows or if the
+    // consumer timed out while materializing rows, false otherwise.
+    bool timed_out = false;
+
     // Read from the queue until all requested rows have been read, or eos is hit.
-    while (!*eos && num_rows_read < num_rows_to_read) {
+    while (!*eos && num_rows_read < num_rows_to_read && !timed_out) {
       // Wait for the queue to have rows in it.
       while (IsQueueEmpty() && sender_state_ == SenderState::ROWS_PENDING
-          && !state->is_cancelled()) {
+          && !state->is_cancelled() && !timed_out) {
+        // Wait fetch_rows_timeout_us_ - wait_timeout_timer.ElapsedTime() for rows to
+        // become available before returning to the client. Subtracting the time already
+        // elapsed on wait_timeout_timer ensures the client only ever waits up to
+        // fetch_rows_timeout_us_ microseconds before returning.
+        uint64_t wait_duration = max(static_cast<uint64_t>(1),
+            PlanRootSink::fetch_rows_timeout_us() - wait_timeout_timer.ElapsedTime());
         SCOPED_TIMER(row_batches_get_wait_timer_);
-        rows_available_.Wait(l);
+        timed_out = !rows_available_.WaitFor(l, wait_duration);
       }
 
       // If the query was cancelled while the sink was waiting for rows to become
@@ -201,6 +216,8 @@ Status BufferedPlanRootSink::GetNext(
         // Prevent expr result allocations from accumulating.
         expr_results_pool_->Clear();
       }
+      timed_out = timed_out
+          || wait_timeout_timer.ElapsedTime() >= PlanRootSink::fetch_rows_timeout_us();
       *eos = IsQueueEmpty() && sender_state_ == SenderState::EOS;
     }
     if (*eos) consumer_eos_.NotifyOne();
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 43afdd0..9505e06 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -36,7 +36,9 @@ namespace impala {
 PlanRootSink::PlanRootSink(
     TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state)
   : DataSink(sink_id, row_desc, "PLAN_ROOT_SINK", state),
-    num_rows_produced_limit_(state->query_options().num_rows_produced_limit) {}
+    num_rows_produced_limit_(state->query_options().num_rows_produced_limit),
+    fetch_rows_timeout_us_(
+        MICROS_PER_MILLI * state->query_options().fetch_rows_timeout_ms) {}
 
 PlanRootSink::~PlanRootSink() {}
 
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index 7d381ab..7e9c984 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -106,10 +106,18 @@ class PlanRootSink : public DataSink {
   enum class SenderState { ROWS_PENDING, EOS, CLOSED_NOT_EOS };
   SenderState sender_state_ = SenderState::ROWS_PENDING;
 
+  /// Returns the FETCH_ROWS_TIMEOUT_MS value for this query (converted to microseconds).
+  uint64_t fetch_rows_timeout_us() const { return fetch_rows_timeout_us_; }
+
  private:
   /// Limit on the number of rows produced by this query, initialized by the constructor.
   const int64_t num_rows_produced_limit_;
 
+  /// Timeout, in microseconds, when waiting for rows to become available. How long the
+  /// consumer thread waits for the producer thread to produce RowBatches. Derived from
+  /// query option FETCH_ROWS_TIMEOUT_MS.
+  const uint64_t fetch_rows_timeout_us_;
+
   /// Updated by CheckRowsProducedLimit() to indicate the total number of rows produced
   /// by query execution.
   int64_t num_rows_produced_ = 0;
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index a406d29..2eca331 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -286,6 +286,11 @@ class ExprTest : public testing::TestWithParam<std::tuple<bool, bool>> {
     executor_->PushExecOption("EXEC_SINGLE_NODE_ROWS_THRESHOLD=0");
     executor_->PushExecOption("DISABLE_CODEGEN_ROWS_THRESHOLD=0");
 
+    // Some tests select rows that take a long time to materialize (e.g.
+    // "select length(unhex(repeat('a', 1024 * 1024 * 1024)))") so set the client fetch
+    // timeout to a high value.
+    executor_->PushExecOption("FETCH_ROWS_TIMEOUT_MS=100000");
+
     min_int_values_[TYPE_TINYINT] = 1;
     min_int_values_[TYPE_SMALLINT] =
         static_cast<int64_t>(numeric_limits<int8_t>::max()) + 1;
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 85b48c2..9fc79cb 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -681,6 +681,10 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
   RuntimeState* runtime_state = coord_instance_->runtime_state();
 
   Status status = coord_sink_->GetNext(runtime_state, results, max_rows, eos);
+  if (!first_row_fetched_ && results->size() > 0) {
+    query_events_->MarkEvent("First row fetched");
+    first_row_fetched_ = true;
+  }
   RETURN_IF_ERROR(UpdateExecState(
           status, &runtime_state->fragment_instance_id(), FLAGS_hostname));
   if (*eos) RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 953a111..e984927 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -396,6 +396,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// safe to concurrently read from filter_routing_table_.
   bool filter_routing_table_complete_ = false;
 
+  /// True if the first row has been fetched, false otherwise.
+  bool first_row_fetched_ = false;
+
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
 
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 899900b..2515394 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -543,7 +543,6 @@ Status ImpalaServer::FetchInternal(ClientRequestState* request_state,
   lock_guard<mutex> l(*request_state->lock());
 
   if (request_state->num_rows_fetched() == 0) {
-    request_state->query_events()->MarkEvent("First row fetched");
     request_state->set_fetched_rows();
   }
 
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 68848f9..56be626 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -195,7 +195,6 @@ Status ImpalaServer::FetchInternal(ClientRequestState* request_state,
   RETURN_IF_ERROR(request_state->query_status());
 
   if (request_state->num_rows_fetched() == 0) {
-    request_state->query_events()->MarkEvent("First row fetched");
     request_state->set_fetched_rows();
   }
 
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index ea73b80..1b684d3 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -865,6 +865,18 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_disable_hbase_num_rows_estimate(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::FETCH_ROWS_TIMEOUT_MS: {
+        StringParser::ParseResult result;
+        const int64_t requested_timeout =
+            StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result);
+        if (result != StringParser::PARSE_SUCCESS || requested_timeout < 0) {
+          return Status(
+              Substitute("Invalid fetch rows timeout: '$0'. "
+                         "Only non-negative numbers are allowed.", value));
+        }
+        query_options->__set_fetch_rows_timeout_ms(requested_timeout);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index ce509a7..273bcff 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::DISABLE_HBASE_NUM_ROWS_ESTIMATE + 1);\
+      TImpalaQueryOptions::FETCH_ROWS_TIMEOUT_MS + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -186,6 +186,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(max_spilled_result_spooling_mem, MAX_SPILLED_RESULT_SPOOLING_MEM,\
       TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(disable_hbase_num_rows_estimate, DISABLE_HBASE_NUM_ROWS_ESTIMATE,\
+      TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(fetch_rows_timeout_ms, FETCH_ROWS_TIMEOUT_MS,\
       TQueryOptionLevel::ADVANCED)
   ;
 
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 7231d4f..cdc6a63 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -391,6 +391,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   93: optional bool disable_hbase_num_rows_estimate = false;
+
+  // See comment in ImpalaService.thrift
+  94: optional i64 fetch_rows_timeout_ms = 10000;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index d6feb8f..8bfeace 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -452,6 +452,14 @@ enum TImpalaQueryOptions {
   // Disable the normal key sampling of HBase tables in row count and row size estimation.
   // Set this to true will force the use of HMS table stats.
   DISABLE_HBASE_NUM_ROWS_ESTIMATE = 92
+
+  // The maximum amount of time, in milliseconds, a fetch rows request (TFetchResultsReq)
+  // from the client should spend fetching results (including waiting for results to
+  // become available and materialize). When result spooling is enabled, a fetch request
+  // may read multiple RowBatches, in which case, the timeout controls how long the
+  // client waits for all returned RowBatches to be produced. If the timeout is hit, the
+  // client returns whatever rows it has already read. Defaults to 10000 milliseconds.
+  FETCH_ROWS_TIMEOUT_MS = 93
 }
 
 // The summary of a DML statement.
diff --git a/tests/hs2/test_fetch.py b/tests/hs2/test_fetch.py
index eb416d0..392aa11 100644
--- a/tests/hs2/test_fetch.py
+++ b/tests/hs2/test_fetch.py
@@ -18,11 +18,16 @@
 
 import pytest
 import re
+
+from time import sleep
+from time import time
+from tests.common.errors import Timeout
 from tests.hs2.hs2_test_suite import (HS2TestSuite, needs_session,
     create_op_handle_without_secret)
 from TCLIService import TCLIService, constants
 from TCLIService.ttypes import TTypeId
 
+
 # Simple test to make sure all the HS2 types are supported for both the row and
 # column-oriented versions of the HS2 protocol.
 class TestFetch(HS2TestSuite):
@@ -291,3 +296,99 @@ class TestFetch(HS2TestSuite):
         TCLIService.TGetResultSetMetadataReq(operationHandle=good_handle)))
     HS2TestSuite.check_response(self.hs2_client.CloseOperation(
         TCLIService.TCloseOperationReq(operationHandle=good_handle)))
+
+  @needs_session()
+  def test_fetch_timeout(self):
+    """Test FETCH_ROWS_TIMEOUT_MS with default configs."""
+    self.__test_fetch_timeout()
+    self.__test_fetch_materialization_timeout()
+
+  @needs_session(conf_overlay={'spool_query_results': 'true'})
+  def test_fetch_result_spooling_timeout(self):
+    """Test FETCH_ROWS_TIMEOUT_MS with result spooling enabled, and test that the timeout
+    applies when reading multiple RowBatches."""
+    self.__test_fetch_timeout()
+    self.__test_fetch_materialization_timeout()
+
+    # Validate that the timeout applies when reading multiple RowBatches.
+    num_rows = 100
+    statement = "select id from functional.alltypes limit {0}".format(num_rows)
+    execute_statement_resp = self.execute_statement(statement,
+        conf_overlay={'batch_size': '10',
+                      'debug_action': '0:GETNEXT:DELAY',
+                      'fetch_rows_timeout_ms': '500'})
+    HS2TestSuite.check_response(execute_statement_resp)
+
+    # Issue a fetch request to read all rows, and validate that only a subset of the rows
+    # are returned.
+    fetch_results_resp = self.hs2_client.FetchResults(TCLIService.TFetchResultsReq(
+        operationHandle=execute_statement_resp.operationHandle, maxRows=num_rows))
+    HS2TestSuite.check_response(fetch_results_resp)
+    num_rows_fetched = self.get_num_rows(fetch_results_resp.results)
+    assert num_rows_fetched > 0 and num_rows_fetched < num_rows
+    assert fetch_results_resp.hasMoreRows
+
+    self.__fetch_remaining(execute_statement_resp.operationHandle,
+        num_rows - num_rows_fetched, statement)
+
+  def __test_fetch_timeout(self):
+    """Test the query option FETCH_ROWS_TIMEOUT_MS by running a query with a DELAY
+    DEBUG_ACTION and a low value for the fetch timeout. Validates that when the timeout
+    is hit, 0 rows are returned."""
+    num_rows = 1
+    statement = "select id from functional.alltypes limit {0}".format(num_rows)
+    execute_statement_resp = self.execute_statement(statement,
+        conf_overlay={'debug_action': '0:GETNEXT:DELAY', 'fetch_rows_timeout_ms': '1'})
+    HS2TestSuite.check_response(execute_statement_resp)
+
+    # Assert that the first fetch request returns 0 rows.
+    fetch_results_resp = self.hs2_client.FetchResults(TCLIService.TFetchResultsReq(
+        operationHandle=execute_statement_resp.operationHandle, maxRows=1024))
+    HS2TestSuite.check_response(fetch_results_resp)
+    assert self.get_num_rows(fetch_results_resp.results) == 0
+    assert fetch_results_resp.hasMoreRows
+
+    # Assert that all remaining rows can be fetched.
+    self.__fetch_remaining(execute_statement_resp.operationHandle, num_rows, statement)
+
+  def __test_fetch_materialization_timeout(self):
+    """Test the query option FETCH_ROWS_TIMEOUT_MS applies to the time taken to
+    materialize rows. Runs a query with a sleep() which is evaluated during
+    materialization and validates the timeout is applied appropriately."""
+    num_rows = 2
+    statement = "select sleep(2500) from functional.alltypes limit {0}".format(num_rows)
+    execute_statement_resp = self.execute_statement(statement,
+        conf_overlay={'batch_size': '1', 'fetch_rows_timeout_ms': '3750'})
+    HS2TestSuite.check_response(execute_statement_resp)
+
+    # Only one row should be returned because the timeout should be hit after
+    # materializing the first row, but before materializing the second one.
+    fetch_results_resp = self.hs2_client.FetchResults(TCLIService.TFetchResultsReq(
+        operationHandle=execute_statement_resp.operationHandle, maxRows=2))
+    HS2TestSuite.check_response(fetch_results_resp)
+    assert self.get_num_rows(fetch_results_resp.results) == 1
+
+    # Assert that all remaining rows can be fetched.
+    self.__fetch_remaining(execute_statement_resp.operationHandle, num_rows - 1,
+        statement)
+
+  def __fetch_remaining(self, op_handle, num_rows, statement):
+    """Fetch the remaining rows in the given op_handle and validate that the number of
+    rows returned matches the expected number of rows. If the op_handle does not return
+    the expected number of rows within a timeout, an error is thrown."""
+    # The timeout to wait for fetch requests to fetch all rows.
+    timeout = 10
+
+    start_time = time()
+    num_fetched = 0
+
+    # Fetch results until either the timeout is hit or all rows have been fetched.
+    while num_fetched != num_rows and time() - start_time < timeout:
+      sleep(0.5)
+      fetch_results_resp = self.hs2_client.FetchResults(TCLIService.TFetchResultsReq(
+          operationHandle=op_handle, maxRows=1024))
+      HS2TestSuite.check_response(fetch_results_resp)
+      num_fetched += self.get_num_rows(fetch_results_resp.results)
+    if num_fetched != num_rows:
+      raise Timeout("Query {0} did not fetch all results within the timeout {1}"
+                    .format(statement, timeout))


[impala] 02/03: IMPALA-8932: shell shouldn't retry kerberos over http

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 954e810b0ec67faca66e68b924b83dd805f455db
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Mon Sep 9 16:48:45 2019 -0700

    IMPALA-8932: shell shouldn't retry kerberos over http
    
    Change-Id: I5dde277a6a0ddbe5a919bcf376bbc19f0b48e95e
    Reviewed-on: http://gerrit.cloudera.org:8080/14201
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 shell/impala_client.py | 3 +++
 shell/impala_shell.py  | 3 ++-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/shell/impala_client.py b/shell/impala_client.py
index 1ba632e..6761e8e 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -376,6 +376,9 @@ class ImpalaClient(object):
           " HTTP transport.")
 
     # HTTP server implemententations do not support SPNEGO yet.
+    # TODO: when we add support for Kerberos+HTTP, we need to re-enable the automatic
+    # kerberos retry logic in impala_shell.py that was disabled for HTTP because of
+    # IMPALA-8932.
     if self.use_kerberos or self.kerberos_host_fqdn:
       print_to_stderr("Kerberos not supported with HTTP endpoints.")
       raise NotImplementedError()
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 487ddf9..a52b8e0 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -813,7 +813,8 @@ class ImpalaShell(object, cmd.Cmd):
     # If the connection fails and the Kerberos has not been enabled,
     # check for a valid kerberos ticket and retry the connection
     # with kerberos enabled.
-    if not self.imp_client.connected and not self.use_kerberos:
+    # IMPALA-8932: Kerberos is not yet supported for hs2-http, so don't retry.
+    if not self.imp_client.connected and not self.use_kerberos and protocol != 'hs2-http':
       try:
         if call(["klist", "-s"]) == 0:
           print_to_stderr("Kerberos ticket found in the credentials cache, retrying "


[impala] 01/03: IMPALA-8904: retry statestore RegisterSubscriber() RPC

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 19cb8dc1c1c2247e91adc4bf62cab27a7c1e4381
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Wed Aug 28 12:05:35 2019 -0700

    IMPALA-8904: retry statestore RegisterSubscriber() RPC
    
    Previously connection failures triggered a retry, but
    failures on the actual RPC did not trigger a retry. This
    change moves the retry loop to DoRpcWithRetry(), instead
    of relying on the ClientCache to retry the connection.
    
    Note that DoRpcWithRetry() for thrift was dead code since
    most backend RPCs were ported to KRPC, but should still work.
    
    Testing:
    Added targeted test with debug action to inject error on first
    subscribe RPC.
    
    Change-Id: I5d4e6283b5ec83170a1d1d03075b3384a9f108b5
    Reviewed-on: http://gerrit.cloudera.org:8080/14198
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/client-cache.h                      | 27 ++++++++-----
 be/src/statestore/statestore-subscriber.cc         | 22 +++++++----
 tests/custom_cluster/test_statestore_rpc_errors.py | 44 ++++++++++++++++++++++
 3 files changed, 77 insertions(+), 16 deletions(-)

diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h
index c06b979..b41c9b1 100644
--- a/be/src/runtime/client-cache.h
+++ b/be/src/runtime/client-cache.h
@@ -33,6 +33,7 @@
 #include "util/debug-util.h"
 #include "util/metrics-fwd.h"
 #include "util/network-util.h"
+#include "util/time.h"
 
 #include "common/status.h"
 
@@ -274,26 +275,34 @@ class ClientConnection {
     static RpcStatus OK() { return {Status::OK(), false}; }
   };
 
-  /// Helper that retries constructing a client and calling DoRpc() up the three times
-  /// and handles both RPC failures and failures to get a client from 'client_cache'.
-  /// 'debug_fn' is a Status-returning function that can be used to inject errors into
-  /// the RPC.
+  /// Helper that retries constructing a client and calling DoRpc() up to 'retries' times
+  /// with 'delay_ms' delay between retries. This handles both RPC failures and failures
+  /// to get a client from 'client_cache'.  'debug_fn' is a Status-returning function that
+  /// can be used to inject errors into the RPC.
   template <class F, class DebugF, class Request, class Response>
   static RpcStatus DoRpcWithRetry(ClientCache<T>* client_cache, TNetworkAddress address,
-      const F& f, const Request& request, const DebugF& debug_fn, Response* response) {
+      const F& f, const Request& request, int retries, int64_t delay_ms,
+      const DebugF& debug_fn, Response* response) {
     Status rpc_status;
     Status client_status;
 
-    // Try to send the RPC 3 times before failing.
-    for (int i = 0; i < 3; ++i) {
-      ImpalaBackendConnection client(client_cache, address, &client_status);
+    // Try to send the RPC as many times as requested before failing.
+    for (int i = 0; i < retries; ++i) {
+      if (i > 0) SleepForMs(delay_ms); // Delay before retrying.
+      ClientConnection<T> client(client_cache, address, &client_status);
       if (!client_status.ok()) continue;
 
       rpc_status = debug_fn();
-      if (!rpc_status.ok()) continue;
+      if (!rpc_status.ok()) {
+        LOG(INFO) << "Injected RPC error to " << TNetworkAddressToString(address) << ": "
+                  << rpc_status.GetDetail();
+        continue;
+      }
 
       rpc_status = client.DoRpc(f, request, response);
       if (rpc_status.ok()) break;
+      LOG(INFO) << "RPC to " << TNetworkAddressToString(address) << " failed "
+                << rpc_status.GetDetail();
     }
     if (!client_status.ok()) return {client_status, true};
     if (!rpc_status.ok()) return {rpc_status, false};
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 9ed64e2..6952fe5 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -64,6 +64,7 @@ DEFINE_int64_hidden(statestore_subscriber_recovery_grace_period_ms, 30000L, "Per
     "considered fully recovered. After a successful reconnect attempt, updates to the "
     "cluster membership will only become effective after this period has elapsed.");
 
+DECLARE_string(debug_actions);
 DECLARE_string(ssl_client_ca_certificate);
 DECLARE_string(ssl_server_certificate);
 DECLARE_string(ssl_private_key);
@@ -124,8 +125,7 @@ StatestoreSubscriber::StatestoreSubscriber(const string& subscriber_id,
       failure_detector_(new TimeoutFailureDetector(
           seconds(FLAGS_statestore_subscriber_timeout_seconds),
           seconds(FLAGS_statestore_subscriber_timeout_seconds / 2))),
-      client_cache_(new StatestoreClientCache(FLAGS_statestore_subscriber_cnxn_attempts,
-                FLAGS_statestore_subscriber_cnxn_retry_interval_ms, 0, 0, "",
+      client_cache_(new StatestoreClientCache(1, 0, 0, 0, "",
                 !FLAGS_ssl_client_ca_certificate.empty())),
       metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")),
       heartbeat_address_(heartbeat_address),
@@ -172,9 +172,6 @@ Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
 
 Status StatestoreSubscriber::Register() {
   Status client_status;
-  StatestoreServiceConn client(client_cache_.get(), statestore_address_, &client_status);
-  RETURN_IF_ERROR(client_status);
-
   TRegisterSubscriberRequest request;
   for (const auto& registration : topic_registrations_) {
     TTopicRegistration thrift_topic;
@@ -189,8 +186,19 @@ Status StatestoreSubscriber::Register() {
   request.subscriber_location = heartbeat_address_;
   request.subscriber_id = subscriber_id_;
   TRegisterSubscriberResponse response;
-  RETURN_IF_ERROR(client.DoRpc(&StatestoreServiceClientWrapper::RegisterSubscriber,
-      request, &response));
+  int attempt = 0; // Used for debug action only.
+  StatestoreServiceConn::RpcStatus rpc_status =
+      StatestoreServiceConn::DoRpcWithRetry(client_cache_.get(), statestore_address_,
+          &StatestoreServiceClientWrapper::RegisterSubscriber, request,
+          FLAGS_statestore_subscriber_cnxn_attempts,
+          FLAGS_statestore_subscriber_cnxn_retry_interval_ms,
+          [&attempt]() {
+            return attempt++ == 0 ?
+                DebugAction(FLAGS_debug_actions, "REGISTER_SUBSCRIBER_FIRST_ATTEMPT") :
+                Status::OK();
+          },
+          &response);
+  RETURN_IF_ERROR(rpc_status.status);
   Status status = Status(response.status);
   if (status.ok()) {
     connected_to_statestore_metric_->SetValue(true);
diff --git a/tests/custom_cluster/test_statestore_rpc_errors.py b/tests/custom_cluster/test_statestore_rpc_errors.py
new file mode 100644
index 0000000..da6817d
--- /dev/null
+++ b/tests/custom_cluster/test_statestore_rpc_errors.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+
+class TestStatestoreRpcErrors(CustomClusterTestSuite):
+  """Tests for statestore RPC handling."""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestStatestoreRpcErrors, cls).setup_class()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      " --debug_actions=REGISTER_SUBSCRIBER_FIRST_ATTEMPT:FAIL@1.0")
+  def test_register_subscriber_rpc_error(self, vector):
+    self.assert_impalad_log_contains("INFO",
+        "Injected RPC error.*Debug Action: REGISTER_SUBSCRIBER_FIRST_ATTEMPT")
+
+    # Ensure cluster has started up by running a query.
+    result = self.execute_query("select count(*) from functional_parquet.alltypes")
+    assert result.success, str(result)