You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/04/09 03:23:53 UTC

[impala] 02/02: IMPALA-8857: Fix flaky Kudu tests with external inserts

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 49cdd785637c82d7ba3f38be7e5c13e2b7272c2e
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Wed Apr 1 12:59:39 2020 -0700

    IMPALA-8857: Fix flaky Kudu tests with external inserts
    
    There are several Kudu-related tests that are inherently flaky because
    they insert rows into a Kudu table from the Python Kudu client and
    then scan the table from an Impala client, even though no guarantees
    are made about the consistency of operations from different clients.
    
    This patch fixes this by adding a query option
    'kudu_snapshot_read_timestamp_micros' which sets the timestamp at
    which to perform Kudu snapshot reads.
    
    The tests are then modified to set the timestamp to be equal to the
    latest observed timestamp returned by the client that did the inserts
    (plus 1 microsecond, which the Kudu Python client adds automatically
    in latest_observed_timestamp() for this use case).
    
    Change-Id: I5b787f6542dc31dcd846f19576a060a89aec891d
    Reviewed-on: http://gerrit.cloudera.org:8080/15633
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/kudu-scanner.cc                |  5 +++++
 be/src/service/query-options.cc            | 11 +++++++++++
 be/src/service/query-options.h             |  6 ++++--
 common/thrift/ImpalaInternalService.thrift |  3 +++
 common/thrift/ImpalaService.thrift         |  4 ++++
 tests/query_test/test_kudu.py              | 16 ++++++++++++++++
 6 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index 96adc28..b3a5e68 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -200,6 +200,11 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token, bool* eos) {
   }
   KUDU_RETURN_IF_ERROR(
       scanner_->SetReadMode(mode), BuildErrorString("Could not set scanner ReadMode"));
+  if (state_->query_options().kudu_snapshot_read_timestamp_micros > 0) {
+    KUDU_RETURN_IF_ERROR(scanner_->SetSnapshotMicros(
+        state_->query_options().kudu_snapshot_read_timestamp_micros),
+        BuildErrorString("Could not set snapshot timestamp"));
+  }
   KUDU_RETURN_IF_ERROR(scanner_->SetTimeoutMillis(FLAGS_kudu_operation_timeout_ms),
       BuildErrorString("Could not set scanner timeout"));
   VLOG_ROW << "Starting KuduScanner with ReadMode=" << mode
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 50483c7..a0e7964 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -884,6 +884,17 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_max_cnf_exprs(requested_max_cnf_exprs);
         break;
       }
