You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2023/12/14 14:01:49 UTC

(impala) branch master updated: IMPALA-12322: Support converting UTC timestamps read from Kudu to local time

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 3af193022 IMPALA-12322: Support converting UTC timestamps read from Kudu to local time
3af193022 is described below

commit 3af193022916e42c33d6eafafb6f9560a0789895
Author: Eyizoha <ey...@163.com>
AuthorDate: Wed Nov 8 18:03:35 2023 +0800

    IMPALA-12322: Support converting UTC timestamps read from Kudu to local time
    
    This patch adds a query option 'convert_kudu_utc_timestamps' similar to
    'convert_legacy_hive_parquet_utc_timestamps'. When enabled, it converts
    UTC timestamps read from Kudu to local timestamps.
    
    The corresponding modifications also cover predicate pushdown and
    runtime filters. Daylight saving time changes make some local
    timestamps ambiguous (they map to two possible UTC values), which is
    difficult to handle in a bloom filter. This patch therefore also
    introduces a query option 'disable_kudu_local_timestamp_bloom_filter',
    which by default disables the Kudu timestamp bloom filter when time
    zone conversion is enabled, in order to avoid erroneously filtering out
    rows. For regions that do not observe daylight saving time, this option
    can be set to false to re-enable the Kudu local timestamp bloom filter.
    
    Testing:
    - Add TestKuduTimestampConvert in query_test/test_kudu.py, which
      performs end-to-end testing in a custom cluster, including basic Kudu
      UTC timestamp conversion tests, as well as checks that the related
      predicate pushdown and runtime filters work correctly (even with
      timestamps around daylight saving time transitions).
    
    Change-Id: I9a1e7a13e617cc18deef14289cf9b958588397d3
    Reviewed-on: http://gerrit.cloudera.org:8080/20681
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Csaba Ringhofer <cs...@cloudera.com>
---
 be/src/exec/kudu/kudu-scanner.cc                   | 37 ++++++++-
 be/src/exec/kudu/kudu-scanner.h                    | 10 +++
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  3 +-
 be/src/exprs/timestamp-functions.cc                | 50 +++++++++++
 be/src/exprs/timestamp-functions.h                 | 13 +++
 be/src/runtime/runtime-state.cc                    |  5 --
 be/src/runtime/runtime-state.h                     |  8 --
 be/src/runtime/timestamp-value.cc                  | 40 +++++++--
 be/src/runtime/timestamp-value.h                   | 19 ++++-
 be/src/service/query-options.cc                    |  8 ++
 be/src/service/query-options.h                     |  6 +-
 bin/rat_exclude_files.txt                          |  1 +
 common/function-registry/impala_functions.py       |  3 +
 common/thrift/ImpalaService.thrift                 | 14 ++++
 common/thrift/Query.thrift                         |  6 ++
 .../org/apache/impala/planner/KuduScanNode.java    | 97 +++++++++++++++++++++-
 .../impala/planner/RuntimeFilterGenerator.java     | 20 +++++
 .../main/java/org/apache/impala/util/ExprUtil.java | 42 ++++++++--
 testdata/data/timestamp_at_dst_changes.txt         | 10 +++
 .../functional/functional_schema_template.sql      | 28 +++++++
 .../datasets/functional/schema_constraints.csv     |  6 ++
 .../kudu_predicate_with_timestamp_conversion.test  | 74 +++++++++++++++++
 ...u_runtime_filter_with_timestamp_conversion.test | 73 ++++++++++++++++
 .../QueryTest/kudu_timestamp_conversion.test       | 27 ++++++
 tests/query_test/test_kudu.py                      | 29 +++++++
 25 files changed, 592 insertions(+), 37 deletions(-)

diff --git a/be/src/exec/kudu/kudu-scanner.cc b/be/src/exec/kudu/kudu-scanner.cc
index 5eda30ded..0a4a4e374 100644
--- a/be/src/exec/kudu/kudu-scanner.cc
+++ b/be/src/exec/kudu/kudu-scanner.cc
@@ -285,6 +285,18 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token, bool* eos) {
           max = &int_max;
         }
 
+        TimestampValue ts_min;
+        TimestampValue ts_max;
+        if (state_->query_options().convert_kudu_utc_timestamps &&
+            col_type.type == TYPE_TIMESTAMP) {
+          ts_min = *reinterpret_cast<const TimestampValue*>(min);
+          ts_max = *reinterpret_cast<const TimestampValue*>(max);
+          ConvertLocalTimeMinStatToUTC(&ts_min);
+          ConvertLocalTimeMaxStatToUTC(&ts_max);
+          min = &ts_min;
+          max = &ts_max;
+        }
+
         KuduValue* min_value;
         RETURN_IF_ERROR(CreateKuduValue(col_type, min, &min_value));
         KUDU_RETURN_IF_ERROR(scanner_->AddConjunctPredicate(
@@ -391,7 +403,14 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me
       }
       int64_t ts_micros = *reinterpret_cast<int64_t*>(
           kudu_tuple->GetSlot(slot->tuple_offset()));
-      TimestampValue tv = TimestampValue::UtcFromUnixTimeMicros(ts_micros);
+
+      TimestampValue tv;
+      if (state_->query_options().convert_kudu_utc_timestamps) {
+        tv = TimestampValue::FromUnixTimeMicros(ts_micros, state_->local_time_zone());
+      } else {
+        tv = TimestampValue::UtcFromUnixTimeMicros(ts_micros);
+      }
+
       if (tv.HasDateAndTime()) {
         RawValue::Write(&tv, kudu_tuple, slot, nullptr);
       } else {
@@ -463,4 +482,20 @@ string KuduScanner::BuildErrorString(const char* msg) const {
       msg, scan_node_->id(), scan_node_->table_desc()->table_name());
 }
 
+void KuduScanner::ConvertLocalTimeMinStatToUTC(TimestampValue* v) const {
+  if (!v->HasDateAndTime()) return;
+  TimestampValue pre_repeated_utc_time;
+  const Timezone* local_tz = state_->local_time_zone();
+  v->LocalToUtc(*local_tz, &pre_repeated_utc_time);
+  if (pre_repeated_utc_time.HasDateAndTime()) *v = pre_repeated_utc_time;
+}
+
+void KuduScanner::ConvertLocalTimeMaxStatToUTC(TimestampValue* v) const {
+  if (!v->HasDateAndTime()) return;
+  TimestampValue post_repeated_utc_time;
+  const Timezone* local_tz = state_->local_time_zone();
+  v->LocalToUtc(*local_tz, nullptr, &post_repeated_utc_time);
+  if (post_repeated_utc_time.HasDateAndTime()) *v = post_repeated_utc_time;
+}
+
 }  // namespace impala
