You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2019/04/26 15:46:50 UTC

[hive] branch branch-3.1 updated: HIVE-21291: Restore historical way of handling timestamps in Avro while keeping the new semantics at the same time (Karen Coppage, reviewed by Jesus Camacho Rodriguez)

This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new e7ac273  HIVE-21291: Restore historical way of handling timestamps in Avro while keeping the new semantics at the same time (Karen Coppage, reviewed by Jesus Camacho Rodriguez)
e7ac273 is described below

commit e7ac273431bedf0818b066591b793b7a9c530f02
Author: Karen Coppage <ka...@cloudera.com>
AuthorDate: Fri Apr 26 08:46:32 2019 -0700

    HIVE-21291: Restore historical way of handling timestamps in Avro while keeping the new semantics at the same time (Karen Coppage, reviewed by Jesus Camacho Rodriguez)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   6 +++
 data/files/avro_historical_timestamp_legacy.avro   | Bin 0 -> 216 bytes
 data/files/avro_historical_timestamp_new.avro      | Bin 0 -> 245 bytes
 .../hive/ql/io/avro/AvroContainerOutputFormat.java |   4 ++
 .../hive/ql/io/avro/AvroGenericRecordReader.java   |  31 ++++++++++-
 .../clientpositive/avro_historical_timestamp.q     |  19 +++++++
 .../clientpositive/avro_historical_timestamp.q.out |  59 +++++++++++++++++++++
 .../avro_schema_evolution_native.q.out             |  14 ++---
 .../clientpositive/cbo_ppd_non_deterministic.q.out |  24 ++++-----
 .../hadoop/hive/serde2/avro/AvroDeserializer.java  |  45 +++++++++++++++-
 .../serde2/avro/AvroGenericRecordWritable.java     |  12 +++++
 .../apache/hadoop/hive/serde2/avro/AvroSerDe.java  |   3 +-
 .../hadoop/hive/serde2/avro/AvroSerializer.java    |   5 ++
 .../hive/serde2/avro/TestAvroDeserializer.java     |  52 ++++++++++++++++++
 .../avro/TestAvroObjectInspectorGenerator.java     |  10 ++++
 .../hive/serde2/avro/TestAvroSerializer.java       |   8 +++
 16 files changed, 270 insertions(+), 22 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e7c3c0a..8959ec1 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1879,6 +1879,12 @@ public class HiveConf extends Configuration {
     HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION("hive.parquet.timestamp.skip.conversion", false,
       "Current Hive implementation of parquet stores timestamps to UTC, this flag allows skipping of the conversion" +
       "on reading parquet files from other tools"),
+    HIVE_AVRO_TIMESTAMP_SKIP_CONVERSION("hive.avro.timestamp.skip.conversion", false,
+        "Some older Hive implementations (pre-3.1) wrote Avro timestamps in a UTC-normalized " +
+        "manner, while from version 3.1 until now Hive wrote time zone agnostic timestamps. " +
+        "Setting this flag to true will treat legacy timestamps as time zone agnostic. Setting " +
+        "it to false will treat legacy timestamps as UTC-normalized. This flag will not affect " +
+        "timestamps written after this change."),
     HIVE_INT_TIMESTAMP_CONVERSION_IN_SECONDS("hive.int.timestamp.conversion.in.seconds", false,
         "Boolean/tinyint/smallint/int/bigint value is interpreted as milliseconds during the timestamp conversion.\n" +
         "Set this flag to true to interpret the value as seconds to be consistent with float/double." ),
diff --git a/data/files/avro_historical_timestamp_legacy.avro b/data/files/avro_historical_timestamp_legacy.avro
new file mode 100644
index 0000000..97accd4
Binary files /dev/null and b/data/files/avro_historical_timestamp_legacy.avro differ
diff --git a/data/files/avro_historical_timestamp_new.avro b/data/files/avro_historical_timestamp_new.avro
new file mode 100644
index 0000000..fe01e54
Binary files /dev/null and b/data/files/avro_historical_timestamp_new.avro differ
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
index 59d3bba..be7d8b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
@@ -24,12 +24,14 @@ import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
 
 import java.io.IOException;
 import java.util.Properties;
+import java.util.TimeZone;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -76,6 +78,8 @@ public class AvroContainerOutputFormat
       dfw.setCodec(factory);
     }
 
