You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mj...@apache.org on 2017/07/19 22:26:21 UTC

[1/3] incubator-impala git commit: IMPALA-4674: add new message to stress test

Repository: incubator-impala
Updated Branches:
  refs/heads/master bc56d3c48 -> 7a1ff1e5e


IMPALA-4674: add new message to stress test

The main IMPALA-4674 commit adds a new OOM failure mode where the query
can't get its minimum reservation during query startup. The new message
includes the string "failed to get minimum memory reservation" along
with some additional context.

Testing:
Ran a stress test using the modified script. Verified it treats failure
to get minimum reservation in the same way as mem limit exceeded.

Change-Id: I1f5e227084dfd50369a9908975305fa5e571c8a8
Reviewed-on: http://gerrit.cloudera.org:8080/7458
Reviewed-by: Michael Brown <mi...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/db5103dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/db5103dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/db5103dc

Branch: refs/heads/master
Commit: db5103dc9fd083344a3216c7687400dc2cade2a0
Parents: bc56d3c
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Jul 18 16:07:51 2017 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Wed Jul 19 16:11:13 2017 +0000

----------------------------------------------------------------------
 tests/stress/concurrent_select.py | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/db5103dc/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index a5fdea1..2b45c9c 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -925,6 +925,7 @@ class QueryRunner(object):
     # Exceeding a mem limit may result in the message "cancelled". See IMPALA-2234
     if "memory limit exceeded" in caught_msg or \
        "repartitioning did not reduce the size of a spilled partition" in caught_msg or \
+       "failed to get minimum memory reservation" in caught_msg or \
        caught_msg == "cancelled":
       report.mem_limit_exceeded = True
       return


[2/3] incubator-impala git commit: IMPALA-4925: Cancel finstance if query has finished

Posted by mj...@apache.org.
IMPALA-4925: Cancel finstance if query has finished

This patch is a partial fix for the issue where an finst would not
detect that it should cancel if the query limit had been hit. It
changes the UpdateExecStatus() RPC to return a cancelled status to an
finst if the query has finished because it hit a limit.

For certain queries, this allows them to finish much more quickly than
they otherwise would. However, there's still a few-second delay for the
finst to pick up the cancellation signal, because the
UpdateExecStatus() RPC is only called every few seconds.

A complete fix would also call CancelInternal() when returned_all_results_
was set to true. That would be a much larger change. The improvement
here is to bound the delay between query completion and fragment
teardown to a few seconds.

Change-Id: I59f45e64978c9ab9914b5c33e86009960b4a88c4
Reviewed-on: http://gerrit.cloudera.org:8080/5987
Tested-by: Impala Public Jenkins
Reviewed-by: Henry Robinson <he...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5bb48ed7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5bb48ed7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5bb48ed7

Branch: refs/heads/master
Commit: 5bb48ed71dc8272fdabac45a33b515cdd0d5f12d
Parents: db5103d
Author: Henry Robinson <he...@cloudera.com>
Authored: Mon Feb 13 15:01:44 2017 -0800
Committer: Henry Robinson <he...@cloudera.com>
Committed: Wed Jul 19 17:01:01 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc      |  4 ++++
 tests/query_test/test_lifecycle.py | 27 ++++++++++++++++++++++++++-
 2 files changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb48ed7/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 2bfe1b5..f904c48 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -977,6 +977,10 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
     return Status::OK();
   }
 