diff --git a/be/src/exec/kudu/kudu-scanner.h b/be/src/exec/kudu/kudu-scanner.h
index 1250e19ab..48ef4134f 100644
--- a/be/src/exec/kudu/kudu-scanner.h
+++ b/be/src/exec/kudu/kudu-scanner.h
@@ -88,6 +88,16 @@ class KuduScanner {
   /// Closes the current kudu::client::KuduScanner.
   void CloseCurrentClientScanner();
 
+  /// Convert the 'v' from local timezone to UTC, and for those ambiguous conversions,
+  /// if timestamp t >= v before conversion, then this function converts v in such a way
+  /// that the same will be true after t is converted.
+  void ConvertLocalTimeMinStatToUTC(TimestampValue* v) const;
+
+  /// Convert the 'v' from local timezone to UTC, and for those ambiguous conversions,
+  /// if timestamp t <= v before conversion, then this function converts v in such a way
+  /// that the same will be true after t is converted.
+  void ConvertLocalTimeMaxStatToUTC(TimestampValue* v) const;
+
   inline Tuple* next_tuple(Tuple* t) const {
     uint8_t* mem = reinterpret_cast<uint8_t*>(t);
     return reinterpret_cast<Tuple*>(mem + scan_node_->tuple_desc()->byte_size());
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 6fe0fd264..cc7c92a7b 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -3120,7 +3120,8 @@ ParquetTimestampDecoder HdfsParquetScanner::CreateTimestampDecoder(
     const parquet::SchemaElement& element) {
   bool timestamp_conversion_needed_for_int96_timestamps =
       file_version_.application == "parquet-mr" &&
-      state_->time_zone_for_legacy_parquet_time_conversions() != UTCPTR;
+      state_->query_options().convert_legacy_hive_parquet_utc_timestamps &&
+      state_->local_time_zone() != UTCPTR;
 
   return ParquetTimestampDecoder(element, state_->local_time_zone(),
       timestamp_conversion_needed_for_int96_timestamps);
diff --git a/be/src/exprs/timestamp-functions.cc b/be/src/exprs/timestamp-functions.cc
index 760fe2667..e9a1e60a3 100644
--- a/be/src/exprs/timestamp-functions.cc
+++ b/be/src/exprs/timestamp-functions.cc
@@ -150,6 +150,56 @@ TimestampVal TimestampFunctions::ToUtc(FunctionContext* context,
   return ts_val_ret;
 }
 
+TimestampVal TimestampFunctions::ToUtcUnambiguous(FunctionContext* context,
+    const TimestampVal& ts_val, const StringVal& tz_string_val,
+    const BooleanVal& expect_pre_bool_val) {
+  if (ts_val.is_null || tz_string_val.is_null) return TimestampVal::null();
+  const TimestampValue& ts_value = TimestampValue::FromTimestampVal(ts_val);
+  if (!ts_value.HasDateAndTime()) return TimestampVal::null();
+
+  const StringValue& tz_string_value = StringValue::FromStringVal(tz_string_val);
+  const Timezone* timezone = TimezoneDatabase::FindTimezone(
+      string(tz_string_value.Ptr(), tz_string_value.Len()));
+  if (UNLIKELY(timezone == nullptr)) {
+    // Although this is an error, Hive ignores it. We will issue a warning but otherwise
+    // ignore the error too.
+    stringstream ss;
+    ss << "Unknown timezone '" << tz_string_value << "'" << endl;
+    context->AddWarning(ss.str().c_str());
+    return ts_val;
+  }
+
+  TimestampValue ts_value_ret = ts_value;
+  TimestampValue pre_utc_if_repeated;
+  TimestampValue post_utc_if_repeated;
+  ts_value_ret.LocalToUtc(*timezone, &pre_utc_if_repeated, &post_utc_if_repeated);
+
+  TimestampVal ts_val_ret;
+  if (LIKELY(ts_value_ret.HasDateAndTime())) {
+    ts_value_ret.ToTimestampVal(&ts_val_ret);
+  } else {
+    if (expect_pre_bool_val.is_null) {
+      const string& msg =
+          Substitute("Timestamp '$0' in timezone '$1' could not be converted to UTC",
+              ts_value.ToString(), tz_string_value.DebugString());
+      context->AddWarning(msg.c_str());
+      return TimestampVal::null();
+    }
+    if (expect_pre_bool_val.val) {
+      pre_utc_if_repeated.ToTimestampVal(&ts_val_ret);
+    } else {
+      if (pre_utc_if_repeated == post_utc_if_repeated) {
+        return TimestampVal::null();
+      } else {
+        post_utc_if_repeated.ToTimestampVal(&ts_val_ret);
+      }
+    }
+  }
+
+  return ts_val_ret;
+}
+
+
 // The purpose of making this .cc only is to avoid moving the code of
 // CastDirection into the .h file
 void UnixAndFromUnixPrepare(FunctionContext* context,
diff --git a/be/src/exprs/timestamp-functions.h b/be/src/exprs/timestamp-functions.h
index 5c940c63f..4c4dd914a 100644
--- a/be/src/exprs/timestamp-functions.h
+++ b/be/src/exprs/timestamp-functions.h
@@ -159,6 +159,19 @@ class TimestampFunctions {
   static TimestampVal ToUtc(FunctionContext* context,
       const TimestampVal& ts_val, const StringVal& tz_string_val);
 
+  /// Convert a timestamp in particular timezone to UTC unambiguously.
+  /// If the conversion is unique or 'expect_pre_bool_val' is null, this function behaves
+  /// consistently with ToUtc.
+  /// In cases of ambiguous conversion (e.g., when the timestamp falls within the DST
+  /// repeated interval), if 'expect_pre_bool_val' is true, it returns the previous
+  /// possible value, otherwise it returns the posterior possible value.
+  /// In cases of invalid conversion (e.g., when the timestamp falls within the DST
+  /// skipped interval), if 'expect_pre_bool_val' is true, it returns the transition point
+  /// value, otherwise it returns null.
+  static TimestampVal ToUtcUnambiguous(FunctionContext* context,
+      const TimestampVal& ts_val, const StringVal& tz_string_val,
+      const BooleanVal& expect_pre_bool_val);
+
   /// Functions to extract parts of the timestamp, return integers.
   static IntVal Year(FunctionContext* context, const TimestampVal& ts_val);
   static IntVal Quarter(FunctionContext* context, const TimestampVal& ts_val);
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index d9c83e624..319cac423 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -84,7 +84,6 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragment& fragmen
         query_state->query_ctx().utc_timestamp_string))),
     local_time_zone_(UTCPTR),
     time_zone_for_unix_time_conversions_(UTCPTR),
-    time_zone_for_legacy_parquet_time_conversions_(UTCPTR),
     profile_(RuntimeProfile::Create(
         obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id))),
     instance_buffer_reservation_(obj_pool()->Add(new ReservationTracker)) {
@@ -111,7 +110,6 @@ RuntimeState::RuntimeState(
         qctx.utc_timestamp_string))),
     local_time_zone_(UTCPTR),
     time_zone_for_unix_time_conversions_(UTCPTR),