+    // add writer.time.zone property to file metadata
+    dfw.setMeta(AvroSerDe.WRITER_TIME_ZONE, TimeZone.getDefault().toZoneId().toString());
     dfw.create(schema, path.getFileSystem(jobConf).create(path));
     return new AvroGenericRecordWriter(dfw);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java
index 68138c8..3159c5b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java
@@ -19,7 +19,10 @@ package org.apache.hadoop.hive.ql.io.avro;
 
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.rmi.server.UID;
+import java.time.DateTimeException;
+import java.time.ZoneId;
 import java.util.Map;
 import java.util.Properties;
 
@@ -29,6 +32,7 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
@@ -58,6 +62,7 @@ public class AvroGenericRecordReader implements
   final private org.apache.avro.file.FileReader<GenericRecord> reader;
   final private long start;
   final private long stop;
+  private ZoneId writerTimezone;
   protected JobConf jobConf;
   final private boolean isEmptyInput;
   /**
@@ -94,6 +99,8 @@ public class AvroGenericRecordReader implements
     }
     this.stop = split.getStart() + split.getLength();
     this.recordReaderID = new UID();
+
+    this.writerTimezone = extractWriterTimezoneFromMetadata(job, split, gdr);
   }
 
   /**
@@ -144,6 +151,28 @@ public class AvroGenericRecordReader implements
     return null;
   }
 
+  private ZoneId extractWriterTimezoneFromMetadata(JobConf job, FileSplit split,
+      GenericDatumReader<GenericRecord> gdr) throws IOException {
+    if (job == null || gdr == null || split == null || split.getPath() == null) {
+      return null;
+    }
+    try {
+      DataFileReader<GenericRecord> dataFileReader =
+          new DataFileReader<GenericRecord>(new FsInput(split.getPath(), job), gdr);
+      if (dataFileReader.getMeta(AvroSerDe.WRITER_TIME_ZONE) != null) {
+        try {
+          return ZoneId.of(new String(dataFileReader.getMeta(AvroSerDe.WRITER_TIME_ZONE),
+              StandardCharsets.UTF_8));
+        } catch (DateTimeException e) {
+          throw new RuntimeException("Can't parse writer time zone stored in file metadata", e);
+        }
+      }
+    } catch (IOException e) {
+      // Can't access metadata, carry on.
+    }
+    return null;
+  }
+
   private boolean pathIsInPartition(Path split, Path partitionPath) {
     boolean schemeless = split.toUri().getScheme() == null;
     if (schemeless) {
@@ -176,7 +205,7 @@ public class AvroGenericRecordReader implements
 
   @Override
   public AvroGenericRecordWritable createValue() {
-    return new AvroGenericRecordWritable();
+    return new AvroGenericRecordWritable(writerTimezone);
   }
 
   @Override
diff --git a/ql/src/test/queries/clientpositive/avro_historical_timestamp.q b/ql/src/test/queries/clientpositive/avro_historical_timestamp.q
new file mode 100644
index 0000000..6d2e1a7
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/avro_historical_timestamp.q
@@ -0,0 +1,19 @@
+--These files were created by inserting timestamp '2019-01-01 00:30:30.111111111' into column (ts timestamp) where
+--writer time zone is Europe/Rome.
+
+--older writer: time zone dependent behavior. convert to reader time zone
+create table legacy_table (ts timestamp) stored as avro;
+
+load data local inpath '../../data/files/avro_historical_timestamp_legacy.avro' into table legacy_table;
+
+select * from legacy_table;
+--read legacy timestamps as time zone agnostic
+set hive.avro.timestamp.skip.conversion=true;
+select * from legacy_table;
+
+--newer writer: time zone agnostic behavior. convert to writer time zone (Europe/Rome)
+create table new_table (ts timestamp) stored as avro;
+
+load data local inpath '../../data/files/avro_historical_timestamp_new.avro' into table new_table;
+
+select * from new_table;
\ No newline at end of file
diff --git a/ql/src/test/results/clientpositive/avro_historical_timestamp.q.out b/ql/src/test/results/clientpositive/avro_historical_timestamp.q.out
new file mode 100644
index 0000000..ca4e84e
--- /dev/null
+++ b/ql/src/test/results/clientpositive/avro_historical_timestamp.q.out
@@ -0,0 +1,59 @@
+PREHOOK: query: create table legacy_table (ts timestamp) stored as avro
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@legacy_table
+POSTHOOK: query: create table legacy_table (ts timestamp) stored as avro
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@legacy_table
+PREHOOK: query: load data local inpath '../../data/files/avro_historical_timestamp_legacy.avro' into table legacy_table
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@legacy_table
+POSTHOOK: query: load data local inpath '../../data/files/avro_historical_timestamp_legacy.avro' into table legacy_table
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@legacy_table
+PREHOOK: query: select * from legacy_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@legacy_table
+#### A masked pattern was here ####
+POSTHOOK: query: select * from legacy_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@legacy_table
+#### A masked pattern was here ####
+2018-12-31 15:30:30.111
+PREHOOK: query: select * from legacy_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@legacy_table
+#### A masked pattern was here ####
+POSTHOOK: query: select * from legacy_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@legacy_table
+#### A masked pattern was here ####
+2018-12-31 23:30:30.111
+PREHOOK: query: create table new_table (ts timestamp) stored as avro
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@new_table
+POSTHOOK: query: create table new_table (ts timestamp) stored as avro
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@new_table
+PREHOOK: query: load data local inpath '../../data/files/avro_historical_timestamp_new.avro' into table new_table
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@new_table
+POSTHOOK: query: load data local inpath '../../data/files/avro_historical_timestamp_new.avro' into table new_table
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@new_table
+PREHOOK: query: select * from new_table
+PREHOOK: type: QUERY
+PREHOOK: Input: default@new_table
+#### A masked pattern was here ####
+POSTHOOK: query: select * from new_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@new_table
+#### A masked pattern was here ####
+2019-01-01 00:30:30.111
diff --git a/ql/src/test/results/clientpositive/avro_schema_evolution_native.q.out b/ql/src/test/results/clientpositive/avro_schema_evolution_native.q.out
index 726f387..c865734 100644
--- a/ql/src/test/results/clientpositive/avro_schema_evolution_native.q.out
+++ b/ql/src/test/results/clientpositive/avro_schema_evolution_native.q.out
@@ -107,7 +107,7 @@ Table Parameters:
 	numPartitions       	7                   
 	numRows             	8                   
 	rawDataSize         	0                   
-	totalSize           	3098                
+	totalSize           	3294                
 #### A masked pattern was here ####
 	 	 
 # Storage Information	 	 
@@ -219,7 +219,7 @@ Table Parameters:
 	numPartitions       	7                   
 	numRows             	8                   
 	rawDataSize         	0                   
-	totalSize           	3098                
+	totalSize           	3294                
 #### A masked pattern was here ####
 	 	 
 # Storage Information	 	 
@@ -258,11 +258,11 @@ STAGE PLANS:
       Processor Tree:
         TableScan
           alias: episodes_partitioned_n0
-          Statistics: Num rows: 3 Data size: 8950 Basic stats: COMPLETE Column stats: NONE
+          Statistics: Num rows: 3 Data size: 9510 Basic stats: COMPLETE Column stats: NONE
           Select Operator
             expressions: title (type: string), air_date (type: string), doctor (type: int), value (type: int), doctor_pt (type: int)
             outputColumnNames: _col0, _col1, _col2, _col3, _col4
-            Statistics: Num rows: 3 Data size: 8950 Basic stats: COMPLETE Column stats: NONE
+            Statistics: Num rows: 3 Data size: 9510 Basic stats: COMPLETE Column stats: NONE
             ListSink
 
 PREHOOK: query: SELECT * FROM episodes_partitioned_n0 WHERE doctor_pt > 6
@@ -355,14 +355,14 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: episodes_partitioned_n0
-            Statistics: Num rows: 3 Data size: 8950 Basic stats: COMPLETE Column stats: NONE
+            Statistics: Num rows: 3 Data size: 9510 Basic stats: COMPLETE Column stats: NONE
             Select Operator
               expressions: title (type: string), air_date (type: string), doctor (type: int), value (type: int), doctor_pt (type: int)
               outputColumnNames: _col0, _col1, _col2, _col3, _col4
-              Statistics: Num rows: 3 Data size: 8950 Basic stats: COMPLETE Column stats: NONE
+              Statistics: Num rows: 3 Data size: 9510 Basic stats: COMPLETE Column stats: NONE
               File Output Operator
                 compressed: false
-                Statistics: Num rows: 3 Data size: 8950 Basic stats: COMPLETE Column stats: NONE
+                Statistics: Num rows: 3 Data size: 9510 Basic stats: COMPLETE Column stats: NONE
                 table:
                     input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                     output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
diff --git a/ql/src/test/results/clientpositive/cbo_ppd_non_deterministic.q.out b/ql/src/test/results/clientpositive/cbo_ppd_non_deterministic.q.out
index f2a9dad..8fdca86 100644
--- a/ql/src/test/results/clientpositive/cbo_ppd_non_deterministic.q.out
+++ b/ql/src/test/results/clientpositive/cbo_ppd_non_deterministic.q.out
@@ -120,24 +120,24 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: testa
-            Statistics: Num rows: 2 Data size: 4580 Basic stats: COMPLETE Column stats: NONE
+            Statistics: Num rows: 2 Data size: 4860 Basic stats: COMPLETE Column stats: NONE
             Select Operator
               expressions: rand() (type: double)
               outputColumnNames: _col0
-              Statistics: Num rows: 2 Data size: 4580 Basic stats: COMPLETE Column stats: NONE
+              Statistics: Num rows: 2 Data size: 4860 Basic stats: COMPLETE Column stats: NONE
               Filter Operator
                 predicate: ((_col0 <= 0.5D) and (_col0 > 0.25D)) (type: boolean)
-                Statistics: Num rows: 1 Data size: 2290 Basic stats: COMPLETE Column stats: NONE
+                Statistics: Num rows: 1 Data size: 2430 Basic stats: COMPLETE Column stats: NONE
                 Select Operator
                   expressions: 'CA' (type: string), _col0 (type: double)
                   outputColumnNames: _col0, _col1
-                  Statistics: Num rows: 1 Data size: 2290 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 1 Data size: 2430 Basic stats: COMPLETE Column stats: NONE
                   Limit
                     Number of rows: 20
-                    Statistics: Num rows: 1 Data size: 2290 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 1 Data size: 2430 Basic stats: COMPLETE Column stats: NONE
                     File Output Operator
                       compressed: false
-                      Statistics: Num rows: 1 Data size: 2290 Basic stats: COMPLETE Column stats: NONE
+                      Statistics: Num rows: 1 Data size: 2430 Basic stats: COMPLETE Column stats: NONE
                       table:
                           input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                           output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
@@ -175,24 +175,24 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: testa
-            Statistics: Num rows: 2 Data size: 4580 Basic stats: COMPLETE Column stats: NONE
+            Statistics: Num rows: 2 Data size: 4860 Basic stats: COMPLETE Column stats: NONE
             Select Operator
               expressions: rand() (type: double)
               outputColumnNames: _col0
-              Statistics: Num rows: 2 Data size: 4580 Basic stats: COMPLETE Column stats: NONE
+              Statistics: Num rows: 2 Data size: 4860 Basic stats: COMPLETE Column stats: NONE
               Filter Operator
                 predicate: ((_col0 <= 0.5D) and (_col0 > 0.25D)) (type: boolean)
-                Statistics: Num rows: 1 Data size: 2290 Basic stats: COMPLETE Column stats: NONE
+                Statistics: Num rows: 1 Data size: 2430 Basic stats: COMPLETE Column stats: NONE
                 Select Operator
                   expressions: 'CA' (type: string), _col0 (type: double)
                   outputColumnNames: _col0, _col1
-                  Statistics: Num rows: 1 Data size: 2290 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 1 Data size: 2430 Basic stats: COMPLETE Column stats: NONE
                   Limit
                     Number of rows: 20
-                    Statistics: Num rows: 1 Data size: 2290 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 1 Data size: 2430 Basic stats: COMPLETE Column stats: NONE
                     File Output Operator
                       compressed: false
-                      Statistics: Num rows: 1 Data size: 2290 Basic stats: COMPLETE Column stats: NONE
+                      Statistics: Num rows: 1 Data size: 2430 Basic stats: COMPLETE Column stats: NONE
                       table:
                           input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                           output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
index 8cdc567..db8db1c 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
@@ -22,11 +22,14 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.rmi.server.UID;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.TimeZone;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
@@ -40,8 +43,11 @@ import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.UnresolvedUnionException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.type.Date;
 import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.common.type.TimestampTZUtil;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.type.HiveChar;
@@ -76,6 +82,20 @@ class AvroDeserializer {
    * record encoding.
    */
   private boolean warnedOnce = false;
