You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by lv...@apache.org on 2019/02/21 01:08:34 UTC

[impala] branch 2.x updated: IMPALA-6812: Fix flaky Kudu scan tests

This is an automated email from the ASF dual-hosted git repository.

lv pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/2.x by this push:
     new f9c2a67  IMPALA-6812: Fix flaky Kudu scan tests
f9c2a67 is described below

commit f9c2a67566facbbbd43a1b615d869f5b4decca50
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Wed May 23 23:11:01 2018 +0000

    IMPALA-6812: Fix flaky Kudu scan tests
    
    Many of our Kudu related tests have been flaky with the symptom that
    scans appear to not return rows that were just inserted. This occurs
    because our default Kudu scan level of READ_LATEST doesn't make any
    consistency guarantees.
    
    This patch adds a query option 'kudu_read_mode', which overrides the
    startup flag of the same name, and then sets that option to
    READ_AT_SNAPSHOT for all tests with Kudu inserts and scans, which
    should give us more consistent test results.
    
    Testing:
    - Passed a full exhaustive run. Does not appear to increase time to
      run by any significant amount.
    
    Change-Id: I70df84f2cbc663107f2ad029565d3c15bdfbd47c
    Reviewed-on: http://gerrit.cloudera.org:8080/10503
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-on: http://gerrit.cloudera.org:8080/12513
    Reviewed-by: Thomas Marshall <tm...@cloudera.com>
---
 be/src/exec/kudu-scanner.cc                | 14 ++++++-----
 be/src/exec/kudu-util.cc                   | 18 ++++++++++++++
 be/src/exec/kudu-util.h                    |  6 +++++
 be/src/service/query-options.cc            | 13 ++++++++++
 be/src/service/query-options.h             |  3 ++-
 be/src/util/debug-util.cc                  |  1 +
 be/src/util/debug-util.h                   |  1 +
 common/thrift/ImpalaInternalService.thrift | 10 ++++++++
 common/thrift/ImpalaService.thrift         |  4 +++
 tests/common/test_dimensions.py            |  9 +++++++
 tests/custom_cluster/test_kudu.py          |  8 ++++++
 tests/metadata/test_ddl.py                 |  1 +
 tests/query_test/test_kudu.py              | 40 +++++++++++++++++++++++++++++-
 13 files changed, 120 insertions(+), 8 deletions(-)

diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index e63a6fe..7353bf9 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -53,7 +53,8 @@ using kudu::client::KuduTable;
 using kudu::client::KuduValue;
 
 DEFINE_string(kudu_read_mode, "READ_LATEST", "(Advanced) Sets the Kudu scan ReadMode. "
-    "Supported Kudu read modes are READ_LATEST and READ_AT_SNAPSHOT.");
+    "Supported Kudu read modes are READ_LATEST and READ_AT_SNAPSHOT. Can be overridden "
+    "with the query option of the same name.");
 DEFINE_bool(pick_only_leaders_for_tests, false,
             "Whether to pick only leader replicas, for tests purposes only.");
 DEFINE_int32(kudu_scanner_keep_alive_period_sec, 15,
@@ -64,7 +65,6 @@ DECLARE_int32(kudu_operation_timeout_ms);
 
 namespace impala {
 
-const string MODE_READ_AT_SNAPSHOT = "READ_AT_SNAPSHOT";
 
 KuduScanner::KuduScanner(KuduScanNodeBase* scan_node, RuntimeState* state)
   : scan_node_(scan_node),
@@ -154,10 +154,12 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token, bool* eos) {
     KUDU_RETURN_IF_ERROR(scanner_->SetSelection(kudu::client::KuduClient::LEADER_ONLY),
         BuildErrorString("Could not set replica selection"));
   }
-  kudu::client::KuduScanner::ReadMode mode =
-      MODE_READ_AT_SNAPSHOT == FLAGS_kudu_read_mode ?
-          kudu::client::KuduScanner::READ_AT_SNAPSHOT :
-          kudu::client::KuduScanner::READ_LATEST;
+  kudu::client::KuduScanner::ReadMode mode;
+  RETURN_IF_ERROR(StringToKuduReadMode(FLAGS_kudu_read_mode, &mode));
+  if (state_->query_options().kudu_read_mode != TKuduReadMode::DEFAULT) {
+    RETURN_IF_ERROR(StringToKuduReadMode(
+        PrintThriftEnum(state_->query_options().kudu_read_mode), &mode));
+  }
   KUDU_RETURN_IF_ERROR(
       scanner_->SetReadMode(mode), BuildErrorString("Could not set scanner ReadMode"));
   KUDU_RETURN_IF_ERROR(scanner_->SetTimeoutMillis(FLAGS_kudu_operation_timeout_ms),
diff --git a/be/src/exec/kudu-util.cc b/be/src/exec/kudu-util.cc
index dd959e6..1751322 100644
--- a/be/src/exec/kudu-util.cc
+++ b/be/src/exec/kudu-util.cc
@@ -21,6 +21,7 @@
 #include <string>
 #include <sstream>
 
+#include <boost/algorithm/string.hpp>
 #include <kudu/client/callbacks.h>
 #include <kudu/client/schema.h>
 #include <kudu/common/partial_row.h>
@@ -33,6 +34,7 @@
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 
+using boost::algorithm::iequals;
 using kudu::client::KuduSchema;
 using kudu::client::KuduClient;
 using kudu::client::KuduClientBuilder;
@@ -46,6 +48,9 @@ DECLARE_int32(kudu_client_rpc_timeout_ms);
 
 namespace impala {
 
+const string MODE_READ_LATEST = "READ_LATEST";
+const string MODE_READ_AT_SNAPSHOT = "READ_AT_SNAPSHOT";
+
 bool KuduClientIsSupported() {
   // The value below means the client is actually a stubbed client. This should mean
   // that no official client exists for the underlying OS. The value below should match
@@ -287,4 +292,17 @@ Status CreateKuduValue(const ColumnType& col_type, void* value, KuduValue** out)
   return Status::OK();
 }
 
+Status StringToKuduReadMode(
+    const std::string& mode, kudu::client::KuduScanner::ReadMode* out) {
+  if (iequals(mode, MODE_READ_LATEST)) {
+    *out = kudu::client::KuduScanner::READ_LATEST;
+  } else if (iequals(mode, MODE_READ_AT_SNAPSHOT)) {
+    *out = kudu::client::KuduScanner::READ_AT_SNAPSHOT;
+  } else {
+    return Status(Substitute("Invalid kudu_read_mode '$0'. Valid values are READ_LATEST "
+        "and READ_AT_SNAPSHOT.", mode));
+  }
+  return Status::OK();
+}
+
 }  // namespace impala
diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h
index 8fe4d01..1d9a9bd 100644
--- a/be/src/exec/kudu-util.h
+++ b/be/src/exec/kudu-util.h
@@ -108,5 +108,11 @@ inline Status FromKuduStatus(
   return Status::Expected(err_msg);
 }
 
+/// Converts 'mode' to its equivalent ReadMode, stored in 'out'. Possible values for
+/// 'mode' are 'READ_LATEST' and 'READ_AT_SNAPSHOT'. If 'mode' is invalid, an error is
+/// returned.
+Status StringToKuduReadMode(
+    const std::string& mode, kudu::client::KuduScanner::ReadMode* out) WARN_UNUSED_RESULT;
+
 } /// namespace impala
 #endif
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 08b19c7..4074250 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -693,6 +693,19 @@ Status impala::SetQueryOption(const string& key, const string& value,
         }
         break;
       }
+      case TImpalaQueryOptions::KUDU_READ_MODE: {
+        if (iequals(value, "DEFAULT") || iequals(value, "0")) {
+          query_options->__set_kudu_read_mode(TKuduReadMode::DEFAULT);
+        } else if (iequals(value, "READ_LATEST") || iequals(value, "1")) {
+          query_options->__set_kudu_read_mode(TKuduReadMode::READ_LATEST);
+        } else if (iequals(value, "READ_AT_SNAPSHOT") || iequals(value, "2")) {
+          query_options->__set_kudu_read_mode(TKuduReadMode::READ_AT_SNAPSHOT);
+        } else {
+          return Status(Substitute("Invalid kudu_read_mode '$0'. Valid values are "
+              "DEFAULT, READ_LATEST, and READ_AT_SNAPSHOT.", value));
+        }
+        break;
+      }
       default:
         // We hit this DCHECK(false) if we forgot to add the corresponding entry here
         // when we add a new query option.
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index dafa0a0..72d2083 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::THREAD_RESERVATION_AGGREGATE_LIMIT + 1);\
+      TImpalaQueryOptions::KUDU_READ_MODE + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED,\
       TQueryOptionLevel::DEPRECATED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
@@ -141,6 +141,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(thread_reservation_aggregate_limit, THREAD_RESERVATION_AGGREGATE_LIMIT,\
       TQueryOptionLevel::REGULAR)\