+  // If all results have been returned, return a cancelled status to force the fragment
+  // instance to stop executing.
+  if (!done && returned_all_results_) return Status::CANCELLED;
+
   if (done) {
     lock_guard<mutex> l(lock_);
     DCHECK_GT(num_remaining_backends_, 0);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb48ed7/tests/query_test/test_lifecycle.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_lifecycle.py b/tests/query_test/test_lifecycle.py
index 2bd4476..7ed8f90 100644
--- a/tests/query_test/test_lifecycle.py
+++ b/tests/query_test/test_lifecycle.py
@@ -16,12 +16,15 @@
 # under the License.
 
 import pytest
+import time
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.impala_cluster import ImpalaCluster
 from tests.verifiers.metric_verifier import MetricVerifier
 
-class TestFragmentLifecycle(ImpalaTestSuite):
+# TODO: Debug actions leak into other tests in the same suite (if not explicitly
+# unset). Ensure they get unset between tests.
+class TestFragmentLifecycleWithDebugActions(ImpalaTestSuite):
   """Using the debug action interface, check that failed queries correctly clean up *all*
   fragments"""
 
@@ -67,3 +70,25 @@ class TestFragmentLifecycle(ImpalaTestSuite):
       #
       # TODO: Fix when we have cancellable RPCs.
       v.wait_for_metric(self.IN_FLIGHT_FRAGMENTS, 0, timeout=125)
+
+class TestFragmentLifecycle(ImpalaTestSuite):
+  """Fragment lifecycle tests that do not set debug actions."""
+  def test_finst_cancel_when_query_complete(self):
+    """Regression test for IMPALA-4925: if a query returns all its rows before all its
+    finsts have completed, it should cancel the finsts and complete promptly."""
+    start = time.time()
+    # Query designed to produce 1024 (the limit) rows very quickly from the first union
+    # child, but the second one takes a very long time to complete. Without fix for
+    # IMPALA-4925, the whole query waits for the second child to complete.
+
+    # Due to IMPALA-5671, the limit must be a multiple of the row batch size - if it's
+    # reached during production of a row batch, processing moves to the second child, and
+    # the query will take a long time to complete.
+    self.client.execute("with l as (select 1 from functional.alltypes), r as"
+      " (select count(*) from tpch_parquet.lineitem a cross join tpch_parquet.lineitem b)"
+      "select * from l union all (select * from r) LIMIT 1024")
+    duration = time.time() - start
+
+    # Query typically completes in < 2s, but if cross join is fully evaluated, will take >
+    # 10 minutes. Pick 2 minutes as a reasonable midpoint to avoid false negatives.
+    assert duration < 120, "Query took too long to complete: %.1fs" % duration


[3/3] incubator-impala git commit: IMPALA-5539: Fix Kudu timestamp with -use_local_tz_for_unix_ts

Posted by mj...@apache.org.
IMPALA-5539: Fix Kudu timestamp with -use_local_tz_for_unix_ts

The -use_local_tz_for_unix_timestamp_conversions flag exists
to specify if TIMESTAMPs should be interpreted as localtime
or UTC when converting to/from Unix time via builtins:
  from_unixtime(bigint unixtime)
  unix_timestamp(string datetime[, ...])
  unix_timestamp(timestamp datetime)

However, the KuduScanner was calling into code that, when
the gflag above was set, interpreted Unix times as local
time.  Unfortunately the write path (KuduTableSink) and some
FE TIMESTAMP code (see KuduUtil.java) did not have this
behavior, i.e. we were handling the gflag inconsistently.

Tests:
* Adds a custom cluster test to run Kudu test cases with
  -use_local_tz_for_unix_timestamp_conversions.
* Adds tests for the new builtin
  unix_micros_to_utc_timestamp() which run in a custom
  cluster test (added test_local_tz_conversion.py) as well
  as in the regular tests (added to test_exprs.py).

Change-Id: I423a810427353be76aa64442044133a9a22cdc9b
Reviewed-on: http://gerrit.cloudera.org:8080/7311
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7a1ff1e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7a1ff1e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7a1ff1e5

Branch: refs/heads/master
Commit: 7a1ff1e5e9302ad9c3bf287d414436cc0d3046cf
Parents: 5bb48ed
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Sat Jun 24 14:42:59 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Jul 19 22:17:13 2017 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scanner.cc                     |  2 +-
 be/src/exprs/timestamp-functions-ir.cc          |  4 +-
 be/src/exprs/timestamp-functions.h              |  4 +-
 be/src/runtime/timestamp-value.cc               |  4 +-
 be/src/runtime/timestamp-value.h                | 12 +++-
 be/src/runtime/timestamp-value.inline.h         |  4 +-
 common/function-registry/impala_functions.py    |  4 +-
 .../org/apache/impala/catalog/KuduColumn.java   |  2 +-
 .../apache/impala/analysis/AnalyzeDDLTest.java  |  2 +-
 .../QueryTest/utc-timestamp-functions.test      | 16 ++++++
 tests/common/custom_cluster_test_suite.py       |  8 +++
 tests/custom_cluster/test_kudu.py               | 10 ++++
 .../custom_cluster/test_local_tz_conversion.py  | 60 ++++++++++++++++++++
 tests/query_test/test_exprs.py                  | 28 +++++++++
 tests/query_test/test_kudu.py                   |  2 +-
 15 files changed, 146 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a1ff1e5/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index 1610e12..9c9ce73 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -215,7 +215,7 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me
       }
       int64_t ts_micros = *reinterpret_cast<int64_t*>(
           kudu_tuple->GetSlot(slot->tuple_offset()));