+
+  /**
+   * Time zone in which file was written, which may be stored in metadata.
+   */
+  private ZoneId writerTimezone = null;
+
+  private Configuration configuration = null;
+
+  AvroDeserializer() {}
+
+  AvroDeserializer(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
   /**
    * When encountering a record with an older schema than the one we're trying
    * to read, it is necessary to re-encode with a reader against the newer schema.
@@ -148,6 +168,7 @@ class AvroDeserializer {
     AvroGenericRecordWritable recordWritable = (AvroGenericRecordWritable) writable;
     GenericRecord r = recordWritable.getRecord();
     Schema fileSchema = recordWritable.getFileSchema();
+    writerTimezone = recordWritable.getWriterTimezone();
 
    UID recordReaderId = recordWritable.getRecordReaderID();
    //If the record reader (from which the record is originated) is already seen and valid,
@@ -301,7 +322,29 @@ class AvroDeserializer {
         throw new AvroSerdeException(
           "Unexpected Avro schema for Date TypeInfo: " + recordSchema.getType());
       }
-      return Timestamp.ofEpochMilli((Long)datum);
+      // If a time zone is found in file metadata (property name: writer.time.zone), convert the
+      // timestamp to that (writer) time zone in order to emulate time zone agnostic behavior.
+      // If not, then the file was written by an older version of hive, so we convert the timestamp
+      // to the server's (reader) time zone for backwards compatibility reasons - unless the
+      // session level configuration hive.avro.timestamp.skip.conversion is set to true, in which
+      // case we assume it was written by a time zone agnostic writer, so we don't convert it.
+      boolean skipConversion;
+      if (configuration != null) {
+        skipConversion = HiveConf.getBoolVar(
+            configuration, HiveConf.ConfVars.HIVE_AVRO_TIMESTAMP_SKIP_CONVERSION);
+      } else {
+        skipConversion = HiveConf.ConfVars.HIVE_AVRO_TIMESTAMP_SKIP_CONVERSION.defaultBoolVal;
+      }
+      ZoneId convertToTimeZone;
+      if (writerTimezone != null) {
+        convertToTimeZone = writerTimezone;
+      } else if (skipConversion) {
+        convertToTimeZone = ZoneOffset.UTC;
+      } else {
+        convertToTimeZone = TimeZone.getDefault().toZoneId();
+      }
+      Timestamp timestamp = Timestamp.ofEpochMilli((Long)datum);
+      return TimestampTZUtil.convertTimestampToZone(timestamp, ZoneOffset.UTC, convertToTimeZone);
     default:
       return datum;
     }
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java
index 2f0ba10..095197c 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java
@@ -24,6 +24,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.rmi.server.UID;
+import java.time.ZoneId;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
@@ -48,6 +49,9 @@ public class AvroGenericRecordWritable implements Writable{
   // Schema that exists in the Avro data file.
   private Schema fileSchema;
 
+  // Time zone file was written in, from metadata
+  private ZoneId writerTimezone = null;
+
   /**
    * Unique Id determine which record reader created this record
    */
@@ -74,6 +78,10 @@ public class AvroGenericRecordWritable implements Writable{
     this.record = record;
   }
 
+  public AvroGenericRecordWritable(ZoneId writerTimezone) {
+    this.writerTimezone = writerTimezone;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     // Write schema since we need it to pull the data out. (see point #1 above)
@@ -141,4 +149,8 @@ public class AvroGenericRecordWritable implements Writable{
   public void setFileSchema(Schema originalSchema) {
     this.fileSchema = originalSchema;
   }
+
+  public ZoneId getWriterTimezone() {
+    return writerTimezone;
+  }
 }
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
index 3955611..653f591 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
@@ -59,6 +59,7 @@ public class AvroSerDe extends AbstractSerDe {
   public static final String VARCHAR_TYPE_NAME = "varchar";
   public static final String DATE_TYPE_NAME = "date";
   public static final String TIMESTAMP_TYPE_NAME = "timestamp-millis";
+  public static final String WRITER_TIME_ZONE = "writer.time.zone";
   public static final String AVRO_PROP_LOGICAL_TYPE = "logicalType";
   public static final String AVRO_PROP_PRECISION = "precision";
   public static final String AVRO_PROP_SCALE = "scale";
@@ -139,7 +140,7 @@ public class AvroSerDe extends AbstractSerDe {
 
     if(!badSchema) {
       this.avroSerializer = new AvroSerializer();
-      this.avroDeserializer = new AvroDeserializer();
+      this.avroDeserializer = new AvroDeserializer(configuration);
     }
   }
 
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
index 99a0b9a..4331c11 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.hive.serde2.avro;
 
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
@@ -34,6 +36,7 @@ import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.common.type.TimestampTZUtil;
 import org.apache.hadoop.hive.serde2.io.DateWritableV2;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
@@ -211,6 +214,8 @@ class AvroSerializer {
     case TIMESTAMP:
       Timestamp timestamp =
         ((TimestampObjectInspector) fieldOI).getPrimitiveJavaObject(structFieldData);
+      timestamp = TimestampTZUtil.convertTimestampToZone(
+          timestamp, TimeZone.getDefault().toZoneId(), ZoneOffset.UTC);
       return timestamp.toEpochMilli();
     case UNKNOWN:
       throw new AvroSerdeException("Received UNKNOWN primitive category.");
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
index ef97d2d..1cd03f7 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.rmi.server.UID;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Hashtable;
@@ -32,6 +33,7 @@ import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.hive.common.type.Timestamp;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
@@ -41,6 +43,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -270,6 +274,54 @@ public class TestAvroDeserializer {
     assertEquals(0, uoi.getTag(result.unionObject));
   }
 
+  /**
+   * Test whether Avro timestamps can be deserialized according to new behavior (storage in UTC but
+   * LocalDateTime semantics as timestamps are converted back to the writer time zone) as well as
+   * old behavior (Instant semantics).
+   */
+  @Test
+  public void canDeserializeTimestamps() throws SerDeException, IOException {
+    List<String> columnNames = new ArrayList<>();
+    columnNames.add("timestampField");
+    List<TypeInfo> columnTypes = new ArrayList<>();
+    columnTypes.add(TypeInfoFactory.getPrimitiveTypeInfo("timestamp"));
+    Schema readerSchema =
+        AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.TIMESTAMP_SCHEMA);
+
+    // 2019-01-02 00:00:00 GMT is 1546387200000 ms after epoch; the record stores that plus 999 ms
+    GenericData.Record record = new GenericData.Record(readerSchema);
+    record.put("timestampField", 1546387200999L);
+    assertTrue(GENERIC_DATA.validate(readerSchema, record));
+
+    AvroGenericRecordWritable agrw = new AvroGenericRecordWritable(ZoneId.of("America/New_York"));
+    agrw.setRecord(record);
+    agrw.setFileSchema(readerSchema);
+    agrw.setRecordReaderID(new UID());
+
+    AvroDeserializer deserializer = new AvroDeserializer();
+    ArrayList<Object> row =
+        (ArrayList<Object>) deserializer.deserialize(columnNames, columnTypes, agrw, readerSchema);
+    Timestamp resultTimestamp = (Timestamp) row.get(0);
+
+    // 2019-01-02 00:00:00 GMT is 2019-01-01 19:00:00 GMT-0500 (America/New_York / EST)
+    assertEquals(Timestamp.valueOf("2019-01-01 19:00:00.999"), resultTimestamp);
+
+    // Do the same without specifying writer time zone. This tests deserialization of older records
+    // which should be interpreted in Instant semantics
+    AvroGenericRecordWritable agrw2 = new AvroGenericRecordWritable();
+    agrw2.setRecord(record);
+    agrw2.setFileSchema(readerSchema);
+    agrw2.setRecordReaderID(new UID());
+
+    row =
+        (ArrayList<Object>) deserializer.deserialize(columnNames, columnTypes, agrw2, readerSchema);
+    resultTimestamp = (Timestamp) row.get(0);
+
+    // 2019-01-02 00:00:00 GMT is 2019-01-01 16:00:00 in zone GMT-0800 (PST)
+    // This is the time zone for VM in test.
+    assertEquals(Timestamp.valueOf("2019-01-01 16:00:00.999"), resultTimestamp);
+  }
+
   @Test
   public void canDeserializeUnions() throws SerDeException, IOException {
     Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.UNION_SCHEMA);
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
index ee83ba3..b500c6e 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
@@ -222,6 +222,16 @@ public class TestAvroObjectInspectorGenerator {
       "  ]\n" +
       "}";
 
+  public static final String TIMESTAMP_SCHEMA = "{\n" +
+      "  \"type\": \"record\", \n" +
+      "  \"name\": \"timestampTest\",\n" +
+      "  \"fields\" : [\n" +
+      "    {\"name\":\"timestampField\", " +
+      "     \"type\":\"" + AvroSerDe.AVRO_LONG_TYPE_NAME + "\", " +
+      "         \"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME + "\"}" +
+      "  ]\n" +
+      "}";
+
   public static final String KITCHEN_SINK_SCHEMA = "{\n" +
       "  \"namespace\": \"org.apache.hadoop.hive\",\n" +
       "  \"name\": \"kitchsink\",\n" +
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java
index 93eafc1..bcd0fd1 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java
@@ -25,6 +25,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericEnumSymbol;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveDecimalV1;
+import org.apache.hadoop.hive.common.type.Timestamp;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -121,6 +122,13 @@ public class TestAvroSerializer {
   }
 
   @Test
+  public void canSerializeTimestamps() throws SerDeException, IOException {
+    singleFieldTest("timestamp1", Timestamp.valueOf("2011-01-01 00:00:00").toEpochMilli(),
+        "\"" + AvroSerDe.AVRO_LONG_TYPE_NAME + "\"," +
+        "\"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME + "\"");
+  }
+
+  @Test
   public void canSerializeDecimals() throws SerDeException, IOException {
     ByteBuffer bb = ByteBuffer.wrap(HiveDecimal.create("3.1416").bigIntegerBytes());
     singleFieldTest("dec1", bb.rewind(),