+  QUERY_OPT_FN(kudu_read_mode, KUDU_READ_MODE, TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index eb55a7c..7a31748 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -68,6 +68,7 @@ PRINT_THRIFT_ENUM_IMPL(THdfsFileFormat)
 PRINT_THRIFT_ENUM_IMPL(THdfsSeqCompressionMode)
 PRINT_THRIFT_ENUM_IMPL(TImpalaQueryOptions)
 PRINT_THRIFT_ENUM_IMPL(TJoinDistributionMode)
+PRINT_THRIFT_ENUM_IMPL(TKuduReadMode)
 PRINT_THRIFT_ENUM_IMPL(TMetricKind)
 PRINT_THRIFT_ENUM_IMPL(TParquetArrayResolution)
 PRINT_THRIFT_ENUM_IMPL(TParquetFallbackSchemaResolution)
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 813f460..0978e50 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -57,6 +57,7 @@ std::string PrintThriftEnum(const THdfsFileFormat::type& value);
 std::string PrintThriftEnum(const THdfsSeqCompressionMode::type& value);
 std::string PrintThriftEnum(const TImpalaQueryOptions::type& value);
 std::string PrintThriftEnum(const TJoinDistributionMode::type& value);
+std::string PrintThriftEnum(const TKuduReadMode::type& value);
 std::string PrintThriftEnum(const TMetricKind::type& value);
 std::string PrintThriftEnum(const TParquetArrayResolution::type& value);
 std::string PrintThriftEnum(const TParquetFallbackSchemaResolution::type& value);
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 2c00090..b1fbe87 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -63,6 +63,13 @@ enum TJoinDistributionMode {
   SHUFFLE
 }
 
+// Consistency level options for Kudu scans.
+enum TKuduReadMode {
+  DEFAULT,
+  READ_LATEST,
+  READ_AT_SNAPSHOT
+}
+
 // Query options that correspond to ImpalaService.ImpalaQueryOptions, with their
 // respective defaults. Query options can be set in the following ways:
 //
@@ -317,6 +324,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift.
   67: optional i32 thread_reservation_aggregate_limit = 0;
+
+  // See comment in ImpalaService.thrift.
+  68: optional TKuduReadMode kudu_read_mode = TKuduReadMode.DEFAULT;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 7b6afef..3114cfb 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -331,6 +331,10 @@ enum TImpalaQueryOptions {
   // across all backends for the query exceeds this number. 0 or -1 means this has no
   // effect.
   THREAD_RESERVATION_AGGREGATE_LIMIT,
+
+  // Overrides the -kudu_read_mode flag to set the consistency level for Kudu scans.
+  // Possible values are DEFAULT, READ_LATEST, and READ_AT_SNAPSHOT.
+  KUDU_READ_MODE,
 }
 
 // The summary of a DML statement.
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index a9ba7a8..b15d439 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -182,6 +182,15 @@ def create_exec_option_dimension_from_dict(exec_option_dimensions):
   # Build a test vector out of it
   return ImpalaTestDimension('exec_option', *exec_option_dimension_values)
 
+
+def add_exec_option_dimension(test_suite, key, value):
+  """
+  Takes an ImpalaTestSuite object 'test_suite' and adds 'key=value' to every exec option
+  test dimension, leaving the number of tests that will be run unchanged.
+  """
+  for v in test_suite.ImpalaTestMatrix.dimensions["exec_option"]:
+    v.value[key] = value
+
 def extend_exec_option_dimension(test_suite, key, value):
   """
   Takes an ImpalaTestSuite object 'test_suite' and extends the exec option test dimension
diff --git a/tests/custom_cluster/test_kudu.py b/tests/custom_cluster/test_kudu.py
index 7563236..d79703d 100644
--- a/tests/custom_cluster/test_kudu.py
+++ b/tests/custom_cluster/test_kudu.py
@@ -21,6 +21,7 @@ from kudu.schema import INT32
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.kudu_test_suite import KuduTestSuite
+from tests.common.test_dimensions import add_exec_option_dimension
 
 KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts
 LOG = logging.getLogger(__name__)
@@ -31,6 +32,13 @@ class TestKuduOperations(CustomClusterTestSuite, KuduTestSuite):
   def get_workload(cls):
     return 'functional-query'
 
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestKuduOperations, cls).add_test_dimensions()
+    # The default read mode of READ_LATEST does not provide high enough consistency for
+    # these tests.
+    add_exec_option_dimension(cls, "kudu_read_mode", "READ_AT_SNAPSHOT")
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args=\
       "--use_local_tz_for_unix_timestamp_conversions=true")
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index bcafd8e..39f7f3c 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -259,6 +259,7 @@ class TestDdlStatements(TestDdlBase):
   @UniqueDatabase.parametrize(sync_ddl=True)
   def test_create_kudu(self, vector, unique_database):
     vector.get_value('exec_option')['abort_on_error'] = False
+    vector.get_value('exec_option')['kudu_read_mode'] = "READ_AT_SNAPSHOT"
     self.run_test_case('QueryTest/kudu_create', vector, use_db=unique_database,
         multiple_impalad=self._use_multiple_impalad(vector))
 
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index d6a0d2a..4a216a3 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -40,6 +40,7 @@ from pytz import utc
 from tests.common.kudu_test_suite import KuduTestSuite
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.skip import SkipIfNotHdfsMinicluster
+from tests.common.test_dimensions import add_exec_option_dimension
 from tests.verifiers.metric_verifier import MetricVerifier
 
 KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts
@@ -51,6 +52,13 @@ class TestKuduOperations(KuduTestSuite):
   This suite tests the different modification operations when using a kudu table.
   """
 
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestKuduOperations, cls).add_test_dimensions()
+    # The default read mode of READ_LATEST does not provide high enough consistency for
+    # these tests.
+    add_exec_option_dimension(cls, "kudu_read_mode", "READ_AT_SNAPSHOT")
+
   def test_out_of_range_timestamps(self, vector, cursor, kudu_client, unique_database):
     """Test timestamp values that are outside of Impala's supported date range."""
     cursor.execute("""CREATE TABLE %s.times (a INT PRIMARY KEY, ts TIMESTAMP)
@@ -299,6 +307,7 @@ class TestKuduOperations(KuduTestSuite):
 
   def test_kudu_col_removed(self, cursor, kudu_client, unique_database):
     """Test removing a Kudu column outside of Impala."""
+    cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
     cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING)
         PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
     assert kudu_client.table_exists(
@@ -366,7 +375,7 @@ class TestKuduOperations(KuduTestSuite):
     table_name = "%s.storage_attrs" % unique_database
     types = ['boolean', 'tinyint', 'smallint', 'int', 'bigint', 'float', 'double', \
         'string', 'timestamp', 'decimal']
-
+    cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
     create_query = "create table %s (id int primary key" % table_name
     for t in types:
       create_query += ", %s_col %s" % (t, t)
@@ -436,6 +445,33 @@ class TestKuduOperations(KuduTestSuite):
         or "Column col1 has unexpected type." in msg \
         or "Client provided column col1[int64 NULLABLE] not present in tablet" in msg
 
+  def _retry_query(self, cursor, query, expected):
+    retries = 0
+    while retries < 3:
+      cursor.execute(query)
+      result = cursor.fetchall()
+      if result == expected:
+        break
+      retries += 1
+      time.sleep(1)
+    assert retries < 3, \
+        "Did not get a correct result for %s after 3 retries: %s" % (query, result)
+
+  def test_read_modes(self, cursor, unique_database):
+    """Other Kudu tests are run with a scan level of READ_AT_SNAPSHOT to have predictable
+    scan results. This test verifies that scans work as expected at the scan level of
+    READ_LATEST by retrying the scan if the results are incorrect."""
+    table_name = "%s.test_read_latest" % unique_database
+    cursor.execute("set kudu_read_mode=READ_LATEST")
+    cursor.execute("""create table %s (a int primary key, b string) partition by hash(a)
+    partitions 8 stored as kudu""" % table_name)
+    cursor.execute("insert into %s values (0, 'a'), (1, 'b'), (2, 'c')" % table_name)
+    self._retry_query(cursor, "select * from %s order by a" % table_name,
+                      [(0, 'a'), (1, 'b'), (2, 'c')])
+    cursor.execute("""insert into %s select id, string_col from functional.alltypes
+    where id > 2 limit 100""" % table_name)
+    self._retry_query(cursor, "select count(*) from %s" % table_name, [(103,)])
+
 class TestCreateExternalTable(KuduTestSuite):
 
   def test_external_timestamp_default_value(self, cursor, kudu_client, unique_database):
@@ -606,6 +642,7 @@ class TestCreateExternalTable(KuduTestSuite):
        unbounded partition). It is not possible to create such a table in Impala, but
        it can be created directly in Kudu and then loaded as an external table.
        Regression test for IMPALA-5154."""
+    cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
     schema_builder = SchemaBuilder()
     column_spec = schema_builder.add_column("id", INT64)
     column_spec.nullable(False)
@@ -637,6 +674,7 @@ class TestCreateExternalTable(KuduTestSuite):
   def test_column_name_case(self, cursor, kudu_client, unique_database):
     """IMPALA-5286: Tests that an external Kudu table that was created with a column name
        containing upper case letters is handled correctly."""
+    cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
     table_name = '%s.kudu_external_test' % unique_database
     if kudu_client.table_exists(table_name):
       kudu_client.delete_table(table_name)