-      TimestampValue tv = TimestampValue::FromUnixTimeMicros(ts_micros);
+      TimestampValue tv = TimestampValue::UtcFromUnixTimeMicros(ts_micros);
       if (tv.HasDateAndTime()) {
         RawValue::Write(&tv, kudu_tuple, slot, NULL);
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a1ff1e5/be/src/exprs/timestamp-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/timestamp-functions-ir.cc b/be/src/exprs/timestamp-functions-ir.cc
index 0cbf19e..c209f21 100644
--- a/be/src/exprs/timestamp-functions-ir.cc
+++ b/be/src/exprs/timestamp-functions-ir.cc
@@ -129,10 +129,10 @@ BigIntVal TimestampFunctions::UtcToUnixMicros(FunctionContext* context,
   return (tv.UtcToUnixTimeMicros(&result)) ? BigIntVal(result) : BigIntVal::null();
 }
 
-TimestampVal TimestampFunctions::TimestampFromUnixMicros(FunctionContext* context,
+TimestampVal TimestampFunctions::UnixMicrosToUtcTimestamp(FunctionContext* context,
     const BigIntVal& unix_time_micros) {
   if (unix_time_micros.is_null) return TimestampVal::null();
-  TimestampValue tv = TimestampValue::FromUnixTimeMicros(unix_time_micros.val);
+  TimestampValue tv = TimestampValue::UtcFromUnixTimeMicros(unix_time_micros.val);
   TimestampVal result;
   tv.ToTimestampVal(&result);
   return result;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a1ff1e5/be/src/exprs/timestamp-functions.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/timestamp-functions.h b/be/src/exprs/timestamp-functions.h
index 9e447b8..fdf33bd 100644
--- a/be/src/exprs/timestamp-functions.h
+++ b/be/src/exprs/timestamp-functions.h
@@ -113,8 +113,8 @@ class TimestampFunctions {
   static StringVal FromUnix(FunctionContext* context, const TIME& unix_time,
       const StringVal& fmt);
 
-  /// Return a timestamp from a unix time in microseconds.
-  static TimestampVal TimestampFromUnixMicros(FunctionContext* context,
+  /// Return a timestamp in UTC from a unix time in microseconds.
+  static TimestampVal UnixMicrosToUtcTimestamp(FunctionContext* context,
       const BigIntVal& unix_time_micros);
 
   /// Convert a timestamp to or from a particular timezone based time.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a1ff1e5/be/src/runtime/timestamp-value.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/timestamp-value.cc b/be/src/runtime/timestamp-value.cc
index 56dfbda..8d27b46 100644
--- a/be/src/runtime/timestamp-value.cc
+++ b/be/src/runtime/timestamp-value.cc
@@ -115,7 +115,7 @@ ostream& operator<<(ostream& os, const TimestampValue& timestamp_value) {
 /// Return a ptime representation of the given Unix time (seconds since the Unix epoch).
 /// The time zone of the resulting ptime is local time. This is called by
 /// UnixTimeToPtime.
-inline ptime UnixTimeToLocalPtime(time_t unix_time) {
+ptime TimestampValue::UnixTimeToLocalPtime(time_t unix_time) {
   tm temp_tm;
   // TODO: avoid localtime*, which takes a global timezone db lock
   if (UNLIKELY(localtime_r(&unix_time, &temp_tm) == nullptr)) {
@@ -139,7 +139,7 @@ inline ptime UnixTimeToLocalPtime(time_t unix_time) {
 /// use the libc function gmtime_r which supports those dates but takes the global lock
 /// for the timezone db (even though technically it is not needed for the conversion,
 /// again see IMPALA-5357). This is called by UnixTimeToPtime.
-inline ptime UnixTimeToUtcPtime(time_t unix_time) {
+ptime TimestampValue::UnixTimeToUtcPtime(time_t unix_time) {
   // Minimum Unix time that can be converted with from_time_t: 1677-Sep-21 00:12:44
   const int64_t MIN_BOOST_CONVERT_UNIX_TIME = -9223372036;
   // Maximum Unix time that can be converted with from_time_t: 2262-Apr-11 23:47:16

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a1ff1e5/be/src/runtime/timestamp-value.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/timestamp-value.h b/be/src/runtime/timestamp-value.h
index c84eb7f..fa97f78 100644
--- a/be/src/runtime/timestamp-value.h
+++ b/be/src/runtime/timestamp-value.h
@@ -102,8 +102,9 @@ class TimestampValue {
     return TimestampValue(temp);
   }
 
-  /// Same as FromUnixTime() above, but the unix time is specified in microseconds.
-  static TimestampValue FromUnixTimeMicros(int64_t unix_time_micros);
+  /// Return the corresponding timestamp in UTC for the Unix time specified in
+  /// microseconds.
+  static TimestampValue UtcFromUnixTimeMicros(int64_t unix_time_micros);
 
   /// Returns a TimestampValue where the integer part of the specified 'unix_time'
   /// specifies the number of seconds (see above), and the fractional part is converted
@@ -280,6 +281,13 @@ class TimestampValue {
   /// FLAGS_use_local_tz_for_unix_timestamp_conversions. If the flag is true, the value
   /// will be in the local time zone. If the flag is false, the value will be in UTC.
   static boost::posix_time::ptime UnixTimeToPtime(time_t unix_time);
+
+  /// Same as the above, but the time zone of the resulting ptime is always in the local
+  /// time zone.
+  static boost::posix_time::ptime UnixTimeToLocalPtime(time_t unix_time);
+
+  /// Same as the above, but the time zone of the resulting ptime is always in UTC.
+  static boost::posix_time::ptime UnixTimeToUtcPtime(time_t unix_time);
 };
 
 /// This function must be called 'hash_value' to be picked up by boost.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a1ff1e5/be/src/runtime/timestamp-value.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/timestamp-value.inline.h b/be/src/runtime/timestamp-value.inline.h
index 1d14e30..eff78d0 100644
--- a/be/src/runtime/timestamp-value.inline.h
+++ b/be/src/runtime/timestamp-value.inline.h
@@ -28,10 +28,10 @@
 
 namespace impala {
 
-inline TimestampValue TimestampValue::FromUnixTimeMicros(int64_t unix_time_micros) {
+inline TimestampValue TimestampValue::UtcFromUnixTimeMicros(int64_t unix_time_micros) {
   int64_t ts_seconds = unix_time_micros / MICROS_PER_SEC;
   int64_t micros_part = unix_time_micros - (ts_seconds * MICROS_PER_SEC);
-  boost::posix_time::ptime temp = UnixTimeToPtime(ts_seconds);
+  boost::posix_time::ptime temp = UnixTimeToUtcPtime(ts_seconds);
   temp += boost::posix_time::microseconds(micros_part);
   return TimestampValue(temp);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a1ff1e5/common/function-registry/impala_functions.py
----------------------------------------------------------------------
diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py
index 4201553..1502809 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -210,8 +210,8 @@ visible_functions = [
   [['unix_timestamp'], 'BIGINT', ['STRING', 'STRING'], '_ZN6impala18TimestampFunctions4UnixEPN10impala_udf15FunctionContextERKNS1_9StringValES6_',
           '_ZN6impala18TimestampFunctions22UnixAndFromUnixPrepareEPN10impala_udf15FunctionContextENS2_18FunctionStateScopeE',
           '_ZN6impala18TimestampFunctions20UnixAndFromUnixCloseEPN10impala_udf15FunctionContextENS2_18FunctionStateScopeE'],
-  [['timestamp_from_unix_micros'], 'TIMESTAMP', ['BIGINT'],
-      '_ZN6impala18TimestampFunctions23TimestampFromUnixMicrosEPN10impala_udf15FunctionContextERKNS1_9BigIntValE'],
+  [['unix_micros_to_utc_timestamp'], 'TIMESTAMP', ['BIGINT'],
+      '_ZN6impala18TimestampFunctions24UnixMicrosToUtcTimestampEPN10impala_udf15FunctionContextERKNS1_9BigIntValE'],
   [['utc_to_unix_micros'], 'BIGINT', ['TIMESTAMP'],
     '_ZN6impala18TimestampFunctions15UtcToUnixMicrosEPN10impala_udf15FunctionContextERKNS1_12TimestampValE'],
   [['from_unixtime'], 'STRING', ['INT'],

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a1ff1e5/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
index 75699ab..2dd3e85 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
@@ -134,7 +134,7 @@ public class KuduColumn extends Column {
   public String getDefaultValueSql() {
     if (!hasDefaultValue()) return null;
     if (!type_.isTimestamp()) return defaultValue_.toSql();
-    return "timestamp_from_unix_micros(" + defaultValue_.getStringValue() + ")";
+    return "unix_micros_to_utc_timestamp(" + defaultValue_.getStringValue() + ")";
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a1ff1e5/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index e3fa2a6..6928ed2 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -2379,7 +2379,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalyzesOk("create table tdefault (id int primary key, ts timestamp default now())" +
         "partition by hash(id) partitions 3 stored as kudu");
     AnalyzesOk("create table tdefault (id int primary key, ts timestamp default " +
-        "timestamp_from_unix_micros(1230768000000000)) partition by hash(id) " +
+        "unix_micros_to_utc_timestamp(1230768000000000)) partition by hash(id) " +
         "partitions 3 stored as kudu");
     AnalyzesOk("create table tdefault (id int primary key, " +
         "ts timestamp not null default '2009-01-01 00:00:00') " +

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a1ff1e5/testdata/workloads/functional-query/queries/QueryTest/utc-timestamp-functions.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/utc-timestamp-functions.test b/testdata/workloads/functional-query/queries/QueryTest/utc-timestamp-functions.test
new file mode 100644
index 0000000..c2da36a
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/utc-timestamp-functions.test
@@ -0,0 +1,16 @@
+====
+---- QUERY
+# Tests for unix_micros_to_utc_timestamp().
+# TODO: after fixing IMPALA-5664, add test for -17987443200 * 1000000 - 1
+SELECT
+unix_micros_to_utc_timestamp(NULL),
+unix_micros_to_utc_timestamp(0),
+unix_micros_to_utc_timestamp(1),
+unix_micros_to_utc_timestamp(-17987443200 * 1000000),
+unix_micros_to_utc_timestamp(253402300799 * 1000000),
+unix_micros_to_utc_timestamp(253402300799 * 1000000 + 1);
+---- TYPES
+TIMESTAMP,TIMESTAMP,TIMESTAMP,TIMESTAMP,TIMESTAMP,TIMESTAMP
+---- RESULTS
+NULL,1970-01-01 00:00:00,1970-01-01 00:00:00.000001000,1400-01-01 00:00:00,9999-12-31 23:59:59,9999-12-31 23:59:59.000001000
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a1ff1e5/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 377b38e..a48fd0d 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -48,6 +48,14 @@ class CustomClusterTestSuite(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(CustomClusterTestSuite, cls).add_test_dimensions()
+    cls.add_custom_cluster_constraints()
+
+  @classmethod
+  def add_custom_cluster_constraints(cls):
+    # Defines constraints for custom cluster tests, called by add_test_dimensions.
+    # By default, custom cluster tests only run on text/none and with a limited set of
+    # exec options. Subclasses may override this to relax these default constraints.
+    super(CustomClusterTestSuite, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format == 'text' and
         v.get_value('table_format').compression_codec == 'none')

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a1ff1e5/tests/custom_cluster/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_kudu.py b/tests/custom_cluster/test_kudu.py
index 9377863..28d1a82 100644
--- a/tests/custom_cluster/test_kudu.py
+++ b/tests/custom_cluster/test_kudu.py
@@ -32,6 +32,16 @@ class TestKuduOperations(CustomClusterTestSuite, KuduTestSuite):
     return 'functional-query'
 
   @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args=\
+      "--use_local_tz_for_unix_timestamp_conversions=true")
+  def test_local_tz_conversion_ops(self, vector, unique_database):
+    """IMPALA-5539: Test Kudu timestamp reads/writes are correct with the
+       use_local_tz_for_unix_timestamp_conversions flag."""
+    # These tests provide enough coverage of queries with timestamps.
+    self.run_test_case('QueryTest/kudu-scan-node', vector, use_db=unique_database)
+    self.run_test_case('QueryTest/kudu_insert', vector, use_db=unique_database)
+
+  @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args="-kudu_master_hosts=")
   def test_kudu_master_hosts(self, cursor, kudu_client):
     """Check behavior when -kudu_master_hosts is not provided to catalogd."""

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a1ff1e5/tests/custom_cluster/test_local_tz_conversion.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_local_tz_conversion.py b/tests/custom_cluster/test_local_tz_conversion.py
new file mode 100644
index 0000000..54ac277
--- /dev/null
+++ b/tests/custom_cluster/test_local_tz_conversion.py
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_vector import ImpalaTestDimension
+from tests.common.skip import SkipIfBuildType
+from tests.common.test_dimensions import create_exec_option_dimension
+
+class TestLocalTzConversion(CustomClusterTestSuite):
+  """Tests for --use_local_tz_for_unix_timestamp_conversions"""
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestLocalTzConversion, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
+        cluster_sizes=[0], disable_codegen_options=[False, True], batch_sizes=[0]))
+    # Test with and without expr rewrites to cover regular expr evaluations
+    # as well as constant folding, in particular, timestamp literals.
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('enable_expr_rewrites', *[0,1]))
+
+  @classmethod
+  def add_custom_cluster_constraints(cls):
+    # Do not call the super() implementation because this class needs to relax the
+    # set of constraints.
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'text' and
+        v.get_value('table_format').compression_codec == 'none')
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--use_local_tz_for_unix_timestamp_conversions=true")
+  def test_utc_timestamp_functions(self, vector):
+    """Tests for UTC timestamp functions, i.e. functions that do not depend on the
+       behavior of the flag --use_local_tz_for_unix_timestamp_conversions. These tests
+       are also executed in test_exprs.py to ensure the same behavior when running
+       without the gflag set."""
+    vector.get_value('exec_option')['enable_expr_rewrites'] = \
+        vector.get_value('enable_expr_rewrites')
+    self.run_test_case('QueryTest/utc-timestamp-functions', vector)
+

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a1ff1e5/tests/query_test/test_exprs.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_exprs.py b/tests/query_test/test_exprs.py
index a52b2a9..90fd207 100644
--- a/tests/query_test/test_exprs.py
+++ b/tests/query_test/test_exprs.py
@@ -146,3 +146,31 @@ class TestExprLimits(ImpalaTestSuite):
       assert impala_ret.success, "Failed to execute query %s" % (sql_str)
     except: # consider any exception a failure
       assert False, "Failed to execute query %s" % (sql_str)
+
+class TestUtcTimestampFunctions(ImpalaTestSuite):
+  """Tests for UTC timestamp functions, i.e. functions that do not depend on the behavior
+     of the flag --use_local_tz_for_unix_timestamp_conversions. Tests added here should
+     also be run in the custom cluster test test_local_tz_conversion.py to ensure they
+     have the same behavior when the conversion flag is set to true."""
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestUtcTimestampFunctions, cls).add_test_dimensions()
+    # Test with and without expr rewrites to cover regular expr evaluations
+    # as well as constant folding, in particular, timestamp literals.
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('enable_expr_rewrites', *[0,1]))
+    if cls.exploration_strategy() == 'core':
+      # Test with file format that supports codegen
+      cls.ImpalaTestMatrix.add_constraint(lambda v:\
+          v.get_value('table_format').file_format == 'text' and\
+          v.get_value('table_format').compression_codec == 'none')
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  def test_utc_functions(self, vector):
+    vector.get_value('exec_option')['enable_expr_rewrites'] = \
+        vector.get_value('enable_expr_rewrites')
+    self.run_test_case('QueryTest/utc-timestamp-functions', vector)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7a1ff1e5/tests/query_test/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index 87cfa24..9715b4e 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -773,7 +773,7 @@ class TestShowCreateTable(KuduTestSuite):
         CREATE TABLE {db}.{{table}} (
           c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
           d TIMESTAMP NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
-          e TIMESTAMP NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION DEFAULT timestamp_from_unix_micros(%s),
+          e TIMESTAMP NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION DEFAULT unix_micros_to_utc_timestamp(%s),
           PRIMARY KEY (c, d)
         )
         PARTITION BY HASH (c) PARTITIONS 3