Posted to commits@impala.apache.org by jo...@apache.org on 2019/04/09 15:48:54 UTC

[impala] 02/06: IMPALA-5051: Add INT64 timestamp write support in Parquet

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 39413a18117acde1822d9f084ab30c748ce837bc
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Thu Jan 17 16:19:10 2019 +0100

    IMPALA-5051: Add INT64 timestamp write support in Parquet
    
    Add query option "parquet_timestamp_type" that selects the
    Parquet type used when writing TIMESTAMP columns. This is an
    experimental feature for now, because these types are not yet
    widely adopted in other Hadoop components. For this reason the
    query option is added at "development" level, and the default
    behavior is not changed.
    
    The following options can be used:
    INT96_NANOS (default):
      This is the same as the old behavior; it can represent any
      timestamp that Impala can handle.
    INT64_MILLIS, INT64_MICROS:
      Can encode the whole [1400..10000) range handled by Impala
      at the cost of reduced precision. Values are rounded towards
      minus infinity during writing.
    INT64_NANOS:
      Can encode a reduced range without losing nanosecond precision:
      [1677-09-21 00:12:43.145224192 .. 2262-04-11 23:47:16.854775807]
      Values outside this range are converted to NULLs without warning.
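    
    For illustration, a minimal sketch of how the option affects the
    written values (table names are made up for the example; the
    values match the EE tests added in this change):
    
      set parquet_timestamp_type=INT64_MILLIS;
      create table t_millis stored as parquet as
        select cast("2019-01-18 23:59:59.999999999" as timestamp) ts;
      select ts from t_millis;  -- 2019-01-18 23:59:59.999000000
    
      set parquet_timestamp_type=INT64_NANOS;
      create table t_nanos stored as parquet as
        select cast("1400-01-01" as timestamp) ts;
      select ts from t_nanos;   -- NULL (below the int64 nano range)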
    
    The change was done entirely in the backend, and all TIMESTAMP
    columns are written using the type set in the query option.
    An alternative design would have been to implement some parts
    in the frontend by adding TIMESTAMP->BIGINT conversion functions
    to the query plan, which would make it easier to add a
    per-column setting in the future. I chose the current design
    because it seemed much simpler and there are no clear plans for
    a per-column setting. Most of the code will still be useful if we
    decide to go the other way in the future.
    
    All types are written without conversion to UTC (as Impala has
    always written timestamps), and this is expressed in the new
    Parquet logical types by setting isAdjustedToUTC to false. The
    old logical type (converted_type) is not set, because old readers
    do not read isAdjustedToUTC and would assume that TIMESTAMP_MILLIS
    and TIMESTAMP_MICROS are written in UTC. These readers can still
    read int64 timestamp columns as INT_64.
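    
    For example (again with a made-up table name, and a value taken
    from the new EE tests), a value written with INT64_MICROS reads
    back in Impala exactly as written, with no UTC shift in either
    direction:
    
      set parquet_timestamp_type=INT64_MICROS;
      create table t_micros stored as parquet as
        select cast("2019-01-18 00:00:00.000001" as timestamp) ts;
      select ts from t_micros;  -- 2019-01-18 00:00:00.000001000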
    
    Testing:
    - added unit tests for new TimestampValue->int64 functions
    - added EE tests for checking values / min-max stats / metadata
      written for int64 Parquet timestamps
    - ran core tests
    
    Change-Id: Ib41ad532ec902ed5a9a1528513726eac1c11441f
    Reviewed-on: http://gerrit.cloudera.org:8080/12247
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
---
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   | 111 ++++++++++-
 be/src/exec/parquet/hdfs-parquet-table-writer.h    |   8 +
 be/src/exec/parquet/parquet-common.cc              |  31 ---
 be/src/exec/parquet/parquet-common.h               |   4 -
 be/src/exec/parquet/parquet-metadata-utils.cc      |  75 +++++++-
 be/src/exec/parquet/parquet-metadata-utils.h       |   5 +
 be/src/runtime/timestamp-test.cc                   | 212 ++++++++++++++-------
 be/src/runtime/timestamp-value.h                   |  30 ++-
 be/src/runtime/timestamp-value.inline.h            |  49 ++++-
 be/src/service/query-options.cc                    |   7 +
 be/src/service/query-options.h                     |   6 +-
 be/src/util/debug-util.cc                          |   1 +
 be/src/util/debug-util.h                           |   1 +
 common/thrift/ImpalaInternalService.thrift         |  12 ++
 common/thrift/ImpalaService.thrift                 |   5 +
 .../QueryTest/parquet-int64-timestamps.test        |  87 +++++++++
 tests/query_test/test_insert_parquet.py            |  88 ++++++++-
 tests/util/get_parquet_metadata.py                 |   4 +-
 18 files changed, 615 insertions(+), 121 deletions(-)

diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index 94f2b9e..9051cbd 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -271,6 +271,12 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // Implemented in the subclass.
   virtual bool ProcessValue(void* value, int64_t* bytes_needed) WARN_UNUSED_RESULT = 0;
 
+
+  // Subclasses can override this function to convert values after the expression has
+  // been evaluated. Used by int64 timestamp writers to change the TimestampValue
+  // returned by the expression to an int64.
+  virtual void* ConvertValue(void* value) { return value; }
+
   // Encodes out all data for the current page and updates the metadata.
   virtual Status FinalizeCurrentPage() WARN_UNUSED_RESULT;
 
@@ -471,9 +477,6 @@ class HdfsParquetTableWriter::ColumnWriter :
   // The number of values added since we last checked the dictionary.
   int num_values_since_dict_size_check_;
 
-  // Size of each encoded value in plain encoding. -1 if the type is variable-length.
-  int64_t plain_encoded_value_size_;
-
   // Temporary string value to hold CHAR(N)
   StringValue temp_;
 
@@ -487,6 +490,9 @@ class HdfsParquetTableWriter::ColumnWriter :
   inline T* CastValue(void* value) {
     return reinterpret_cast<T*>(value);
   }
+ protected:
+  // Size of each encoded value in plain encoding. -1 if the type is variable-length.
+  int64_t plain_encoded_value_size_;
 };
 
 template<>
@@ -554,9 +560,81 @@ class HdfsParquetTableWriter::BoolColumnWriter :
 
 }
 
+
+/// Base class for int64 timestamp writers. 'eval' is expected to return a pointer to a
+/// TimestampValue. The result of TimestampValue->int64 conversion is stored in 'result_'.
+class HdfsParquetTableWriter::Int64TimestampColumnWriterBase :
+    public HdfsParquetTableWriter::ColumnWriter<int64_t> {
+public:
+  Int64TimestampColumnWriterBase(HdfsParquetTableWriter* parent,
+      ScalarExprEvaluator* eval, const THdfsCompression::type& codec)
+    : HdfsParquetTableWriter::ColumnWriter<int64_t>(parent, eval, codec) {
+    int64_t dummy;
+    plain_encoded_value_size_ = ParquetPlainEncoder::ByteSize(dummy);
+  }
+
+protected:
+  virtual void* ConvertValue(void* value) override {
+    const TimestampValue* ts = reinterpret_cast<TimestampValue*>(value);
+    return ConvertTimestamp(*ts, &result_) ? &result_ : nullptr;
+  }
+
+  /// Overrides of this function are expected to set 'result' if the conversion is
+  /// successful and return true. If the timestamp is invalid or cannot
+  /// be represented as int64 then false should be returned.
+  virtual bool ConvertTimestamp(const TimestampValue& ts, int64_t* result) = 0;
+
+private:
+  int64_t result_;
+};
+
+
+/// Converts TimestampValues to INT64_MILLIS.
+class HdfsParquetTableWriter::Int64MilliTimestampColumnWriter :
+    public HdfsParquetTableWriter::Int64TimestampColumnWriterBase {
+public:
+  Int64MilliTimestampColumnWriter(HdfsParquetTableWriter* parent,
+      ScalarExprEvaluator* eval, const THdfsCompression::type& codec)
+    : HdfsParquetTableWriter::Int64TimestampColumnWriterBase(parent, eval, codec) {}
+
+protected:
+  virtual bool ConvertTimestamp(const TimestampValue& ts, int64_t* result) {
+    return ts.FloorUtcToUnixTimeMillis(result);
+  }
+};
+
+/// Converts TimestampValues to INT64_MICROS.
+class HdfsParquetTableWriter::Int64MicroTimestampColumnWriter :
+    public HdfsParquetTableWriter::Int64TimestampColumnWriterBase {
+public:
+  Int64MicroTimestampColumnWriter(HdfsParquetTableWriter* parent,
+      ScalarExprEvaluator* eval, const THdfsCompression::type& codec)
+    : HdfsParquetTableWriter::Int64TimestampColumnWriterBase(parent, eval, codec) {}
+
+protected:
+  virtual bool ConvertTimestamp(const TimestampValue& ts, int64_t* result) {
+    return ts.FloorUtcToUnixTimeMicros(result);
+  }
+};
+
+/// Converts TimestampValues to INT64_NANOS. Conversion is expected to fail for
+/// timestamps outside [1677-09-21 00:12:43.145224192 .. 2262-04-11 23:47:16.854775807].
+class HdfsParquetTableWriter::Int64NanoTimestampColumnWriter :
+    public HdfsParquetTableWriter::Int64TimestampColumnWriterBase {
+public:
+  Int64NanoTimestampColumnWriter(HdfsParquetTableWriter* parent,
+      ScalarExprEvaluator* eval, const THdfsCompression::type& codec)
+    : HdfsParquetTableWriter::Int64TimestampColumnWriterBase(parent, eval, codec) {}
+
+protected:
+  virtual bool ConvertTimestamp(const TimestampValue& ts, int64_t* result) {
+    return ts.UtcToUnixTimeLimitedRangeNanos(result);
+  }
+};
+
 inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row) {
   ++num_values_;
-  void* value = expr_eval_->GetValue(row);
+  void* value = ConvertValue(expr_eval_->GetValue(row));
   if (current_page_ == nullptr) NewPage();
 
   // Ensure that we have enough space for the definition level, but don't write it yet in
@@ -925,8 +1003,26 @@ Status HdfsParquetTableWriter::Init() {
         writer = new ColumnWriter<double>(this, output_expr_evals_[i], codec);
         break;
       case TYPE_TIMESTAMP:
-        writer = new ColumnWriter<TimestampValue>(
-            this, output_expr_evals_[i], codec);
+        switch (state_->query_options().parquet_timestamp_type) {
+          case TParquetTimestampType::INT96_NANOS:
+            writer =
+                new ColumnWriter<TimestampValue>(this, output_expr_evals_[i], codec);
+            break;
+          case TParquetTimestampType::INT64_MILLIS:
+            writer =
+                new Int64MilliTimestampColumnWriter(this, output_expr_evals_[i], codec);
+            break;
+          case TParquetTimestampType::INT64_MICROS:
+            writer =
+                new Int64MicroTimestampColumnWriter(this, output_expr_evals_[i], codec);
+            break;
+          case TParquetTimestampType::INT64_NANOS:
+            writer =
+                new Int64NanoTimestampColumnWriter(this, output_expr_evals_[i], codec);
+            break;
+          default:
+            DCHECK(false);
+        }
         break;
       case TYPE_VARCHAR:
       case TYPE_STRING:
@@ -990,7 +1086,8 @@ Status HdfsParquetTableWriter::AddRowGroup() {
   current_row_group_->columns.resize(columns_.size());
   for (int i = 0; i < columns_.size(); ++i) {
     parquet::ColumnMetaData metadata;
-    metadata.type = ConvertInternalToParquetType(columns_[i]->type().type);
+    metadata.type = ParquetMetadataUtils::ConvertInternalToParquetType(
+        columns_[i]->type().type, state_->query_options());
     metadata.path_in_schema.push_back(
         table_desc_->col_descs()[i + num_clustering_cols].name());
     metadata.codec = columns_[i]->GetParquetCodec();
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.h b/be/src/exec/parquet/hdfs-parquet-table-writer.h
index dd0bd7f..a065b9a 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.h
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.h
@@ -120,6 +120,14 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   template<typename T> friend class ColumnWriter;
   class BoolColumnWriter;
   friend class BoolColumnWriter;
+  class Int64TimestampColumnWriterBase;
+  friend class Int64TimestampColumnWriterBase;
+  class Int64MicroTimestampColumnWriter;
+  friend class Int64MicroTimestampColumnWriter;
+  class Int64MilliTimestampColumnWriter;
+  friend class Int64MilliTimestampColumnWriter;
+  class Int64NanoTimestampColumnWriter;
+  friend class Int64NanoTimestampColumnWriter;
 
   /// Minimum allowable block size in bytes. This is a function of the number of columns
   /// in the target file.
diff --git a/be/src/exec/parquet/parquet-common.cc b/be/src/exec/parquet/parquet-common.cc
index 5984146..769e189 100644
--- a/be/src/exec/parquet/parquet-common.cc
+++ b/be/src/exec/parquet/parquet-common.cc
@@ -19,31 +19,6 @@
 
 namespace impala {
 
-/// Mapping of impala's internal types to parquet storage types. This is indexed by
-/// PrimitiveType enum
-const parquet::Type::type INTERNAL_TO_PARQUET_TYPES[] = {
-  parquet::Type::BOOLEAN,     // Invalid
-  parquet::Type::BOOLEAN,     // NULL type
-  parquet::Type::BOOLEAN,
-  parquet::Type::INT32,
-  parquet::Type::INT32,
-  parquet::Type::INT32,
-  parquet::Type::INT64,
-  parquet::Type::FLOAT,
-  parquet::Type::DOUBLE,
-  parquet::Type::INT96,       // Timestamp
-  parquet::Type::BYTE_ARRAY,  // String
-  parquet::Type::BYTE_ARRAY,  // Date, NYI
-  parquet::Type::BYTE_ARRAY,  // DateTime, NYI
-  parquet::Type::BYTE_ARRAY,  // Binary NYI
-  parquet::Type::FIXED_LEN_BYTE_ARRAY, // Decimal
-  parquet::Type::BYTE_ARRAY,  // VARCHAR(N)
-  parquet::Type::BYTE_ARRAY,  // CHAR(N)
-};
-
-const int INTERNAL_TO_PARQUET_TYPES_SIZE =
-  sizeof(INTERNAL_TO_PARQUET_TYPES) / sizeof(INTERNAL_TO_PARQUET_TYPES[0]);
-
 /// Mapping of Parquet codec enums to Impala enums
 const THdfsCompression::type PARQUET_TO_IMPALA_CODEC[] = {
   THdfsCompression::NONE,
@@ -69,12 +44,6 @@ const parquet::CompressionCodec::type IMPALA_TO_PARQUET_CODEC[] = {
 const int IMPALA_TO_PARQUET_CODEC_SIZE =
     sizeof(IMPALA_TO_PARQUET_CODEC) / sizeof(IMPALA_TO_PARQUET_CODEC[0]);
 
-parquet::Type::type ConvertInternalToParquetType(PrimitiveType type) {
-  DCHECK_GE(type, 0);
-  DCHECK_LT(type, INTERNAL_TO_PARQUET_TYPES_SIZE);
-  return INTERNAL_TO_PARQUET_TYPES[type];
-}
-
 THdfsCompression::type ConvertParquetToImpalaCodec(
     parquet::CompressionCodec::type codec) {
   DCHECK_GE(codec, 0);
diff --git a/be/src/exec/parquet/parquet-common.h b/be/src/exec/parquet/parquet-common.h
index b137296..d225814 100644
--- a/be/src/exec/parquet/parquet-common.h
+++ b/be/src/exec/parquet/parquet-common.h
@@ -35,10 +35,6 @@ namespace impala {
 const uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
 const uint32_t PARQUET_CURRENT_VERSION = 1;
 
-/// Return the Parquet type corresponding to Impala's internal type. The caller must
-/// validate that the type is valid, otherwise this will DCHECK.
-parquet::Type::type ConvertInternalToParquetType(PrimitiveType type);
-
 /// Return the Impala compression type for the given Parquet codec. The caller must
 /// validate that the codec is a supported one, otherwise this will DCHECK.
 THdfsCompression::type ConvertParquetToImpalaCodec(parquet::CompressionCodec::type codec);
diff --git a/be/src/exec/parquet/parquet-metadata-utils.cc b/be/src/exec/parquet/parquet-metadata-utils.cc
index 301e308..e83ef3c 100644
--- a/be/src/exec/parquet/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet/parquet-metadata-utils.cc
@@ -137,6 +137,65 @@ void SetDecimalConvertedAndLogicalType(
   col_schema->__set_logicalType(logical_type);
 }
 
+/// For int64 timestamps, sets logical_type in 'col_schema' to TIMESTAMP and fills its
+/// parameters.
+/// converted_type is not set because Impala always writes timestamps without UTC
+/// normalization, and older readers that do not use logical types would incorrectly
+/// interpret TIMESTAMP_MILLIS/MICROS as UTC normalized.
+/// Leaves logical type empty for int96 timestamps.
+void SetTimestampLogicalType(const TQueryOptions& query_options,
+    parquet::SchemaElement* col_schema) {
+  if(query_options.parquet_timestamp_type == TParquetTimestampType::INT96_NANOS) return;
+
+  parquet::TimeUnit time_unit;
+  switch (query_options.parquet_timestamp_type) {
+    case TParquetTimestampType::INT64_MILLIS:
+      time_unit.__set_MILLIS(parquet::MilliSeconds());
+      break;
+    case TParquetTimestampType::INT64_MICROS:
+      time_unit.__set_MICROS(parquet::MicroSeconds());
+      break;
+    case TParquetTimestampType::INT64_NANOS:
+      time_unit.__set_NANOS(parquet::NanoSeconds());
+      break;
+    default:
+      DCHECK(false);
+  }
+
+  parquet::TimestampType timestamp_type;
+  timestamp_type.__set_unit(time_unit);
+  timestamp_type.__set_isAdjustedToUTC(false);
+
+  parquet::LogicalType logical_type;
+  logical_type.__set_TIMESTAMP(timestamp_type);
+  col_schema->__set_logicalType(logical_type);
+}
+
+/// Mapping of impala's internal types to parquet storage types. This is indexed by
+/// PrimitiveType enum
+const parquet::Type::type INTERNAL_TO_PARQUET_TYPES[] = {
+  parquet::Type::BOOLEAN,     // Invalid
+  parquet::Type::BOOLEAN,     // NULL type
+  parquet::Type::BOOLEAN,
+  parquet::Type::INT32,
+  parquet::Type::INT32,
+  parquet::Type::INT32,
+  parquet::Type::INT64,
+  parquet::Type::FLOAT,
+  parquet::Type::DOUBLE,
+  parquet::Type::INT96,       // Timestamp
+  parquet::Type::BYTE_ARRAY,  // String
+  parquet::Type::BYTE_ARRAY,  // Date, NYI
+  parquet::Type::BYTE_ARRAY,  // DateTime, NYI
+  parquet::Type::BYTE_ARRAY,  // Binary NYI
+  parquet::Type::FIXED_LEN_BYTE_ARRAY, // Decimal
+  parquet::Type::BYTE_ARRAY,  // VARCHAR(N)
+  parquet::Type::BYTE_ARRAY,  // CHAR(N)
+};
+
+const int INTERNAL_TO_PARQUET_TYPES_SIZE =
+  sizeof(INTERNAL_TO_PARQUET_TYPES) / sizeof(INTERNAL_TO_PARQUET_TYPES[0]);
+
 } // anonymous namespace
 
 // Needs to be in sync with the order of enum values declared in TParquetArrayResolution.
@@ -316,9 +375,20 @@ Status ParquetMetadataUtils::ValidateColumn(const char* filename,
   return Status::OK();
 }
 
+parquet::Type::type ParquetMetadataUtils::ConvertInternalToParquetType(
+    PrimitiveType type, const TQueryOptions& query_options) {
+  DCHECK_GE(type, 0);
+  DCHECK_LT(type, INTERNAL_TO_PARQUET_TYPES_SIZE);
+  if (type == TYPE_TIMESTAMP &&
+      query_options.parquet_timestamp_type != TParquetTimestampType::INT96_NANOS) {
+    return parquet::Type::INT64;
+  }
+  return INTERNAL_TO_PARQUET_TYPES[type];
+}
+
 void ParquetMetadataUtils::FillSchemaElement(const ColumnType& col_type,
     const TQueryOptions& query_options, parquet::SchemaElement* col_schema) {
-  col_schema->__set_type(ConvertInternalToParquetType(col_type.type));
+  col_schema->__set_type(ConvertInternalToParquetType(col_type.type, query_options));
   col_schema->__set_repetition_type(parquet::FieldRepetitionType::OPTIONAL);
 
   switch (col_type.type) {
@@ -353,11 +423,12 @@ void ParquetMetadataUtils::FillSchemaElement(const ColumnType& col_type,
       SetIntLogicalType(64, col_schema);
       break;
     case TYPE_TIMESTAMP:
+      SetTimestampLogicalType(query_options, col_schema);
+      break;
     case TYPE_BOOLEAN:
     case TYPE_FLOAT:
     case TYPE_DOUBLE:
       // boolean/float/double/INT96 encoded timestamp have no logical or converted types.
-      // INT64 encoded timestamp will have logical and converted types (IMPALA-5051).
       break;
     default:
       DCHECK(false);
diff --git a/be/src/exec/parquet/parquet-metadata-utils.h b/be/src/exec/parquet/parquet-metadata-utils.h
index e9e95f2..4639abf 100644
--- a/be/src/exec/parquet/parquet-metadata-utils.h
+++ b/be/src/exec/parquet/parquet-metadata-utils.h
@@ -57,6 +57,11 @@ class ParquetMetadataUtils {
       const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
       RuntimeState* state);
 
+  /// Returns the Parquet type corresponding to Impala's internal type. The caller must
+  /// validate that the input type is valid, otherwise this will DCHECK.
+  static parquet::Type::type ConvertInternalToParquetType(PrimitiveType type,
+      const TQueryOptions& query_options);
+
   /// Sets type related fields in a SchemaElement based on the column's internal type
   /// and query options.
   static void FillSchemaElement(const ColumnType& col_type,
diff --git a/be/src/runtime/timestamp-test.cc b/be/src/runtime/timestamp-test.cc
index 95aa521..80c44e8 100644
--- a/be/src/runtime/timestamp-test.cc
+++ b/be/src/runtime/timestamp-test.cc
@@ -301,6 +301,36 @@ void TestFromSubSecondFunctions(int64_t seconds, int64_t millis, const char* exp
   }
 }
 
+// Convenience functions for TimestampValue->Unix time conversion that assume that
+// the conversion is successful.
+int64_t FloorToSeconds(const TimestampValue& ts) {
+  EXPECT_TRUE(ts.HasDateAndTime());
+  int64_t result = 0;
+  EXPECT_TRUE(ts.UtcToUnixTime(&result));
+  return result;
+}
+
+int64_t RoundToMicros(const TimestampValue& ts) {
+  EXPECT_TRUE(ts.HasDateAndTime());
+  int64_t result = 0;
+  EXPECT_TRUE(ts.UtcToUnixTimeMicros(&result));
+  return result;
+}
+
+int64_t FloorToMicros(const TimestampValue& ts) {
+  EXPECT_TRUE(ts.HasDateAndTime());
+  int64_t result = 0;
+  EXPECT_TRUE(ts.FloorUtcToUnixTimeMicros(&result));
+  return result;
+}
+
+int64_t FloorToMillis(const TimestampValue& ts) {
+  EXPECT_TRUE(ts.HasDateAndTime());
+  int64_t result = 0;
+  EXPECT_TRUE(ts.FloorUtcToUnixTimeMillis(&result));
+  return result;
+}
+
 TEST(TimestampTest, Basic) {
   // Fix current time to determine the behavior parsing 2-digit year format
   // Set it to 03/01 to test 02/29 edge cases.
@@ -702,31 +732,47 @@ TEST(TimestampTest, Basic) {
         string(test_case.str, strlen(test_case.str))) << "TC: " << i;
   }
 
-  // Test edge cases
+  // Test rounding near edge cases.
   const int64_t MIN_DATE_AS_UNIX_TIME = -17987443200;
-  TimestampValue min_date = TimestampValue::Parse("1400-01-01");
-  EXPECT_TRUE(min_date.HasDate());
-  EXPECT_TRUE(min_date.HasTime());
-  time_t tm_min;
-  EXPECT_TRUE(min_date.ToUnixTime(utc_tz, &tm_min));
-  EXPECT_EQ(MIN_DATE_AS_UNIX_TIME, tm_min);
-  int64_t tm_min_micros;
-  EXPECT_TRUE(min_date.UtcToUnixTimeMicros(&tm_min_micros));
-  EXPECT_EQ(MIN_DATE_AS_UNIX_TIME * MICROS_PER_SEC, tm_min_micros);
-
-  // Add 250ns and check the value is rounded down
-  min_date = TimestampValue::FromUnixTimeNanos(MIN_DATE_AS_UNIX_TIME, 250, utc_tz);
-  EXPECT_TRUE(min_date.ToUnixTime(utc_tz, &tm_min));
-  EXPECT_EQ(MIN_DATE_AS_UNIX_TIME, tm_min);
-  EXPECT_TRUE(min_date.UtcToUnixTimeMicros(&tm_min_micros));
-  EXPECT_EQ(MIN_DATE_AS_UNIX_TIME * MICROS_PER_SEC, tm_min_micros);
-
-  // Add another 250ns and check the value is rounded up to the nearest microsecond.
-  EXPECT_TRUE(min_date.ToUnixTime(utc_tz, &tm_min));
-  EXPECT_EQ(MIN_DATE_AS_UNIX_TIME, tm_min);
-  min_date.set_time(min_date.time() + boost::posix_time::nanoseconds(250));
-  EXPECT_TRUE(min_date.UtcToUnixTimeMicros(&tm_min_micros));
-  EXPECT_EQ(MIN_DATE_AS_UNIX_TIME * MICROS_PER_SEC + 1, tm_min_micros);
+  {
+    // Check lowest valid timestamp.
+    const TimestampValue ts = TimestampValue::Parse("1400-01-01");
+    EXPECT_EQ(MIN_DATE_AS_UNIX_TIME, FloorToSeconds(ts));
+    EXPECT_EQ(MIN_DATE_AS_UNIX_TIME * MICROS_PER_SEC, RoundToMicros(ts));
+    EXPECT_EQ(MIN_DATE_AS_UNIX_TIME * MICROS_PER_SEC, FloorToMicros(ts));
+    EXPECT_EQ(MIN_DATE_AS_UNIX_TIME * MILLIS_PER_SEC, FloorToMillis(ts));
+  }
+
+  {
+    // Check that 250 nanoseconds are rounded/floored to the last microsecond.
+    const TimestampValue ts = TimestampValue::Parse("1400-01-01 00:00:00.000000250");
+    EXPECT_EQ(MIN_DATE_AS_UNIX_TIME, FloorToSeconds(ts));
+    EXPECT_EQ(MIN_DATE_AS_UNIX_TIME * MICROS_PER_SEC, RoundToMicros(ts));
+    EXPECT_EQ(MIN_DATE_AS_UNIX_TIME * MICROS_PER_SEC, FloorToMicros(ts));
+  }
+
+  {
+    // Check that 500 nanoseconds are rounded up to the next microsecond, but floored
+    // to the last microsecond.
+    const TimestampValue ts = TimestampValue::Parse("1400-01-01 00:00:00.000000500");
+    EXPECT_EQ(MIN_DATE_AS_UNIX_TIME, FloorToSeconds(ts));
+    EXPECT_EQ(MIN_DATE_AS_UNIX_TIME * MICROS_PER_SEC + 1, RoundToMicros(ts));
+    EXPECT_EQ(MIN_DATE_AS_UNIX_TIME * MICROS_PER_SEC, FloorToMicros(ts));
+  }
+
+  {
+    // Check that 250 microseconds are floored to the last millisecond.
+    const TimestampValue ts = TimestampValue::Parse("1400-01-01 00:00:00.000250");
+    EXPECT_EQ(MIN_DATE_AS_UNIX_TIME, FloorToSeconds(ts));
+    EXPECT_EQ(MIN_DATE_AS_UNIX_TIME * MILLIS_PER_SEC, FloorToMillis(ts));
+  }
+
+  {
+    // Check that 500 microseconds are floored to the last millisecond.
+    const TimestampValue ts = TimestampValue::Parse("1400-01-01 00:00:00.000500");
+    EXPECT_EQ(MIN_DATE_AS_UNIX_TIME, FloorToSeconds(ts));
+    EXPECT_EQ(MIN_DATE_AS_UNIX_TIME * MILLIS_PER_SEC, FloorToMillis(ts));
+  }
 
   EXPECT_EQ("1400-01-01 00:00:00",
       TimestampValue::FromUnixTime(MIN_DATE_AS_UNIX_TIME, utc_tz).ToString());
@@ -734,7 +780,10 @@ TEST(TimestampTest, Basic) {
       utc_tz);
   EXPECT_FALSE(too_early.HasDate());
   EXPECT_FALSE(too_early.HasTime());
-  EXPECT_FALSE(too_early.UtcToUnixTimeMicros(&tm_min_micros));
+  int64_t dummy;
+  EXPECT_FALSE(too_early.UtcToUnixTimeMicros(&dummy));
+  EXPECT_FALSE(too_early.FloorUtcToUnixTimeMicros(&dummy));
+  EXPECT_FALSE(too_early.FloorUtcToUnixTimeMillis(&dummy));
 
   // Sub-second FromUnixTime functions incorrectly accepted the last second of 1399
   // as valid, because validation logic checked the nearest second rounded towards 0
@@ -743,38 +792,53 @@ TEST(TimestampTest, Basic) {
   TestFromSubSecondFunctions(MIN_DATE_AS_UNIX_TIME, 100,
       "1400-01-01 00:00:00.100000000");
 
-  // Test the max supported date that can be represented in seconds.
   const int64_t MAX_DATE_AS_UNIX_TIME = 253402300799;
-  TimestampValue max_date =
-      TimestampValue(date(9999, Dec, 31), time_duration(23, 59, 59));
-  EXPECT_TRUE(max_date.HasDate());
-  EXPECT_TRUE(max_date.HasTime());
-  time_t tm_max;
-  EXPECT_TRUE(max_date.ToUnixTime(utc_tz, &tm_max));
-  EXPECT_EQ(MAX_DATE_AS_UNIX_TIME, tm_max);
-  int64_t tm_max_micros;
-  EXPECT_TRUE(max_date.UtcToUnixTimeMicros(&tm_max_micros));
-  EXPECT_EQ(MAX_DATE_AS_UNIX_TIME * MICROS_PER_SEC, tm_max_micros);
-
-  // Add 250 nanoseconds and test the result of UtcToUnixTimeMicros
-  max_date.set_time(max_date.time() + boost::posix_time::nanoseconds(250));
-  EXPECT_TRUE(max_date.UtcToUnixTimeMicros(&tm_max_micros));
-  EXPECT_EQ(MAX_DATE_AS_UNIX_TIME * MICROS_PER_SEC, tm_max_micros);
-  // Adding another 250ns will result in the timestamp being rounded up.
-  max_date.set_time(max_date.time() + boost::posix_time::nanoseconds(250));
-  EXPECT_TRUE(max_date.UtcToUnixTimeMicros(&tm_max_micros));
-  EXPECT_EQ(MAX_DATE_AS_UNIX_TIME * MICROS_PER_SEC + 1, tm_max_micros);
-
-  // The max date that can be represented with the maximum number of nanoseconds. Unlike
-  // the cases above, converting to microseconds does not round up to the next
-  // microsecond because that time is not supported by Impala.
-  max_date = TimestampValue::FromUnixTimeNanos(MAX_DATE_AS_UNIX_TIME, 999999999, utc_tz);
-  EXPECT_TRUE(max_date.HasDate());
-  EXPECT_TRUE(max_date.HasTime());
-  // The result is the maximum date with the maximum number of microseconds supported by
-  // Impala.
-  EXPECT_TRUE(max_date.UtcToUnixTimeMicros(&tm_max_micros));
-  EXPECT_EQ(MAX_DATE_AS_UNIX_TIME * MICROS_PER_SEC + 999999, tm_max_micros);
+  {
+    // Test the max supported date that can be represented in seconds.
+    const TimestampValue ts = TimestampValue::Parse("9999-12-31 23:59:59");
+    EXPECT_EQ(MAX_DATE_AS_UNIX_TIME, FloorToSeconds(ts));
+    EXPECT_EQ(MAX_DATE_AS_UNIX_TIME * MICROS_PER_SEC, RoundToMicros(ts));
+    EXPECT_EQ(MAX_DATE_AS_UNIX_TIME * MICROS_PER_SEC, FloorToMicros(ts));
+    EXPECT_EQ(MAX_DATE_AS_UNIX_TIME * MILLIS_PER_SEC, FloorToMillis(ts));
+  }
+
+  {
+    // Check that 250 nanoseconds are rounded/floored to the last microsecond.
+    const TimestampValue ts = TimestampValue::Parse("9999-12-31 23:59:59.000000250");
+    EXPECT_EQ(MAX_DATE_AS_UNIX_TIME * MICROS_PER_SEC, RoundToMicros(ts));
+    EXPECT_EQ(MAX_DATE_AS_UNIX_TIME * MICROS_PER_SEC, FloorToMicros(ts));
+  }
+
+  {
+    // Check that 500 nanoseconds are rounded to the next microsecond, but floored to
+    // the last microsecond.
+    const TimestampValue ts = TimestampValue::Parse("9999-12-31 23:59:59.000000500");
+    EXPECT_EQ(MAX_DATE_AS_UNIX_TIME * MICROS_PER_SEC + 1, RoundToMicros(ts));
+    EXPECT_EQ(MAX_DATE_AS_UNIX_TIME * MICROS_PER_SEC, FloorToMicros(ts));
+  }
+
+  {
+    // Check that 250 microseconds are floored to the last millisecond.
+    const TimestampValue ts = TimestampValue::Parse("9999-12-31 23:59:59.000250");
+    EXPECT_EQ(MAX_DATE_AS_UNIX_TIME, FloorToSeconds(ts));
+    EXPECT_EQ(MAX_DATE_AS_UNIX_TIME * MILLIS_PER_SEC, FloorToMillis(ts));
+  }
+
+  {
+    // Check that 500 microseconds are floored to the last millisecond.
+    const TimestampValue ts = TimestampValue::Parse("9999-12-31 23:59:59.000500");
+    EXPECT_EQ(MAX_DATE_AS_UNIX_TIME * MILLIS_PER_SEC, FloorToMillis(ts));
+  }
+
+  {
+    // The max date that can be represented with the maximum number of nanoseconds.
+    // Unlike the cases above, rounding to microseconds does not round up to the
+    // next microsecond, because that time is not supported by Impala.
+    const TimestampValue ts = TimestampValue::Parse("9999-12-31 23:59:59.999999999");
+    // The result is the maximum date with the maximum number of microseconds supported by
+    // Impala.
+    EXPECT_EQ(MAX_DATE_AS_UNIX_TIME * MICROS_PER_SEC + 999999, RoundToMicros(ts));
+  }
 
   EXPECT_EQ("9999-12-31 23:59:59",
       TimestampValue::FromUnixTime(MAX_DATE_AS_UNIX_TIME, utc_tz).ToString());
@@ -810,18 +874,36 @@ TEST(TimestampTest, Basic) {
   EXPECT_EQ("2018-01-10 15:30:00",
       TimestampValue::FromUnixTimeNanos(1515600000, -1800000000000, utc_tz).ToString());
 
-  // Test FromUnixTime around the boundary of the values that can be representad with
+  // Test FromUnixTime around the boundary of the values that can be represented with
   // int64 in nanosecond precision. Tests 1 second before and after these bounds.
   const int64_t MIN_BOOST_CONVERT_UNIX_TIME = -9223372036;
   const int64_t MAX_BOOST_CONVERT_UNIX_TIME = 9223372036;
-  EXPECT_EQ("1677-09-21 00:12:43",
-      TimestampValue::FromUnixTime(MIN_BOOST_CONVERT_UNIX_TIME - 1, utc_tz).ToString());
-  EXPECT_EQ("1677-09-21 00:12:44",
-      TimestampValue::FromUnixTime(MIN_BOOST_CONVERT_UNIX_TIME, utc_tz).ToString());
-  EXPECT_EQ("2262-04-11 23:47:16",
-      TimestampValue::FromUnixTime(MAX_BOOST_CONVERT_UNIX_TIME, utc_tz).ToString());
-  EXPECT_EQ("2262-04-11 23:47:17",
-      TimestampValue::FromUnixTime(MAX_BOOST_CONVERT_UNIX_TIME + 1, utc_tz).ToString());
+  const TimestampValue MIN_INT64_NANO_MINUS_1_SEC =
+      TimestampValue::FromUnixTime(MIN_BOOST_CONVERT_UNIX_TIME - 1, utc_tz);
+  const TimestampValue MIN_INT64_NANO =
+      TimestampValue::FromUnixTime(MIN_BOOST_CONVERT_UNIX_TIME, utc_tz);
+  const TimestampValue MAX_INT64_NANO =
+      TimestampValue::FromUnixTime(MAX_BOOST_CONVERT_UNIX_TIME, utc_tz);
+  const TimestampValue MAX_INT64_NANO_PLUS_1_SEC =
+      TimestampValue::FromUnixTime(MAX_BOOST_CONVERT_UNIX_TIME + 1, utc_tz);
+
+  EXPECT_EQ("1677-09-21 00:12:43", MIN_INT64_NANO_MINUS_1_SEC.ToString());
+  EXPECT_EQ("1677-09-21 00:12:44", MIN_INT64_NANO.ToString());
+  EXPECT_EQ("2262-04-11 23:47:16", MAX_INT64_NANO.ToString());
+  EXPECT_EQ("2262-04-11 23:47:17", MAX_INT64_NANO_PLUS_1_SEC.ToString());
+
+  // Test UtcToUnixTimeLimitedRangeNanos() near the edge values.
+  int64 int64_nano_value = 0;
+
+  EXPECT_TRUE(MIN_INT64_NANO.UtcToUnixTimeLimitedRangeNanos(&int64_nano_value));
+  EXPECT_EQ(MIN_BOOST_CONVERT_UNIX_TIME * NANOS_PER_SEC, int64_nano_value);
+  EXPECT_TRUE(MAX_INT64_NANO.UtcToUnixTimeLimitedRangeNanos(&int64_nano_value));
+  EXPECT_EQ(MAX_BOOST_CONVERT_UNIX_TIME * NANOS_PER_SEC, int64_nano_value);
+
+  EXPECT_FALSE(
+      MIN_INT64_NANO_MINUS_1_SEC.UtcToUnixTimeLimitedRangeNanos(&int64_nano_value));
+  EXPECT_FALSE(
+      MAX_INT64_NANO_PLUS_1_SEC.UtcToUnixTimeLimitedRangeNanos(&int64_nano_value));
 
   // Test the exact boundaries of nanoseconds stored as int64.
   EXPECT_EQ("1677-09-21 00:12:43.145224192",
diff --git a/be/src/runtime/timestamp-value.h b/be/src/runtime/timestamp-value.h
index 7c6aa2a..1a69fb9 100644
--- a/be/src/runtime/timestamp-value.h
+++ b/be/src/runtime/timestamp-value.h
@@ -193,7 +193,7 @@ class TimestampValue {
 
   /// Verifies that the time is not negative and is less than a whole day.
   static inline bool IsValidTime(const boost::posix_time::time_duration& time) {
-    static const int64_t NANOS_PER_DAY = 1'000'000'000LL*60*60*24;
+    static const int64_t NANOS_PER_DAY = 1'000'000'000LL * SECONDS_PER_DAY;
     return !time.is_negative()
         && time.total_nanoseconds() < NANOS_PER_DAY;
   }
@@ -217,8 +217,30 @@ class TimestampValue {
   /// Nanoseconds are rounded to the nearest microsecond supported by Impala.
   /// Returns false if the conversion failed ('unix_time_micros' will be undefined),
   /// otherwise true.
+  /// TODO: Rounding towards nearest microsecond should be replaced with rounding
+  ///       towards minus infinity. For more details, see IMPALA-8180
   bool UtcToUnixTimeMicros(int64_t* unix_time_micros) const;
 
+  /// Interpret 'this' as a timestamp in UTC and convert to unix time in microseconds.
+  /// Nanoseconds are rounded towards minus infinity.
+  /// Returns false if the conversion failed ('unix_time_micros' will be undefined),
+  /// otherwise true.
+  bool FloorUtcToUnixTimeMicros(int64_t* unix_time_micros) const;
+
+  /// Interpret 'this' as a timestamp in UTC and convert to unix time in milliseconds.
+  /// Nanoseconds are rounded towards minus infinity.
+  /// Returns false if the conversion failed ('unix_time_millis' will be undefined),
+  /// otherwise true.
+  bool FloorUtcToUnixTimeMillis(int64_t* unix_time_millis) const;
+
+  /// Interpret 'this' as a timestamp in UTC and convert to unix time in nanoseconds.
+  /// The full [1400 .. 10000) range cannot be represented with an int64, so the
+  /// conversion will fail outside the supported range:
+  ///  [1677-09-21 00:12:43.145224192 .. 2262-04-11 23:47:16.854775807]
+  /// Returns false if the conversion failed ('unix_time_nanos' will be undefined),
+  /// otherwise true.
+  bool UtcToUnixTimeLimitedRangeNanos(int64_t* unix_time_nanos) const;
+
   /// Converts to Unix time (seconds since the Unix epoch) representation. The time zone
   /// interpretation of the TimestampValue instance is determined by
   /// FLAGS_use_local_tz_for_unix_timestamp_conversions. If the flag is true, the instance
@@ -316,6 +338,9 @@ class TimestampValue {
   /// Used when converting a time with fractional seconds which are stored as in integer
   /// to a Unix time stored as a double.
   static const double ONE_BILLIONTH;
+
+  static const uint64_t SECONDS_PER_DAY = 24 * 60 * 60;
+
   /// Boost ptime leaves a gap in the structure, so we swap the order to make it
   /// 12 contiguous bytes.  We then must convert to and from the boost ptime data type.
   /// See IMP-87 for more information on why using ptime with the 4 byte gap is
@@ -349,6 +374,9 @@ class TimestampValue {
   /// Converts 'unix_time_ticks'/TICKS_PER_SEC seconds to TimestampValue.
   template <int32_t TICKS_PER_SEC>
   static TimestampValue UtcFromUnixTimeTicks(int64_t unix_time_ticks);
+
+  // Returns the number of days since 1970-01-01. Expects date_ to be valid.
+  int64_t DaysSinceUnixEpoch() const;
 };
 
 /// This function must be called 'hash_value' to be picked up by boost.
diff --git a/be/src/runtime/timestamp-value.inline.h b/be/src/runtime/timestamp-value.inline.h
index 66036ef..f7cba6b 100644
--- a/be/src/runtime/timestamp-value.inline.h
+++ b/be/src/runtime/timestamp-value.inline.h
@@ -26,6 +26,7 @@
 #include <chrono>
 
 #include "exprs/timezone_db.h"
+#include "kudu/util/int128.h"
 #include "gutil/walltime.h"
 
 namespace impala {
@@ -33,7 +34,7 @@ namespace impala {
 template <int32_t TICKS_PER_SEC>
 inline TimestampValue TimestampValue::UtcFromUnixTimeTicks(int64_t unix_time_ticks) {
   static const boost::gregorian::date EPOCH(1970,1,1);
-  int64_t days = SplitTime<(uint64_t)TICKS_PER_SEC*24*60*60>(&unix_time_ticks);
+  int64_t days = SplitTime<(uint64_t)TICKS_PER_SEC*SECONDS_PER_DAY>(&unix_time_ticks);
 
   return TimestampValue(EPOCH + boost::gregorian::date_duration(days),
       boost::posix_time::nanoseconds(unix_time_ticks*(NANOS_PER_SEC/TICKS_PER_SEC)));
@@ -81,6 +82,11 @@ inline TimestampValue TimestampValue::FromUnixTimeNanos(time_t unix_time, int64_
   return result;
 }
 
+inline int64_t TimestampValue::DaysSinceUnixEpoch() const {
+  DCHECK(HasDate());
+  static const boost::gregorian::date epoch(1970,1,1);
+  return (date_ - epoch).days();
+}
 
 /// Interpret 'this' as a timestamp in UTC and convert to unix time.
 /// Returns false if the conversion failed ('unix_time' will be undefined), otherwise
@@ -89,8 +95,7 @@ inline bool TimestampValue::UtcToUnixTime(time_t* unix_time) const {
   DCHECK(unix_time != nullptr);
   if (UNLIKELY(!HasDateAndTime())) return false;
 
-  static const boost::gregorian::date epoch(1970,1,1);
-  *unix_time = (date_ - epoch).days() * 24 * 60 * 60 + time_.total_seconds();
+  *unix_time = DaysSinceUnixEpoch() * SECONDS_PER_DAY + time_.total_seconds();
   return true;
 }
 
@@ -119,6 +124,44 @@ inline bool TimestampValue::UtcToUnixTimeMicros(int64_t* unix_time_micros) const
   return true;
 }
 
+inline bool TimestampValue::FloorUtcToUnixTimeMicros(int64_t* unix_time_micros) const {
+  DCHECK(unix_time_micros != nullptr);
+  if (UNLIKELY(!HasDateAndTime())) return false;
+
+  *unix_time_micros = DaysSinceUnixEpoch() * SECONDS_PER_DAY * MICROS_PER_SEC
+      + time_.total_microseconds();
+  return true;
+}
+
+inline bool TimestampValue::FloorUtcToUnixTimeMillis(int64_t* unix_time_millis) const {
+  DCHECK(unix_time_millis != nullptr);
+  if (UNLIKELY(!HasDateAndTime())) return false;
+
+  *unix_time_millis = DaysSinceUnixEpoch() * SECONDS_PER_DAY * MILLIS_PER_SEC
+      + time_.total_milliseconds();
+  return true;
+}
+
+inline bool TimestampValue::UtcToUnixTimeLimitedRangeNanos(
+    int64_t* unix_time_nanos) const {
+  DCHECK(unix_time_nanos != nullptr);
+  time_t unixtime_seconds;
+  if (UNLIKELY(!UtcToUnixTime(&unixtime_seconds))) return false;
+
+  DCHECK(HasTime());
+  // TODO: consider optimizing this (IMPALA-8268)
+  kudu::int128_t nanos128 =
+      static_cast<kudu::int128_t>(unixtime_seconds) * NANOS_PER_SEC
+      + time_.fractional_seconds();
+
+  if (nanos128 < std::numeric_limits<int64_t>::min()
+      || nanos128 > std::numeric_limits<int64_t>::max()) {
+    return false;
+  }
+  *unix_time_nanos = static_cast<int64_t>(nanos128);
+  return true;
+}
+
 /// Converts to Unix time (seconds since the Unix epoch) representation. The time
 /// zone interpretation of the TimestampValue instance is determined by
 /// FLAGS_use_local_tz_for_unix_timestamp_conversions. If the flag is true, the
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index b0f782e..e199489 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -725,6 +725,13 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_default_file_format(enum_type);
         break;
       }
+      case TImpalaQueryOptions::PARQUET_TIMESTAMP_TYPE: {
+        TParquetTimestampType::type enum_type;
+        RETURN_IF_ERROR(GetThriftEnum(value, "Parquet timestamp type",
+            _TParquetTimestampType_VALUES_TO_NAMES, &enum_type));
+        query_options->__set_parquet_timestamp_type(enum_type);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 4ead0e5..bb83931 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::DEFAULT_FILE_FORMAT + 1);\
+      TImpalaQueryOptions::PARQUET_TIMESTAMP_TYPE + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -152,7 +152,9 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(\
       planner_testcase_mode, PLANNER_TESTCASE_MODE, TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(default_file_format, DEFAULT_FILE_FORMAT, TQueryOptionLevel::REGULAR)
+  QUERY_OPT_FN(default_file_format, DEFAULT_FILE_FORMAT, TQueryOptionLevel::REGULAR)\
+  QUERY_OPT_FN(parquet_timestamp_type, PARQUET_TIMESTAMP_TYPE,\
+      TQueryOptionLevel::DEVELOPMENT)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index f2729b4..552c16c 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -85,6 +85,7 @@ PRINT_THRIFT_ENUM_IMPL(TRuntimeFilterMode)
 PRINT_THRIFT_ENUM_IMPL(TSessionType)
 PRINT_THRIFT_ENUM_IMPL(TStmtType)
 PRINT_THRIFT_ENUM_IMPL(TUnit)
+PRINT_THRIFT_ENUM_IMPL(TParquetTimestampType)
 
 string PrintId(const TUniqueId& id, const string& separator) {
   stringstream out;
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 2acbbfc..1ca53b5 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -69,6 +69,7 @@ std::string PrintThriftEnum(const TRuntimeFilterMode::type& value);
 std::string PrintThriftEnum(const TSessionType::type& value);
 std::string PrintThriftEnum(const TStmtType::type& value);
 std::string PrintThriftEnum(const TUnit::type& value);
+std::string PrintThriftEnum(const TParquetTimestampType::type& value);
 
 std::string PrintTuple(const Tuple* t, const TupleDescriptor& d);
 std::string PrintRow(TupleRow* row, const RowDescriptor& d);
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 65de009..18ee0c8 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -67,6 +67,14 @@ enum TKuduReadMode {
   READ_AT_SNAPSHOT = 2
 }
 
+// Physical type and unit used when writing timestamps in Parquet.
+enum TParquetTimestampType {
+  INT96_NANOS,
+  INT64_MILLIS,
+  INT64_MICROS,
+  INT64_NANOS
+}
+
 // Query options that correspond to ImpalaService.ImpalaQueryOptions, with their
 // respective defaults. Query options can be set in the following ways:
 //
@@ -327,6 +335,10 @@ struct TQueryOptions {
   // See comment in ImpalaService.thrift.
   79: optional CatalogObjects.THdfsFileFormat default_file_format =
       CatalogObjects.THdfsFileFormat.TEXT;
+
+  // See comment in ImpalaService.thrift.
+  80: optional TParquetTimestampType parquet_timestamp_type =
+      TParquetTimestampType.INT96_NANOS;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 7c46bd8..b5ffaa3 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -380,6 +380,11 @@ enum TImpalaQueryOptions {
 
   // Specifies the default table file format.
   DEFAULT_FILE_FORMAT = 78
+
+  // The physical type and unit used when writing timestamps in Parquet.
+  // Valid values: INT96_NANOS, INT64_MILLIS, INT64_MICROS, INT64_NANOS
+  // Default: INT96_NANOS
+  PARQUET_TIMESTAMP_TYPE = 79
 }
 
 // The summary of a DML statement.
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-int64-timestamps.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-int64-timestamps.test
index e4adf87..ce44e7b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet-int64-timestamps.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-int64-timestamps.test
@@ -98,3 +98,90 @@ BIGINT,TIMESTAMP,TIMESTAMP
 2147483647,1970-01-01 01:00:02.147483647,1970-01-01 00:00:02.147483647
 -9223372036854775808,1677-09-21 01:12:43.145224192,1677-09-21 00:12:43.145224192
 9223372036854775807,2262-04-12 01:47:16.854775807,2262-04-11 23:47:16.854775807
+====
+---- QUERY
+create table int96_nanos (ts timestamp) stored as parquet;
+====
+---- QUERY
+# Insert edge values as "normal" int96 timestamps that can represent all values.
+set parquet_timestamp_type=INT96_NANOS;
+insert into int96_nanos values
+("1400-01-01"),
+("1677-09-21 00:12:43.145224191"),
+("1677-09-21 00:12:43.145224192"),
+("2019-01-18 00:00:00.000000001"),
+("2019-01-18 00:00:00.000001"),
+("2019-01-18 00:00:00.001"),
+("2019-01-18 23:59:59.999"),
+("2019-01-18 23:59:59.999999"),
+("2019-01-18 23:59:59.999999999"),
+("2262-04-11 23:47:16.854775807"),
+("2262-04-11 23:47:16.854775808"),
+("9999-12-31 23:59:59.999"),
+("9999-12-31 23:59:59.999999"),
+("9999-12-31 23:59:59.999999999");
+====
+---- QUERY
+# Inserted values are rounded to milliseconds.
+set parquet_timestamp_type=INT64_MILLIS;
+create table int64_millis stored as parquet as select * from int96_nanos;
+select * from int64_millis;
+---- RESULTS
+1400-01-01 00:00:00
+1677-09-21 00:12:43.145000000
+1677-09-21 00:12:43.145000000
+2019-01-18 00:00:00
+2019-01-18 00:00:00
+2019-01-18 00:00:00.001000000
+2019-01-18 23:59:59.999000000
+2019-01-18 23:59:59.999000000
+2019-01-18 23:59:59.999000000
+2262-04-11 23:47:16.854000000
+2262-04-11 23:47:16.854000000
+9999-12-31 23:59:59.999000000
+9999-12-31 23:59:59.999000000
+9999-12-31 23:59:59.999000000
+====
+---- QUERY
+# Inserted values are rounded to microseconds.
+set parquet_timestamp_type=INT64_MICROS;
+create table int64_micros stored as parquet as select * from int96_nanos;
+select * from int64_micros;
+---- RESULTS
+1400-01-01 00:00:00
+1677-09-21 00:12:43.145224000
+1677-09-21 00:12:43.145224000
+2019-01-18 00:00:00
+2019-01-18 00:00:00.000001000
+2019-01-18 00:00:00.001000000
+2019-01-18 23:59:59.999000000
+2019-01-18 23:59:59.999999000
+2019-01-18 23:59:59.999999000
+2262-04-11 23:47:16.854775000
+2262-04-11 23:47:16.854775000
+9999-12-31 23:59:59.999000000
+9999-12-31 23:59:59.999999000
+9999-12-31 23:59:59.999999000
+====
+---- QUERY
+# Values outside the [1677-09-21 00:12:43.145224192 .. 2262-04-11 23:47:16.854775807]
+# range are inserted as NULLs.
+set parquet_timestamp_type=INT64_NANOS;
+create table int64_nanos stored as parquet as select * from int96_nanos;
+select * from int64_nanos;
+---- RESULTS
+NULL
+NULL
+1677-09-21 00:12:43.145224192
+2019-01-18 00:00:00.000000001
+2019-01-18 00:00:00.000001000
+2019-01-18 00:00:00.001000000
+2019-01-18 23:59:59.999000000
+2019-01-18 23:59:59.999999000
+2019-01-18 23:59:59.999999999
+2262-04-11 23:47:16.854775807
+NULL
+NULL
+NULL
+NULL
+====
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index 5790859..00622eb 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -353,9 +353,9 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
             % dst_tbl)
     assert result_src.data == result_dst.data
 
-  def _ctas_and_get_metadata(self, vector, unique_database, tmp_dir, source_table):
+  def _ctas_and_get_metadata(self, vector, unique_database, tmp_dir, source_table,
+                             table_name="test_hdfs_parquet_table_writer"):
     """CTAS 'source_table' into a Parquet table and returns its Parquet metadata."""
-    table_name = "test_hdfs_parquet_table_writer"
     qualified_table_name = "{0}.{1}".format(unique_database, table_name)
     hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
                                                                  table_name))
@@ -461,6 +461,36 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
     self._check_decimal_logical_type(schemas, "c2", 15, 5)
     self._check_decimal_logical_type(schemas, "c3", 1, 1)
 
+  def _check_int64_timestamp_logical_type(self, schemas, column_name, unit):
+    """Checks that the schema with name 'column_name' has logical and converted type that
+       describe a timestamp with the given unit."""
+    schema = self._get_schema(schemas, column_name)
+    assert schema.logicalType is not None
+    self._check_only_one_member_var_is_set(schema.logicalType, "TIMESTAMP")
+    assert schema.logicalType.TIMESTAMP.unit is not None
+    self._check_only_one_member_var_is_set(
+        schema.logicalType.TIMESTAMP.unit, unit.upper())
+    # Non UTC-normalized timestamps have no converted_type to avoid confusing older
+    # readers that would interpret these as UTC-normalized.
+    assert schema.converted_type is None
+    assert not schema.logicalType.TIMESTAMP.isAdjustedToUTC
+
+  def _ctas_and_check_int64_timestamps(self, vector, unique_database, tmpdir, unit):
+    """CTAS a table using 'unit' int64 timestamps and checks columns metadata."""
+    source = "functional.alltypestiny"
+    timestamp_type = 'int64_' + unit
+    vector.get_value('exec_option')['parquet_timestamp_type'] = timestamp_type
+    file_metadata = self._ctas_and_get_metadata(vector, unique_database, tmpdir.strpath,
+                                                source, table_name=timestamp_type)
+    schemas = file_metadata.schema
+    self._check_int64_timestamp_logical_type(schemas, "timestamp_col", unit)
+
+  def test_int64_timestamp_logical_type(self, vector, unique_database, tmpdir):
+    """Tests that correct metadata is written for int64 timestamps."""
+    self._ctas_and_check_int64_timestamps(vector, unique_database, tmpdir, "millis")
+    self._ctas_and_check_int64_timestamps(vector, unique_database, tmpdir, "micros")
+    self._ctas_and_check_int64_timestamps(vector, unique_database, tmpdir, "nanos")
+
 
 @SkipIfIsilon.hive
 @SkipIfLocal.hive
@@ -559,13 +589,13 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
       assert stats == expected
 
   def _ctas_table_and_verify_stats(self, vector, unique_database, tmp_dir, source_table,
-                                   expected_values):
+                                   expected_values,
+                                   table_name="test_hdfs_parquet_table_writer"):
     """Copies 'source_table' into a parquet table and makes sure that the row group
     statistics in the resulting parquet file match those in 'expected_values'. 'tmp_dir'
     needs to be supplied by the caller and will be used to store temporary files. The
     caller is responsible for cleaning up 'tmp_dir'.
     """
-    table_name = "test_hdfs_parquet_table_writer"
     qualified_table_name = "{0}.{1}".format(unique_database, table_name)
     hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
                                                                  table_name))
@@ -781,11 +811,59 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
       "functional_parquet.zipcode_incomes", expected_min_max_values)
 
+  def test_write_int64_timestamp_statistics(self, vector, unique_database, tmpdir):
+    """Test that writing a parquet file populates the rowgroup statistics correctly for
+    int64 milli/micro/nano timestamps."""
+    table_name = "int96_nanos"
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
+
+    create_table_stmt = "create table {0} (ts timestamp);".format(qualified_table_name)
+    self.execute_query(create_table_stmt)
+
+    insert_stmt = """insert into {0} values
+        ("1969-12-31 23:59:59.999999999"),
+        ("1970-01-01 00:00:00.001001001")""".format(qualified_table_name)
+    self.execute_query(insert_stmt)
+
+    vector.get_value('exec_option')['parquet_timestamp_type'] = "int64_millis"
+    expected_min_max_values = [
+      ColumnStats('ts', -1, 1, 0)
+    ]
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+                                      qualified_table_name,
+                                      expected_min_max_values,
+                                      table_name="int64_millis")
+
+    vector.get_value('exec_option')['parquet_timestamp_type'] = "int64_micros"
+    expected_min_max_values = [
+      ColumnStats('ts', -1, 1001, 0)
+    ]
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+                                      qualified_table_name,
+                                      expected_min_max_values,
+                                      table_name="int64_micros")
+
+    # Insert values that fall outside the valid range for int64_nanos. These should
+    # be inserted as NULLs and not affect min/max stats.
+    insert_stmt = """insert into {0} values
+        ("1677-09-21 00:12:43.145224191"),
+        ("2262-04-11 23:47:16.854775808")""".format(qualified_table_name)
+    self.execute_query(insert_stmt)
+
+    vector.get_value('exec_option')['parquet_timestamp_type'] = "int64_nanos"
+    expected_min_max_values = [
+      ColumnStats('ts', -1, 1001001, 2)
+    ]
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+                                      qualified_table_name,
+                                      expected_min_max_values,
+                                      table_name="int64_nanos")
+
   def test_too_many_columns(self, vector, unique_database):
     """Test that writing a Parquet table with too many columns results in an error."""
     num_cols = 12000
     query = "create table %s.wide stored as parquet as select \n" % unique_database
     query += ", ".join(map(str, xrange(num_cols)))
     query += ";\n"
-    result = self.execute_query_expect_failure(self.client, query);
+    result = self.execute_query_expect_failure(self.client, query)
     assert "Minimum required block size must be less than 2GB" in str(result)
diff --git a/tests/util/get_parquet_metadata.py b/tests/util/get_parquet_metadata.py
index 56281f8..107b339 100644
--- a/tests/util/get_parquet_metadata.py
+++ b/tests/util/get_parquet_metadata.py
@@ -176,8 +176,10 @@ def get_parquet_metadata_from_hdfs_folder(hdfs_path, tmp_dir):
   subdirectories. The hdfs folder is copied into 'tmp_dir' before processing.
   """
   check_call(['hdfs', 'dfs', '-get', hdfs_path, tmp_dir])
+  # Only walk the new directory to make the same tmp_dir usable for multiple tables.
+  table_dir = os.path.join(tmp_dir, os.path.basename(os.path.normpath(hdfs_path)))
   result = []
-  for root, subdirs, files in os.walk(tmp_dir):
+  for root, subdirs, files in os.walk(table_dir):
     for f in files:
       if not f.endswith('parq'):
         continue