+      case TImpalaQueryOptions::KUDU_SNAPSHOT_READ_TIMESTAMP_MICROS: {
+        StringParser::ParseResult result;  // Parse as int64; reject junk and negatives.
+        const int64_t timestamp =
+            StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result);
+        if (result != StringParser::PARSE_SUCCESS || timestamp < 0) {
+          return Status(Substitute("Invalid Kudu snapshot read timestamp: '$0'. Only "
+              "non-negative numbers are allowed.", value));
+        }
+        query_options->__set_kudu_snapshot_read_timestamp_micros(timestamp);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 6413505..f77a09d 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::MAX_CNF_EXPRS + 1);\
+      TImpalaQueryOptions::KUDU_SNAPSHOT_READ_TIMESTAMP_MICROS + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -196,7 +196,9 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(mem_limit_executors, MEM_LIMIT_EXECUTORS, TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(broadcast_bytes_limit, BROADCAST_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(preagg_bytes_limit, PREAGG_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(max_cnf_exprs, MAX_CNF_EXPRS, TQueryOptionLevel::ADVANCED)
+  QUERY_OPT_FN(max_cnf_exprs, MAX_CNF_EXPRS, TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(kudu_snapshot_read_timestamp_micros, KUDU_SNAPSHOT_READ_TIMESTAMP_MICROS,\
+      TQueryOptionLevel::ADVANCED)
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 02704bf..1ca85b4 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -416,6 +416,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   101: optional i32 max_cnf_exprs = 0;
+
+  // See comment in ImpalaService.thrift
+  102: optional i64 kudu_snapshot_read_timestamp_micros = 0;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 2e5fa7d..0d56503 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -516,6 +516,10 @@ enum TImpalaQueryOptions {
   // a disjunctive expression to CNF. Each AND counts as 1 expression. A value of
   // -1 or 0 means no limit. Default is 0 (unlimited).
   MAX_CNF_EXPRS = 100
+
+  // Set the timestamp for Kudu snapshot reads in Unix time micros. Only valid if
+  // KUDU_READ_MODE is set to READ_AT_SNAPSHOT. 0 (the default) means unset.
+  KUDU_SNAPSHOT_READ_TIMESTAMP_MICROS = 101
 }
 
 // The summary of a DML statement.
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index a0b072f..eee3ed5 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -28,6 +28,7 @@ from kudu.schema import (
     BINARY,
     UNIXTIME_MICROS)
 from kudu.client import Partitioning
+from kudu.util import to_unixtime_micros
 import logging
 import pytest
 import random
@@ -66,6 +67,7 @@ class TestKuduOperations(KuduTestSuite):
   @SkipIfKudu.hms_integration_enabled
   def test_out_of_range_timestamps(self, vector, cursor, kudu_client, unique_database):
     """Test timestamp values that are outside of Impala's supported date range."""
+    cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
     cursor.execute("""CREATE TABLE %s.times (a INT PRIMARY KEY, ts TIMESTAMP)
         PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
     assert kudu_client.table_exists(
@@ -81,6 +83,8 @@ class TestKuduOperations(KuduTestSuite):
     #session.apply(table.new_insert((2, datetime(12000, 1, 1, 0, 0, tzinfo=utc))))
     session.flush()
 
+    cursor.execute("set kudu_snapshot_read_timestamp_micros=%s" %
+        to_unixtime_micros(kudu_client.latest_observed_timestamp()))
     # TODO: The test driver should have a way to specify query options in an 'options'
     # section rather than having to split abort_on_error cases into separate files.
     vector.get_value('exec_option')['abort_on_error'] = 0
@@ -167,6 +171,7 @@ class TestKuduOperations(KuduTestSuite):
       self, cursor, kudu_client, unique_database, cluster_properties):
     """Test changing a Kudu column outside of Impala results in a failure on read with
        outdated metadata (IMPALA-4828)."""
+    cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
     cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING)
         PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
     assert kudu_client.table_exists(
@@ -191,6 +196,8 @@ class TestKuduOperations(KuduTestSuite):
       session.apply(op)
     session.flush()
 
+    cursor.execute("set kudu_snapshot_read_timestamp_micros=%s" %
+        to_unixtime_micros(kudu_client.latest_observed_timestamp()))
     # Scanning should result in an error with Catalog V1, since the metadata is cached.
     try:
       cursor.execute("SELECT * FROM %s.foo" % (unique_database))
@@ -212,6 +219,7 @@ class TestKuduOperations(KuduTestSuite):
       self, cursor, kudu_client, unique_database, cluster_properties):
     """Test changing a NOT NULL Kudu column outside of Impala results in a failure
        on read with outdated metadata (IMPALA-4828)."""
+    cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
     cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING NOT NULL)
         PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
     assert kudu_client.table_exists(
@@ -236,6 +244,8 @@ class TestKuduOperations(KuduTestSuite):
       session.apply(op)
     session.flush()
 
+    cursor.execute("set kudu_snapshot_read_timestamp_micros=%s" %
+        to_unixtime_micros(kudu_client.latest_observed_timestamp()))
     # Scanning should result in an error
     try:
       cursor.execute("SELECT * FROM %s.foo" % (unique_database))
@@ -258,6 +268,7 @@ class TestKuduOperations(KuduTestSuite):
       self, cursor, kudu_client, unique_database, cluster_properties):
     """Test changing a NULL Kudu column outside of Impala results in a failure
        on read with outdated metadata (IMPALA-4828)."""
+    cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
     cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING NULL)
         PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
     assert kudu_client.table_exists(
@@ -282,6 +293,8 @@ class TestKuduOperations(KuduTestSuite):
       session.apply(op)
     session.flush()
 
+    cursor.execute("set kudu_snapshot_read_timestamp_micros=%s" %
+        to_unixtime_micros(kudu_client.latest_observed_timestamp()))
     # Scanning should result in an error
     try:
       cursor.execute("SELECT * FROM %s.foo" % (unique_database))
@@ -302,6 +315,7 @@ class TestKuduOperations(KuduTestSuite):
 
   def test_kudu_col_added(self, cursor, kudu_client, unique_database, cluster_properties):
     """Test adding a Kudu column outside of Impala."""
+    cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
     cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY)
         PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
     assert kudu_client.table_exists(
@@ -322,6 +336,8 @@ class TestKuduOperations(KuduTestSuite):
     session.apply(op)
     session.flush()
 
+    cursor.execute("set kudu_snapshot_read_timestamp_micros=%s" %
+        to_unixtime_micros(kudu_client.latest_observed_timestamp()))
     cursor.execute("SELECT * FROM %s.foo" % (unique_database))
     if cluster_properties.is_catalog_v2_cluster():
       # Changes in Kudu should be immediately visible to Impala with Catalog V2.