-    time_zone_for_legacy_parquet_time_conversions_(UTCPTR),
     profile_(RuntimeProfile::Create(obj_pool(), "<unnamed>")),
     instance_buffer_reservation_(nullptr) {
   // We may use execution resources while evaluating exprs, etc. Decremented in
@@ -177,9 +175,6 @@ void RuntimeState::Init() {
     if (query_options().use_local_tz_for_unix_timestamp_conversions) {
       time_zone_for_unix_time_conversions_ = local_time_zone_;
     }
-    if (query_options().convert_legacy_hive_parquet_utc_timestamps) {
-      time_zone_for_legacy_parquet_time_conversions_ = local_time_zone_;
-    }
   }
 }
 
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 5d00df7ae..6b5f39ebb 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -124,9 +124,6 @@ class RuntimeState {
   const Timezone* time_zone_for_unix_time_conversions() const {
     return time_zone_for_unix_time_conversions_;
   }
-  const Timezone* time_zone_for_legacy_parquet_time_conversions() const {
-    return time_zone_for_legacy_parquet_time_conversions_;
-  }
   const TUniqueId& query_id() const { return query_ctx().query_id; }
   const TUniqueId& fragment_instance_id() const {
     return instance_ctx_ != nullptr
@@ -359,11 +356,6 @@ class RuntimeState {
   /// instead.
   const Timezone* time_zone_for_unix_time_conversions_;
 
-  /// Query-global timezone used to read INT96 timestamps written by Hive. UTC by default,
-  /// but if convert_legacy_hive_parquet_utc_timestamps=1, then the local_time_zone_ is
-  /// used instead.
-  const Timezone* time_zone_for_legacy_parquet_time_conversions_;
-
   /// Thread resource management object for this fragment's execution.  The runtime
   /// state is responsible for returning this pool to the thread mgr.
   std::unique_ptr<ThreadResourcePool> resource_pool_;
diff --git a/be/src/runtime/timestamp-value.cc b/be/src/runtime/timestamp-value.cc
index cc8ef971e..417ef467d 100644
--- a/be/src/runtime/timestamp-value.cc
+++ b/be/src/runtime/timestamp-value.cc
@@ -155,8 +155,11 @@ void TimestampValue::UtcToLocal(const Timezone& local_tz,
   }
 }
 
-void TimestampValue::LocalToUtc(const Timezone& local_tz) {
+void TimestampValue::LocalToUtc(const Timezone& local_tz,
+    TimestampValue* pre_utc_if_repeated, TimestampValue* post_utc_if_repeated) {
   DCHECK(HasDateAndTime());
+  // Time-zone conversion rules don't affect fractional seconds, leave them intact.
+  const auto nanos = nanoseconds(time_.fractional_seconds());
   const cctz::civil_second from_cs(date_.year(), date_.month(), date_.day(),
       time_.hours(), time_.minutes(), time_.seconds());
 
@@ -164,18 +167,37 @@ void TimestampValue::LocalToUtc(const Timezone& local_tz) {
   // 'local_tz' time-zone.
   const cctz::time_zone::civil_lookup from_cl = local_tz.lookup(from_cs);
 
-  // In case the resulting 'time_point' is ambiguous, we have to invalidate
-  // TimestampValue.
+  if (LIKELY(from_cl.kind == cctz::time_zone::civil_lookup::UNIQUE)) {
+    *this = UtcFromUnixTimeTicks<1>(TimePointToUnixTime(from_cl.pre));
+    time_ += nanos;
+    return;
+  }
+
+  // In case the resulting 'time_point' is ambiguous, we have to invalidate this
+  // TimestampValue and set pre/post_utc_if_repeated if needed.
   // 'civil_lookup' members and the details of handling ambiguity are described at:
   // https://github.com/google/cctz/blob/a2dd3d0fbc811fe0a1d4d2dbb0341f1a3d28cb2a/
   // include/cctz/time_zone.h#L106
-  if (UNLIKELY(from_cl.kind != cctz::time_zone::civil_lookup::UNIQUE)) {
-    SetToInvalidDateTime();
+  SetToInvalidDateTime();
+  if (from_cl.kind == cctz::time_zone::civil_lookup::REPEATED){
+    if (pre_utc_if_repeated != nullptr) {
+      *pre_utc_if_repeated = UtcFromUnixTimeTicks<1>(TimePointToUnixTime(from_cl.pre));
+      pre_utc_if_repeated->time_ += nanos;
+    }
+    if (post_utc_if_repeated != nullptr) {
+      *post_utc_if_repeated = UtcFromUnixTimeTicks<1>(TimePointToUnixTime(from_cl.post));
+      post_utc_if_repeated->time_ += nanos;
+    }
   } else {
-    int64_t nanos = time_.fractional_seconds();
-    *this = UtcFromUnixTimeTicks<1>(TimePointToUnixTime(from_cl.pre));
-    // Time-zone conversion rules don't affect fractional seconds, leave them intact.
-    time_ += nanoseconds(nanos);
+    DCHECK(from_cl.kind == cctz::time_zone::civil_lookup::SKIPPED);
+    if (pre_utc_if_repeated != nullptr) {
+      *pre_utc_if_repeated = UtcFromUnixTimeTicks<1>(TimePointToUnixTime(from_cl.trans));
+      pre_utc_if_repeated->time_ += nanos;
+    }
+    if (post_utc_if_repeated != nullptr) {
+      *post_utc_if_repeated = UtcFromUnixTimeTicks<1>(TimePointToUnixTime(from_cl.trans));
+      post_utc_if_repeated->time_ += nanos;
+    }
   }
 }
 
diff --git a/be/src/runtime/timestamp-value.h b/be/src/runtime/timestamp-value.h
index 9b630a92c..f1fb126a8 100644
--- a/be/src/runtime/timestamp-value.h
+++ b/be/src/runtime/timestamp-value.h
@@ -298,7 +298,24 @@ class TimestampValue {
 
   /// Converts from 'local_tz' to UTC time zone in-place. The caller must ensure the
   /// TimestampValue this function is called upon has both a valid date and time.
-  void LocalToUtc(const Timezone& local_tz);
+  ///
+  /// If pre/post_utc_if_repeated is not nullptr and timestamp falls into an interval
+  /// where conversion is ambiguous (e.g. Summer->Winter DST change on Northern
+  /// hemisphere), then these arguments are set to the previous/posterior of possible UTC
+  /// timestamp. Or if the timestamp falls into a skipped interval (e.g. Winter->Summer
+  /// DST change on Northern hemisphere), then these arguments will be both set to the UTC
+  /// timestamp of the transition point, and if the above conditions occur 'this' will be
+  /// set to invalid timestamp.
+  ///
+  /// The pre/post_utc_if_repeated is useful to get some ordering guarantees in the case
+  /// when the order of two timestamps is different in UTC and local time (e.g CET Autumn
+  /// dst change 00:30:00 -> 02:30:00 vs 01:15:00 -> 02:15:00) - any timestamp that is
+  /// earlier than 'this' in local time is guaranteed to be earlier than
+  /// 'post_utc_if_repeated' in UTC, and any timestamp later than 'this' in local time is
+  /// guaranteed to be later than 'pre_utc_if_repeated' in UTC.
+  void LocalToUtc(const Timezone& local_tz,
+      TimestampValue* pre_utc_if_repeated = nullptr,
+      TimestampValue* post_utc_if_repeated = nullptr);
 
   void set_date(const boost::gregorian::date d) { date_ = d; Validate(); }
   void set_time(const boost::posix_time::time_duration t) { time_ = t; Validate(); }
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 4678a7d19..9d743ba9e 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -909,6 +909,14 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_convert_legacy_hive_parquet_utc_timestamps(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::CONVERT_KUDU_UTC_TIMESTAMPS: {
+        query_options->__set_convert_kudu_utc_timestamps(IsTrue(value));
+        break;
+      }
+      case TImpalaQueryOptions::DISABLE_KUDU_LOCAL_TIMESTAMP_BLOOM_FILTER: {
+        query_options->__set_disable_kudu_local_timestamp_bloom_filter(IsTrue(value));
+        break;
+      }
       case TImpalaQueryOptions::ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION: {
         query_options->__set_enable_outer_join_to_inner_transformation(IsTrue(value));
         break;
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index c420e0743..3406d4fc9 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE                                                                 \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                                 \
-      TImpalaQueryOptions::KUDU_TABLE_RESERVE_SECONDS + 1);                              \
+      TImpalaQueryOptions::DISABLE_KUDU_LOCAL_TIMESTAMP_BLOOM_FILTER + 1);               \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)               \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)             \
@@ -313,6 +313,10 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(codegen_opt_level, CODEGEN_OPT_LEVEL, TQueryOptionLevel::ADVANCED)        \
   QUERY_OPT_FN(kudu_table_reserve_seconds, KUDU_TABLE_RESERVE_SECONDS,                   \
       TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(convert_kudu_utc_timestamps,                                              \
+      CONVERT_KUDU_UTC_TIMESTAMPS, TQueryOptionLevel::ADVANCED)                          \
+  QUERY_OPT_FN(disable_kudu_local_timestamp_bloom_filter,                                \
+      DISABLE_KUDU_LOCAL_TIMESTAMP_BLOOM_FILTER, TQueryOptionLevel::ADVANCED)            \
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index f6bb10f9f..43d5df250 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -168,6 +168,7 @@ testdata/data/text-dollar-hash-pipe.txt
 testdata/data/dateless_timestamps.txt
 testdata/data/text_large_zstd.txt
 testdata/data/text_large_zstd.zst
+testdata/data/timestamp_at_dst_changes.txt
 testdata/data/widerow.txt
 testdata/data/local_tbl/00000.txt
 testdata/data/hudi_parquet/*
diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py
index 1326e100f..7a5516e34 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -1071,6 +1071,9 @@ visible_functions = [
 ]
 
 invisible_functions = [
+  [['to_utc_timestamp'], 'TIMESTAMP', ['TIMESTAMP', 'STRING', 'BOOLEAN'],
+      "impala::TimestampFunctions::ToUtcUnambiguous"],
+
   [['months_add_interval'], 'TIMESTAMP', ['TIMESTAMP', 'INT'],
       '_ZN6impala18TimestampFunctions6AddSubILb1EN10impala_udf6IntValEN5boost9date_time15months_durationINS4_9gregorian21greg_durations_configEEELb0EEENS2_12TimestampValEPNS2_15FunctionContextERKSA_RKT0_'],
   [['months_add_interval'], 'TIMESTAMP', ['TIMESTAMP', 'BIGINT'],
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index d2fb3a046..ff14d2a09 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -878,6 +878,20 @@ enum TImpalaQueryOptions {
   // During this time deleted Kudu tables can be recovered by Kudu's 'recall table' API.
   // See KUDU-3326 for details.
   KUDU_TABLE_RESERVE_SECONDS = 168
+
+  // When true, TIMESTAMPs read from Kudu will be converted from UTC to local time.
+  // Writes are unaffected.
+  CONVERT_KUDU_UTC_TIMESTAMPS = 169
+
+  // This only makes sense when 'CONVERT_KUDU_UTC_TIMESTAMPS' is true. When true, it
+  // disables the bloom filter for Kudu's timestamp type, because using local timestamp in
+  // Kudu bloom filter may cause missing rows.
+  // Local timestamp convert to UTC could be ambiguous in the case of DST change.
+  // We can only put one of the two possible UTC timestamps in the bloom filter
+  // for now, which may cause missing rows that have the other UTC timestamp.
+  // For those regions that do not observe DST, could set this flag to false
+  // to re-enable kudu local timestamp bloom filter.
+  DISABLE_KUDU_LOCAL_TIMESTAMP_BLOOM_FILTER = 170
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index e6e5ded5a..2ed9134db 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -681,6 +681,12 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   169: optional i32 kudu_table_reserve_seconds = 0;
+
+  // See comment in ImpalaService.thrift
+  170: optional bool convert_kudu_utc_timestamps = false;
+
+  // See comment in ImpalaService.thrift
+  171: optional bool disable_kudu_local_timestamp_bloom_filter = true;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index d2ec18c91..2ae581525 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -44,7 +44,9 @@ import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.common.InternalException;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TKuduReplicaSelection;
@@ -128,6 +130,16 @@ public class KuduScanNode extends ScanNode {
   // for Kudu primary keys by enabling small query optimization.
   boolean isPointLookupQuery_ = false;
 
+  // It is used to indicate current kudu predicate should not be removed from conjuncts.
+  // If the current predicate is a comparison predicate with ambiguous timestamp, it may
+  // need to check again after actually scanning. For example, if we have two rows with
+  // column 'ts' 01:40:00(Local, UTC is 05:40:00) and 01:20:00(Local, UTC is 06:20:00),
+  // and the predicate 'ts' < 01:30:00(Local, convert to UTC is 05:30:00 or 06:30:00), we
+  // should push ts < 06:30:00(UTC) to Kudu to avoid missing the row with ts
+  // 01:20:00(Local, UTC is 06:20:00) and also need to filter out the row with ts
+  // 01:40:00(Local, UTC is 05:40:00) after actually scanning.
+  boolean currentPredicateNeedCheckAgain_ = false;
+
   public KuduScanNode(PlanNodeId id, TupleDescriptor desc, List<Expr> conjuncts,
       MultiAggregateInfo aggInfo, TableRef kuduTblRef) {
     super(id, desc, "SCAN KUDU");
@@ -492,7 +504,11 @@ public class KuduScanNode extends ScanNode {
               primaryKeyColsInEqualPred) ||
           tryConvertInListKuduPredicate(analyzer, rpcTable, predicate) ||
           tryConvertIsNullKuduPredicate(analyzer, rpcTable, predicate)) {
-        it.remove();
+        if (currentPredicateNeedCheckAgain_) {
+          currentPredicateNeedCheckAgain_ = false;
+        } else {
+          it.remove();
+        }
       }
     }
     if (primaryKeyColsInEqualPred.size() >= 1 &&
@@ -567,8 +583,10 @@ public class KuduScanNode extends ScanNode {
       case TIMESTAMP: {
         try {
           // TODO: Simplify when Impala supports a 64-bit TIMESTAMP type.
-          kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
-              ExprUtil.utcTimestampToUnixTimeMicros(analyzer, literal));
+          kuduPredicate = analyzer.getQueryOptions().isConvert_kudu_utc_timestamps() ?
+              convertLocalTimestampBinaryKuduPredicate(analyzer, column, op, literal) :
+              KuduPredicate.newComparisonPredicate(column, op,
+                  ExprUtil.utcTimestampToUnixTimeMicros(analyzer, literal));
         } catch (Exception e) {
           LOG.info("Exception converting Kudu timestamp predicate: " + expr.toSql(), e);
           return false;
@@ -601,6 +619,54 @@ public class KuduScanNode extends ScanNode {
     return true;
   }
 
+  private KuduPredicate convertLocalTimestampBinaryKuduPredicate(Analyzer analyzer,
+      ColumnSchema column, ComparisonOp op, LiteralExpr literal)
+      throws AnalysisException, InternalException {
+    Long preUnixTimeMicros =
+        ExprUtil.localTimestampToUnixTimeMicros(analyzer, literal, true);
+    Long postUnixTimeMicros =
+        ExprUtil.localTimestampToUnixTimeMicros(analyzer, literal, false);
+    // If the timestamp is not a valid local timestamp, EQUAL predicate should be always
+    // false. For other comparison predicates, we could use the transition point time as
+    // a common value for comparison.
+    if (preUnixTimeMicros == null || postUnixTimeMicros == null) {
+      if (preUnixTimeMicros == null) return null; // should not happen
+      if (op == ComparisonOp.EQUAL) {
+        // An empty IN LIST predicate is always false.
+        return KuduPredicate.newInListPredicate(column, Lists.newArrayList());
+      } else {
+        postUnixTimeMicros = preUnixTimeMicros;
+      }
+    }
+    // If the timestamp is unique, create the predicate normally.
+    if (preUnixTimeMicros.equals(postUnixTimeMicros)) {
+      return KuduPredicate.newComparisonPredicate(column, op, preUnixTimeMicros);
+    }
+    // If the timestamp is ambiguous, we should convert EQUAL predicate to an IN LIST
+    // predicate that include all ambiguous values. For comparison predicates, we need to
+    // use a larger range of possible values for comparison to avoid missing rows.
+    // Additionally, set currentPredicateNeedCheckAgain_ to true to indicate that the
+    // predicate should not removed from the conjuncts_ list.
+    switch (op) {
+      case EQUAL: return KuduPredicate.newInListPredicate(column,
+          Lists.newArrayList(preUnixTimeMicros, postUnixTimeMicros));
+      case LESS:
+      case LESS_EQUAL: {
+        currentPredicateNeedCheckAgain_ = true;
+        return KuduPredicate.newComparisonPredicate(column, op,
+            postUnixTimeMicros);
+      }
+      case GREATER:
+      case GREATER_EQUAL: {
+        currentPredicateNeedCheckAgain_ = true;
+        return KuduPredicate.newComparisonPredicate(column, op,
+            preUnixTimeMicros);
+      }
+      default:
+        throw new InternalException("Unexpected operator: " + op);
+    }
+  }
+
   /**
    * If the InList 'expr' can be converted to a KuduPredicate, returns true and updates
    * kuduPredicates_ and kuduConjuncts_.
@@ -628,7 +694,11 @@ public class KuduScanNode extends ScanNode {
 
       Object value = getKuduInListValue(analyzer, literal);
       if (value == null) return false;
-      values.add(value);
+      if (value instanceof List) {
+        values.addAll((List<?>) value);
+      } else {
+        values.add(value);
+      }
     }
 
     String colName = ((KuduColumn) ref.getDesc().getColumn()).getKuduName();
@@ -671,6 +741,10 @@ public class KuduScanNode extends ScanNode {
    * Return the value of the InList child expression 'e' as an Object that can be
    * added to a KuduPredicate. If the Expr is not supported by Kudu or the type doesn't
    * match the expected PrimitiveType 'type', null is returned.
+   * Additionally, if the query option 'convert_kudu_utc_timestamps' is enabled and the
+   * local timestamp 'e' is invalid or ambiguous when converted to UTC, an empty list
+   * (invalid) or a list containing both possible UTC values (ambiguous) is returned
+   * instead of a single value.
    */
   private static Object getKuduInListValue(Analyzer analyzer, LiteralExpr e) {
     switch (e.getType().getPrimitiveType()) {
@@ -685,6 +759,21 @@ public class KuduScanNode extends ScanNode {
       case TIMESTAMP: {
         try {
           // TODO: Simplify when Impala supports a 64-bit TIMESTAMP type.
+          if (analyzer.getQueryOptions().isConvert_kudu_utc_timestamps()) {
+            Long preUnixTimeMicros =
+                ExprUtil.localTimestampToUnixTimeMicros(analyzer, e, true);
+            Long postUnixTimeMicros =
+                ExprUtil.localTimestampToUnixTimeMicros(analyzer, e, false);
+            // If the timestamp is invalid in local time, return empty list.
+            if (preUnixTimeMicros == null || postUnixTimeMicros == null) {
+              if (preUnixTimeMicros == null) return null; // should not happen
+              return Lists.newArrayList();
+            }
+            // If the timestamp is unique, return the unique value.
+            if (preUnixTimeMicros.equals(postUnixTimeMicros)) return preUnixTimeMicros;
+            // If the timestamp is ambiguous, return a list of the two possible values.
+            return Lists.newArrayList(preUnixTimeMicros, postUnixTimeMicros);
+          }
           return ExprUtil.utcTimestampToUnixTimeMicros(analyzer, e);
         } catch (Exception ex) {
           LOG.info("Exception converting Kudu timestamp expr: " + e.toSql(), ex);
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index fc69cdc06..904904d15 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -43,6 +43,7 @@ import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.analysis.Predicate;
 import org.apache.impala.analysis.SlotId;
 import org.apache.impala.analysis.SlotRef;
+import org.apache.impala.analysis.StringLiteral;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.analysis.TupleIsNullPredicate;
@@ -482,6 +483,12 @@ public final class RuntimeFilterGenerator {
       if (isTimestampTruncation) {
         Preconditions.checkArgument(srcExpr.isAnalyzed());
         Preconditions.checkArgument(srcExpr.getType() == Type.TIMESTAMP);
+        // The filter is targeted for Kudu scan node with source timestamp truncation.
+        if (analyzer.getQueryOptions().isConvert_kudu_utc_timestamps()) {
+          List<Expr> params = Lists.newArrayList(srcExpr,
+              new StringLiteral(analyzer.getQueryCtx().getLocal_time_zone()));
+          srcExpr = new FunctionCallExpr("to_utc_timestamp", params);
+        }
         Expr toUnixTimeExpr =
             new FunctionCallExpr("utc_to_unix_micros", Lists.newArrayList(srcExpr));
         try {
@@ -1170,6 +1177,19 @@ public final class RuntimeFilterGenerator {
           }
           SlotRef slotRef = (SlotRef) targetExpr;
           if (slotRef.getDesc().getColumn() == null) continue;
+          if (filter.isTimestampTruncation() &&
+              analyzer.getQueryOptions().isConvert_kudu_utc_timestamps() &&
+              analyzer.getQueryOptions().isDisable_kudu_local_timestamp_bloom_filter()) {
+            // Converting a local timestamp to UTC can be ambiguous around a DST change.
+            // Only one of the two possible UTC timestamps can currently be put into the
+            // bloom filter, which may cause rows that have the other UTC timestamp to be
+            // missed.
+            // For time zones that do not observe DST, this option can be set to false to
+            // re-enable the Kudu local timestamp bloom filter.
+            LOG.info("Skipping runtime filter because kudu local timestamp bloom filter "
+                + "is disabled: " + filter.getSrcExpr().toSql());
+            continue;
+          }
         } else if (filter.getType() == TRuntimeFilterType.MIN_MAX) {
           Preconditions.checkState(
               enabledRuntimeFilterTypes.contains(TRuntimeFilterType.MIN_MAX),
diff --git a/fe/src/main/java/org/apache/impala/util/ExprUtil.java b/fe/src/main/java/org/apache/impala/util/ExprUtil.java
index 0adb557b4..87a0a8c29 100644
--- a/fe/src/main/java/org/apache/impala/util/ExprUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/ExprUtil.java
@@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.BoolLiteral;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.FunctionCallExpr;
 import org.apache.impala.analysis.SelectListItem;
@@ -83,9 +84,32 @@ public class ExprUtil {
   public static long localTimestampToUnixTimeMicros(Analyzer analyzer, Expr timestampExpr)
       throws AnalysisException, InternalException {
     return utcTimestampToUnixTimeMicros(analyzer,
-        toUtcTimestampExpr(analyzer, timestampExpr));
+        toUtcTimestampExpr(analyzer, timestampExpr, null));
   }
 
+  /**
+   * Converts a timestamp in local timezone to UTC, then to UNIX microseconds.
+   * If 'expectPreIfNonUnique' is null, the conversion is expected to be unique: the
+   * unique value is returned if it is, otherwise null is returned.
+   * In cases of ambiguous conversion (e.g., when the timestamp falls within the DST
+   * repeated interval), if 'expectPreIfNonUnique' is true, it returns the earlier
+   * possible value, otherwise it returns the later possible value.
+   * In cases of invalid conversion (e.g., when the timestamp falls within the DST
+   * skipped interval), if 'expectPreIfNonUnique' is true, it returns the transition point
+   * value, otherwise it returns null.
+   */
+  public static Long localTimestampToUnixTimeMicros(Analyzer analyzer, Expr timestampExpr,
+      Boolean expectPreIfNonUnique) throws AnalysisException, InternalException {
+    Expr toUtcTimestampExpr = toUtcTimestampExpr(analyzer, timestampExpr,
+        expectPreIfNonUnique);
+    Expr toUnixTimeExpr = new FunctionCallExpr("utc_to_unix_micros",
+        Lists.newArrayList(toUtcTimestampExpr));
+    toUnixTimeExpr.analyze(analyzer);
+    TColumnValue result = FeSupport.EvalExprWithoutRow(toUnixTimeExpr,
+        analyzer.getQueryCtx());
+    if (!result.isSetLong_val()) return null;
+    return result.getLong_val();
+  }
 
   /**
    * Converts a timestamp in local timezone to string value.
@@ -93,17 +117,21 @@ public class ExprUtil {
   public static String localTimestampToString(Analyzer analyzer, Expr timestampExpr)
       throws AnalysisException, InternalException {
     return utcTimestampToSpecifiedTimeZoneTimestamp(analyzer,
-        toUtcTimestampExpr(analyzer, timestampExpr));
+        toUtcTimestampExpr(analyzer, timestampExpr, null));
   }
 
-  private static Expr toUtcTimestampExpr(Analyzer analyzer, Expr timestampExpr)
-      throws AnalysisException {
+  private static Expr toUtcTimestampExpr(Analyzer analyzer, Expr timestampExpr,
+      Boolean expectPreIfNonUnique) throws AnalysisException {
     Preconditions.checkArgument(timestampExpr.isAnalyzed());
     Preconditions.checkArgument(timestampExpr.isConstant());
     Preconditions.checkArgument(timestampExpr.getType() == Type.TIMESTAMP);
-    Expr toUtcTimestamp = new FunctionCallExpr("to_utc_timestamp",
-        Lists.newArrayList(timestampExpr,
-            new StringLiteral(analyzer.getQueryCtx().getLocal_time_zone())));
+    List<Expr> params = Lists.newArrayList(timestampExpr,
+        new StringLiteral(analyzer.getQueryCtx().getLocal_time_zone()));
+    if (expectPreIfNonUnique != null) {
+      params.add(new BoolLiteral(expectPreIfNonUnique));
+    }
+    FunctionCallExpr toUtcTimestamp = new FunctionCallExpr("to_utc_timestamp", params);
+    toUtcTimestamp.setIsInternalFnCall(true);
     toUtcTimestamp.analyze(analyzer);
     return toUtcTimestamp;
   }
diff --git a/testdata/data/timestamp_at_dst_changes.txt b/testdata/data/timestamp_at_dst_changes.txt
new file mode 100644
index 000000000..314caf0bd
--- /dev/null
+++ b/testdata/data/timestamp_at_dst_changes.txt
@@ -0,0 +1,10 @@
+1,1300006800,2011-03-13 09:00:00
+2,1300008600,2011-03-13 09:30:00
+3,1300010400,2011-03-13 10:00:00
+4,1300012200,2011-03-13 10:30:00
+5,1320566400,2011-11-06 08:00:00
+6,1320567600,2011-11-06 08:20:00
+7,1320568800,2011-11-06 08:40:00
+8,1320570000,2011-11-06 09:00:00
+9,1320571200,2011-11-06 09:20:00
+10,1320572400,2011-11-06 09:40:00
\ No newline at end of file
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 6ee4de13a..491224841 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -4402,3 +4402,31 @@ transactional=false
 ---- DEPENDENT_LOAD
 LOAD DATA LOCAL INPATH '{impala_home}/testdata/data/empty_present_stream.orc' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
 ====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+timestamp_at_dst_changes
+---- COLUMNS
+id int
+unixtime bigint
+ts timestamp
+---- ROW_FORMAT
+delimited fields terminated by ','  escaped by '\\'
+---- DEPENDENT_LOAD
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
+SELECT * FROM {db_name}.{table_name};
+---- LOAD
+LOAD DATA LOCAL INPATH '{impala_home}/testdata/data/timestamp_at_dst_changes.txt'
+OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
+---- CREATE_KUDU
+DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name};
+CREATE TABLE {db_name}{db_suffix}.{table_name} (
+  id INT PRIMARY KEY,
+  unixtime BIGINT,
+  ts TIMESTAMP
+)
+PARTITION BY HASH (id) PARTITIONS 3 STORED AS KUDU;
+---- DEPENDENT_LOAD_KUDU
+INSERT into TABLE {db_name}{db_suffix}.{table_name}
+SELECT * FROM {db_name}.{table_name};
+====
\ No newline at end of file
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index d99aac234..b067580c3 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -284,6 +284,7 @@ table_name:decimal_tiny, constraint:only, table_format:kudu/none/none
 table_name:strings_with_quotes, constraint:only, table_format:kudu/none/none
 table_name:manynulls, constraint:only, table_format:kudu/none/none
 table_name:date_tbl, constraint:only, table_format:kudu/none/none
+table_name:timestamp_at_dst_changes, constraint:only, table_format:kudu/none/none
 
 # Skipping header lines is only effective with text tables
 table_name:table_with_header, constraint:restrict_to, table_format:text/none/none
@@ -406,3 +407,8 @@ table_name:empty_parquet_page_source_impala10186, constraint:restrict_to, table_
 
 # The table is used as test coverage for ORC-1304
 table_name:empty_stream_tbl, constraint:restrict_to, table_format:orc/def/block
+
+# The table is used to test timestamps around DST changes; its timestamps are near the
+# DST changes in the 'America/Los_Angeles' time zone.
+table_name:timestamp_at_dst_changes, constraint:restrict_to, table_format:text/none/none
+table_name:timestamp_at_dst_changes, constraint:restrict_to, table_format:kudu/none/none
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_predicate_with_timestamp_conversion.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_predicate_with_timestamp_conversion.test
new file mode 100644
index 000000000..ccd7cb9dd
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_predicate_with_timestamp_conversion.test
@@ -0,0 +1,74 @@
+====
+---- QUERY
+# Test that kudu equal predicate with timestamps can be correctly pushed down in the local
+# time. '2011-03-13 02:00:00' is invalid in timezone America/Los_Angeles, so the total
+# RowsRead should be 0.
+select count(*) from timestamp_at_dst_changes where ts = '2011-03-13 02:00:00';
+---- TYPES
+BIGINT
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Test that kudu equal predicate with timestamps can be correctly pushed down in the local
+# time. '2011-11-06 01:00:00' is repeated in timezone America/Los_Angeles, so the total
+# RowsRead should be 2 (2011-11-06 08:00:00 and 09:00:00 in UTC).
+select * from timestamp_at_dst_changes where ts = '2011-11-06 01:00:00';
+---- TYPES
+BIGINT,BIGINT,TIMESTAMP
+---- RESULTS
+5,1320566400,2011-11-06 01:00:00
+8,1320570000,2011-11-06 01:00:00
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 2
+====
+---- QUERY
+# Test that kudu greater predicate with timestamps can be correctly pushed down in the
+# local time. '2011-11-06 01:30:00' in timezone America/Los_Angeles could be converted to
+# 2011-11-06 08:30:00 or 09:30:00 in UTC, we should use the previous value 08:30:00 for
+# greater compare to avoid missing rows, so the total RowsRead should be 4
+# (2011-11-06 08:40:00, 09:00:00, 09:20:00 and 09:40:00 in UTC).
+select * from timestamp_at_dst_changes where ts > '2011-11-06 01:30:00';
+---- TYPES
+BIGINT,BIGINT,TIMESTAMP
+---- RESULTS
+7,1320568800,2011-11-06 01:40:00
+10,1320572400,2011-11-06 01:40:00
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 4
+====
+---- QUERY
+# Test that kudu less predicate with timestamps can be correctly pushed down in the
+# local time. For reasons similar to the greater predicate, we should use the later
+# value for the less comparison, so the total RowsRead should be 9.
+select * from timestamp_at_dst_changes where ts < '2011-11-06 01:30:00';
+---- TYPES
+BIGINT,BIGINT,TIMESTAMP
+---- RESULTS
+1,1300006800,2011-03-13 01:00:00
+2,1300008600,2011-03-13 01:30:00
+3,1300010400,2011-03-13 03:00:00
+4,1300012200,2011-03-13 03:30:00
+5,1320566400,2011-11-06 01:00:00
+6,1320567600,2011-11-06 01:20:00
+8,1320570000,2011-11-06 01:00:00
+9,1320571200,2011-11-06 01:20:00
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 9
+====
+---- QUERY
+# Test that kudu inlist predicate with timestamps can be correctly pushed down in the
+# local time. As with the equal predicate, an invalid timestamp is removed from the list
+# and both possible values are put in the list for a repeated timestamp.
+select * from timestamp_at_dst_changes where ts in
+  ('2011-03-13 02:00:00', '2011-11-06 01:00:00');
+---- TYPES
+BIGINT,BIGINT,TIMESTAMP
+---- RESULTS
+5,1320566400,2011-11-06 01:00:00
+8,1320570000,2011-11-06 01:00:00
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 2
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_runtime_filter_with_timestamp_conversion.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_runtime_filter_with_timestamp_conversion.test
new file mode 100644
index 000000000..66afae507
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_runtime_filter_with_timestamp_conversion.test
@@ -0,0 +1,73 @@
+====
+---- QUERY
+# Test that kudu runtime filter with timestamps works correctly in local time.
+set timezone='America/Los_Angeles';
+select straight_join t1.id, t1.ts, t2.id, from_utc_timestamp(t2.ts, 'America/Los_Angeles')
+  from timestamp_at_dst_changes t1
+  join functional.timestamp_at_dst_changes t2
+  on t1.ts = from_utc_timestamp(t2.ts, 'America/Los_Angeles')
+  order by t1.id, t2.id;
+---- TYPES
+BIGINT,TIMESTAMP,BIGINT,TIMESTAMP
+---- RESULTS
+1,2011-03-13 01:00:00,1,2011-03-13 01:00:00
+2,2011-03-13 01:30:00,2,2011-03-13 01:30:00
+3,2011-03-13 03:00:00,3,2011-03-13 03:00:00
+4,2011-03-13 03:30:00,4,2011-03-13 03:30:00
+5,2011-11-06 01:00:00,5,2011-11-06 01:00:00
+5,2011-11-06 01:00:00,8,2011-11-06 01:00:00
+6,2011-11-06 01:20:00,6,2011-11-06 01:20:00
+6,2011-11-06 01:20:00,9,2011-11-06 01:20:00
+7,2011-11-06 01:40:00,7,2011-11-06 01:40:00
+7,2011-11-06 01:40:00,10,2011-11-06 01:40:00
+8,2011-11-06 01:00:00,5,2011-11-06 01:00:00
+8,2011-11-06 01:00:00,8,2011-11-06 01:00:00
+9,2011-11-06 01:20:00,6,2011-11-06 01:20:00
+9,2011-11-06 01:20:00,9,2011-11-06 01:20:00
+10,2011-11-06 01:40:00,7,2011-11-06 01:40:00
+10,2011-11-06 01:40:00,10,2011-11-06 01:40:00
+---- RUNTIME_PROFILE
+row_regex: .*RF00.\[min_max\] <- from_utc_timestamp\(t2.ts, 'America/Los_Angeles'\).*
+====
+---- QUERY
+# Test that kudu min_max runtime filter with timestamps works correctly in local time,
+# even around DST changes.
+set timezone='America/Los_Angeles';
+select straight_join * from timestamp_at_dst_changes t1
+  join (select '2011-11-06 01:00:00' as ts) t2 on t1.ts = t2.ts;
+---- TYPES
+BIGINT,BIGINT,TIMESTAMP,TIMESTAMP
+---- RESULTS
+5,1320566400,2011-11-06 01:00:00,2011-11-06 01:00:00
+8,1320570000,2011-11-06 01:00:00,2011-11-06 01:00:00
+---- RUNTIME_PROFILE
+row_regex: .*RF00.\[min_max\] <- t2.ts.*
+====
+---- QUERY
+# Test that kudu bloom runtime filter with timestamps works correctly in local time
+# without DST changes. When kudu timestamp conversion is enabled, the kudu bloom filter
+# for timestamp target columns is disabled by default to avoid missing rows due to DST
+# ambiguity. If the time zone in use is known not to observe DST, the bloom filter can
+# be re-enabled by setting disable_kudu_local_timestamp_bloom_filter=false.
+set timezone='Asia/Shanghai';
+set disable_kudu_local_timestamp_bloom_filter=false;
+select straight_join t1.id, t1.ts, t2.ts from timestamp_at_dst_changes t1
+  join functional.timestamp_at_dst_changes t2
+  on t1.ts = from_utc_timestamp(t2.ts, 'Asia/Shanghai')
+  order by t1.id;
+---- TYPES
+BIGINT,TIMESTAMP,TIMESTAMP
+---- RESULTS
+1,2011-03-13 17:00:00,2011-03-13 09:00:00
+2,2011-03-13 17:30:00,2011-03-13 09:30:00
+3,2011-03-13 18:00:00,2011-03-13 10:00:00
+4,2011-03-13 18:30:00,2011-03-13 10:30:00
+5,2011-11-06 16:00:00,2011-11-06 08:00:00
+6,2011-11-06 16:20:00,2011-11-06 08:20:00
+7,2011-11-06 16:40:00,2011-11-06 08:40:00
+8,2011-11-06 17:00:00,2011-11-06 09:00:00
+9,2011-11-06 17:20:00,2011-11-06 09:20:00
+10,2011-11-06 17:40:00,2011-11-06 09:40:00
+---- RUNTIME_PROFILE
+row_regex: .*RF00.\[bloom\] <- utc_to_unix_micros\(to_utc_timestamp\(from_utc_timestamp\(t2.ts, 'Asia/Shanghai'\), 'Asia/Shanghai'\)\).*
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_timestamp_conversion.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_timestamp_conversion.test
new file mode 100644
index 000000000..c62b2a3a3
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_timestamp_conversion.test
@@ -0,0 +1,27 @@
+====
+---- QUERY
+# Test that Kudu UTC timestamps can be converted to local time.
+select id, from_unixtime(unixtime), ts from timestamp_at_dst_changes;
+---- TYPES
+BIGINT,TIMESTAMP,TIMESTAMP
+---- RESULTS
+1,2011-03-13 09:00:00,2011-03-13 01:00:00
+2,2011-03-13 09:30:00,2011-03-13 01:30:00
+3,2011-03-13 10:00:00,2011-03-13 03:00:00
+4,2011-03-13 10:30:00,2011-03-13 03:30:00
+5,2011-11-06 08:00:00,2011-11-06 01:00:00
+6,2011-11-06 08:20:00,2011-11-06 01:20:00
+7,2011-11-06 08:40:00,2011-11-06 01:40:00
+8,2011-11-06 09:00:00,2011-11-06 01:00:00
+9,2011-11-06 09:20:00,2011-11-06 01:20:00
+10,2011-11-06 09:40:00,2011-11-06 01:40:00
+====
+---- QUERY
+# Test that Kudu UTC timestamp conversion results are consistent with from_utc_timestamp().
+select count(*) from functional.alltypes t1 join alltypes t2
+on from_utc_timestamp(t1.timestamp_col, 'America/Los_Angeles') = t2.timestamp_col;
+---- TYPES
+BIGINT
+---- RESULTS
+7300
+====
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index ea3c3388b..03f5842c9 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -17,6 +17,8 @@
 
 from __future__ import absolute_import, division, print_function
 from builtins import range
+from copy import deepcopy
+
 from kudu.schema import (
     BOOL,
     DOUBLE,
@@ -93,6 +95,33 @@ class TestKuduBasicDML(KuduTestSuite):
       vector,
       use_db=unique_database)
 
+
+class TestKuduTimestampConvert(KuduTestSuite):
+  """
+  This suite tests converting UTC timestamps read from Kudu tables to local time.
+  """
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestKuduTimestampConvert, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_mandatory_exec_option('convert_kudu_utc_timestamps', 'true')
+    cls.ImpalaTestMatrix.add_mandatory_exec_option('timezone', '"America/Los_Angeles"')
+
+  @SkipIfKudu.no_hybrid_clock
+  def test_kudu_timestamp_conversion(self, vector):
+    self.run_test_case('QueryTest/kudu_timestamp_conversion', vector)
+
+  @SkipIfKudu.no_hybrid_clock
+  def test_kudu_predicate_with_timestamp_conversion(self, vector):
+    self.run_test_case('QueryTest/kudu_predicate_with_timestamp_conversion', vector)
+
+  @SkipIfKudu.no_hybrid_clock
+  def test_kudu_runtime_filter_with_timestamp_conversion(self, vector):
+    new_vector = deepcopy(vector)
+    del new_vector.get_value('exec_option')['timezone']  # .test file sets timezone
+    self.run_test_case('QueryTest/kudu_runtime_filter_with_timestamp_conversion',
+        new_vector)
+
+
 # TODO(IMPALA-8614): parameterize some tests to run with HMS integration enabled.
 class TestKuduOperations(KuduTestSuite):
   """