You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/01/16 01:48:23 UTC

[iceberg] branch master updated: Hive: Fix Deserializer to use source deserializer (#2078)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7292c51  Hive: Fix Deserializer to use source deserializer (#2078)
7292c51 is described below

commit 7292c513ee3fa3fa654dc22df8d7cf6e7005b119
Author: pvary <pv...@cloudera.com>
AuthorDate: Sat Jan 16 02:48:15 2021 +0100

    Hive: Fix Deserializer to use source deserializer (#2078)
---
 .../IcebergTimestampObjectInspectorHive3.java      |   2 +-
 ...ebergTimestampWithZoneObjectInspectorHive3.java |   2 +-
 .../TestIcebergTimestampObjectInspectorHive3.java  |   2 +-
 ...ebergTimestampWithZoneObjectInspectorHive3.java |   2 +-
 .../org/apache/iceberg/mr/hive/Deserializer.java   | 176 +++++++++++----------
 .../IcebergBinaryObjectInspector.java              |   4 +-
 .../IcebergDateObjectInspector.java                |   2 +-
 .../IcebergDecimalObjectInspector.java             |   2 +-
 .../IcebergTimestampObjectInspector.java           |   2 +-
 .../IcebergTimestampWithZoneObjectInspector.java   |   3 +-
 .../objectinspector/WriteObjectInspector.java      |   5 +
 .../iceberg/mr/hive/HiveIcebergTestUtils.java      |  55 +++----
 .../apache/iceberg/mr/hive/TestDeserializer.java   |  32 ++--
 .../TestIcebergTimestampObjectInspector.java       |   2 +-
 ...estIcebergTimestampWithZoneObjectInspector.java |   4 +-
 15 files changed, 150 insertions(+), 145 deletions(-)

diff --git a/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspectorHive3.java b/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspectorHive3.java
index 652cf13..be33870 100644
--- a/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspectorHive3.java
+++ b/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspectorHive3.java
@@ -45,7 +45,7 @@ public class IcebergTimestampObjectInspectorHive3 extends AbstractPrimitiveJavaO
     if (o == null) {
       return null;
     }
-    Timestamp timestamp = ((TimestampWritableV2) o).getTimestamp();
+    Timestamp timestamp = (Timestamp) o;
     return LocalDateTime.ofEpochSecond(timestamp.toEpochSecond(), timestamp.getNanos(), ZoneOffset.UTC);
   }
 
diff --git a/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampWithZoneObjectInspectorHive3.java b/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampWithZoneObjectInspectorHive3.java
index 417864d..199ea36 100644
--- a/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampWithZoneObjectInspectorHive3.java
+++ b/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampWithZoneObjectInspectorHive3.java
@@ -47,7 +47,7 @@ public class IcebergTimestampWithZoneObjectInspectorHive3 extends AbstractPrimit
     if (o == null) {
       return null;
     }
-    ZonedDateTime zdt = ((TimestampLocalTZWritable) o).getTimestampTZ().getZonedDateTime();
+    ZonedDateTime zdt = ((TimestampTZ) o).getZonedDateTime();
     return OffsetDateTime.of(zdt.toLocalDateTime(), zdt.getOffset());
   }
 
diff --git a/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspectorHive3.java b/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspectorHive3.java
index e22f71f..89ee0cd 100644
--- a/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspectorHive3.java
+++ b/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspectorHive3.java
@@ -65,7 +65,7 @@ public class TestIcebergTimestampObjectInspectorHive3 {
 
     Assert.assertFalse(oi.preferWritable());
 
-    Assert.assertEquals(local, oi.convert(new TimestampWritableV2(ts)));
+    Assert.assertEquals(local, oi.convert(ts));
   }
 
 }
diff --git a/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampWithZoneObjectInspectorHive3.java b/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampWithZoneObjectInspectorHive3.java
index 9aacdb9..bc49245 100644
--- a/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampWithZoneObjectInspectorHive3.java
+++ b/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampWithZoneObjectInspectorHive3.java
@@ -70,7 +70,7 @@ public class TestIcebergTimestampWithZoneObjectInspectorHive3 {
 
     Assert.assertFalse(oi.preferWritable());
 
-    Assert.assertEquals(OffsetDateTime.of(dateTimeAtUTC, ZoneOffset.UTC), oi.convert(new TimestampLocalTZWritable(ts)));
+    Assert.assertEquals(OffsetDateTime.of(dateTimeAtUTC, ZoneOffset.UTC), oi.convert(ts));
   }
 
 }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
index b59db22..458affd 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
@@ -23,14 +23,12 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
@@ -52,20 +50,26 @@ class Deserializer {
    */
   static class Builder {
     private Schema schema;
-    private ObjectInspector inspector;
+    private StructObjectInspector writerInspector;
+    private StructObjectInspector sourceInspector;
 
     Builder schema(Schema mainSchema) {
       this.schema = mainSchema;
       return this;
     }
 
-    Builder inspector(ObjectInspector mainInspector) {
-      this.inspector = mainInspector;
+    Builder writerInspector(StructObjectInspector inspector) {
+      this.writerInspector = inspector;
+      return this;
+    }
+
+    Builder sourceInspector(StructObjectInspector inspector) {
+      this.sourceInspector = inspector;
       return this;
     }
 
     Deserializer build() {
-      return new Deserializer(schema, inspector);
+      return new Deserializer(schema, new ObjectInspectorPair(writerInspector, sourceInspector));
     }
   }
 
@@ -78,62 +82,55 @@ class Deserializer {
     return (Record) fieldDeserializer.value(data);
   }
 
-  private Deserializer(Schema schema, ObjectInspector fieldInspector) {
-    this.fieldDeserializer = DeserializerVisitor.visit(schema, fieldInspector);
+  private Deserializer(Schema schema, ObjectInspectorPair pair) {
+    this.fieldDeserializer = DeserializerVisitor.visit(schema, pair);
   }
 
-  private static class DeserializerVisitor extends SchemaWithPartnerVisitor<ObjectInspector, FieldDeserializer> {
+  private static class DeserializerVisitor extends SchemaWithPartnerVisitor<ObjectInspectorPair, FieldDeserializer> {
 
-    public static FieldDeserializer visit(Schema schema, ObjectInspector objectInspector) {
-      return visit(schema, new FixNameMappingObjectInspector(schema, objectInspector), new DeserializerVisitor(),
+    public static FieldDeserializer visit(Schema schema, ObjectInspectorPair pair) {
+      return visit(schema, new FixNameMappingObjectInspectorPair(schema, pair), new DeserializerVisitor(),
           new PartnerObjectInspectorByNameAccessors());
     }
 
     @Override
-    public FieldDeserializer schema(Schema schema, ObjectInspector inspector, FieldDeserializer deserializer) {
+    public FieldDeserializer schema(Schema schema, ObjectInspectorPair pair, FieldDeserializer deserializer) {
       return deserializer;
     }
 
     @Override
-    public FieldDeserializer field(NestedField field, ObjectInspector inspector, FieldDeserializer deserializer) {
+    public FieldDeserializer field(NestedField field, ObjectInspectorPair pair, FieldDeserializer deserializer) {
       return deserializer;
     }
 
     @Override
-    public FieldDeserializer primitive(PrimitiveType type, ObjectInspector inspector) {
-      switch (type.typeId()) {
-        case BOOLEAN:
-        case INTEGER:
-        case LONG:
-        case FLOAT:
-        case DOUBLE:
-        case STRING:
-          // Generic conversions where Iceberg and Hive are using the same java object
-          return o -> ((PrimitiveObjectInspector) inspector).getPrimitiveJavaObject(o);
-        case UUID:
-          // TODO: This will not work with Parquet. Parquet UUID expect byte[], others are expecting UUID
-          return o -> UUID.fromString(((StringObjectInspector) inspector).getPrimitiveJavaObject(o));
-        case DATE:
-        case TIMESTAMP:
-        case FIXED:
-        case BINARY:
-        case DECIMAL:
-          // Iceberg specific conversions
-          return o -> ((WriteObjectInspector) inspector).convert(o);
-        case TIME:
-        default:
-          throw new IllegalArgumentException("Unsupported column type: " + type);
-      }
+    public FieldDeserializer primitive(PrimitiveType type, ObjectInspectorPair pair) {
+      return o -> {
+        if (o == null) {
+          return null;
+        }
+
+        ObjectInspector writerFieldInspector = pair.writerInspector();
+        ObjectInspector sourceFieldInspector = pair.sourceInspector();
+
+        Object result = ((PrimitiveObjectInspector) sourceFieldInspector).getPrimitiveJavaObject(o);
+        if (writerFieldInspector instanceof WriteObjectInspector) {
+          // If we have a conversion method defined for the ObjectInspector then convert
+          result = ((WriteObjectInspector) writerFieldInspector).convert(result);
+        }
+
+        return result;
+      };
     }
 
     @Override
-    public FieldDeserializer struct(StructType type, ObjectInspector inspector, List<FieldDeserializer> deserializers) {
+    public FieldDeserializer struct(StructType type, ObjectInspectorPair pair, List<FieldDeserializer> deserializers) {
       return o -> {
         if (o == null) {
           return null;
         }
 
-        List<Object> data = ((StructObjectInspector) inspector).getStructFieldsDataAsList(o);
+        List<Object> data = ((StructObjectInspector) pair.sourceInspector()).getStructFieldsDataAsList(o);
         Record result = GenericRecord.create(type);
 
         for (int i = 0; i < deserializers.size(); i++) {
@@ -150,14 +147,14 @@ class Deserializer {
     }
 
     @Override
-    public FieldDeserializer list(ListType listTypeInfo, ObjectInspector inspector, FieldDeserializer deserializer) {
+    public FieldDeserializer list(ListType listTypeInfo, ObjectInspectorPair pair, FieldDeserializer deserializer) {
       return o -> {
         if (o == null) {
           return null;
         }
 
         List<Object> result = new ArrayList<>();
-        ListObjectInspector listInspector = (ListObjectInspector) inspector;
+        ListObjectInspector listInspector = (ListObjectInspector) pair.sourceInspector();
 
         for (Object val : listInspector.getList(o)) {
           result.add(deserializer.value(val));
@@ -168,7 +165,7 @@ class Deserializer {
     }
 
     @Override
-    public FieldDeserializer map(MapType mapType, ObjectInspector inspector, FieldDeserializer keyDeserializer,
+    public FieldDeserializer map(MapType mapType, ObjectInspectorPair pair, FieldDeserializer keyDeserializer,
                                  FieldDeserializer valueDeserializer) {
       return o -> {
         if (o == null) {
@@ -176,7 +173,7 @@ class Deserializer {
         }
 
         Map<Object, Object> result = new HashMap<>();
-        MapObjectInspector mapObjectInspector = (MapObjectInspector) inspector;
+        MapObjectInspector mapObjectInspector = (MapObjectInspector) pair.sourceInspector();
 
         for (Map.Entry<?, ?> entry : mapObjectInspector.getMap(o).entrySet()) {
           result.put(keyDeserializer.value(entry.getKey()), valueDeserializer.value(entry.getValue()));
@@ -187,30 +184,35 @@ class Deserializer {
   }
 
   private static class PartnerObjectInspectorByNameAccessors
-      implements SchemaWithPartnerVisitor.PartnerAccessors<ObjectInspector> {
+      implements SchemaWithPartnerVisitor.PartnerAccessors<ObjectInspectorPair> {
 
     @Override
-    public ObjectInspector fieldPartner(ObjectInspector inspector, int fieldId, String name) {
-      StructObjectInspector fieldInspector  = (StructObjectInspector) inspector;
-      return fieldInspector.getStructFieldRef(name).getFieldObjectInspector();
+    public ObjectInspectorPair fieldPartner(ObjectInspectorPair pair, int fieldId, String name) {
+      String sourceName = pair.sourceName(name);
+      return new ObjectInspectorPair(
+          ((StructObjectInspector) pair.writerInspector()).getStructFieldRef(name).getFieldObjectInspector(),
+          ((StructObjectInspector) pair.sourceInspector()).getStructFieldRef(sourceName).getFieldObjectInspector());
     }
 
     @Override
-    public ObjectInspector mapKeyPartner(ObjectInspector inspector) {
-      MapObjectInspector fieldInspector  = (MapObjectInspector) inspector;
-      return fieldInspector.getMapKeyObjectInspector();
+    public ObjectInspectorPair mapKeyPartner(ObjectInspectorPair pair) {
+      return new ObjectInspectorPair(
+          ((MapObjectInspector) pair.writerInspector()).getMapKeyObjectInspector(),
+          ((MapObjectInspector) pair.sourceInspector()).getMapKeyObjectInspector());
     }
 
     @Override
-    public ObjectInspector mapValuePartner(ObjectInspector inspector) {
-      MapObjectInspector fieldInspector  = (MapObjectInspector) inspector;
-      return fieldInspector.getMapValueObjectInspector();
+    public ObjectInspectorPair mapValuePartner(ObjectInspectorPair pair) {
+      return new ObjectInspectorPair(
+          ((MapObjectInspector) pair.writerInspector()).getMapValueObjectInspector(),
+          ((MapObjectInspector) pair.sourceInspector()).getMapValueObjectInspector());
     }
 
     @Override
-    public ObjectInspector listElementPartner(ObjectInspector inspector) {
-      ListObjectInspector fieldInspector  = (ListObjectInspector) inspector;
-      return fieldInspector.getListElementObjectInspector();
+    public ObjectInspectorPair listElementPartner(ObjectInspectorPair pair) {
+      return new ObjectInspectorPair(
+          ((ListObjectInspector) pair.writerInspector()).getListElementObjectInspector(),
+          ((ListObjectInspector) pair.sourceInspector()).getListElementObjectInspector());
     }
   }
 
@@ -221,51 +223,57 @@ class Deserializer {
   /**
    * Hive query results schema column names do not match the target Iceberg column names.
    * Instead we have to rely on the column order. To keep the other parts of the code generic we fix this with a
-   * wrapper around the ObjectInspector. This wrapper uses the Iceberg schema column names instead of the Hive column
-   * names for {@link #getStructFieldRef(String) getStructFieldRef}
+   * wrapper around the ObjectInspectorPair. This wrapper maps the Iceberg schema column names to the corresponding
+   * Hive column names, matching the columns by position.
    */
-  private static class FixNameMappingObjectInspector extends StructObjectInspector {
-    private final StructObjectInspector innerInspector;
-    private final Map<String, StructField> nameMap;
+  private static class FixNameMappingObjectInspectorPair extends ObjectInspectorPair {
+    private final Map<String, String> sourceNameMap;
+
+    FixNameMappingObjectInspectorPair(Schema schema, ObjectInspectorPair pair) {
+      super(pair.writerInspector(), pair.sourceInspector());
 
-    private FixNameMappingObjectInspector(Schema schema, ObjectInspector inspector) {
-      this.nameMap = new HashMap<>(schema.columns().size());
-      this.innerInspector = (StructObjectInspector) inspector;
-      List<? extends StructField> fields = innerInspector.getAllStructFieldRefs();
+      this.sourceNameMap = new HashMap<>(schema.columns().size());
 
+      List<? extends StructField> fields = ((StructObjectInspector) sourceInspector()).getAllStructFieldRefs();
       for (int i = 0; i < schema.columns().size(); ++i) {
-        nameMap.put(schema.columns().get(i).name(), fields.get(i));
+        sourceNameMap.put(schema.columns().get(i).name(), fields.get(i).getFieldName());
       }
     }
 
     @Override
-    public List<? extends StructField> getAllStructFieldRefs() {
-      return innerInspector.getAllStructFieldRefs();
+    String sourceName(String originalName) {
+      return sourceNameMap.get(originalName);
     }
+  }
 
-    @Override
-    public StructField getStructFieldRef(String fieldName) {
-      return nameMap.get(fieldName);
-    }
+  /**
+   * To get the data for Iceberg {@link Record}s we have to use both ObjectInspectors.
+   * <p>
+   * We use the Hive ObjectInspectors (sourceInspector) to get the Hive primitive types.
+   * <p>
+   * We use the Iceberg ObjectInspectors (writerInspector) only if conversion is needed for
+   * generating the correct type for Iceberg Records. See: {@link WriteObjectInspector} interface on the provided
+   * writerInspector.
+   */
+  private static class ObjectInspectorPair {
+    private ObjectInspector writerInspector;
+    private ObjectInspector sourceInspector;
 
-    @Override
-    public Object getStructFieldData(Object data, StructField fieldRef) {
-      return innerInspector.getStructFieldData(data, fieldRef);
+    ObjectInspectorPair(ObjectInspector writerInspector, ObjectInspector sourceInspector) {
+      this.writerInspector = writerInspector;
+      this.sourceInspector = sourceInspector;
     }
 
-    @Override
-    public List<Object> getStructFieldsDataAsList(Object data) {
-      return innerInspector.getStructFieldsDataAsList(data);
+    ObjectInspector writerInspector() {
+      return writerInspector;
     }
 
-    @Override
-    public String getTypeName() {
-      return innerInspector.getTypeName();
+    ObjectInspector sourceInspector() {
+      return sourceInspector;
     }
 
-    @Override
-    public Category getCategory() {
-      return innerInspector.getCategory();
+    String sourceName(String originalName) {
+      return originalName;
     }
   }
 }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java
index de6a26f..5fa0181 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java
@@ -38,7 +38,7 @@ public abstract class IcebergBinaryObjectInspector extends AbstractPrimitiveJava
 
     @Override
     public byte[] convert(Object o) {
-      return o == null ? null : ((BytesWritable) o).getBytes();
+      return (byte[]) o;
     }
   };
 
@@ -50,7 +50,7 @@ public abstract class IcebergBinaryObjectInspector extends AbstractPrimitiveJava
 
     @Override
     public ByteBuffer convert(Object o) {
-      return o == null ? null : ByteBuffer.wrap(((BytesWritable) o).getBytes());
+      return o == null ? null : ByteBuffer.wrap((byte[]) o);
     }
   };
 
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java
index 8959780..63af550 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java
@@ -67,6 +67,6 @@ public final class IcebergDateObjectInspector extends AbstractPrimitiveJavaObjec
 
   @Override
   public LocalDate convert(Object o) {
-    return o == null ? null : ((DateWritable) o).get().toLocalDate();
+    return o == null ? null : ((Date) o).toLocalDate();
   }
 }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java
index fa023d2..3a16833 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java
@@ -80,6 +80,6 @@ public final class IcebergDecimalObjectInspector extends AbstractPrimitiveJavaOb
 
   @Override
   public BigDecimal convert(Object o) {
-    return o == null ? null : ((HiveDecimalWritable) o).getHiveDecimal().bigDecimalValue();
+    return o == null ? null : ((HiveDecimal) o).bigDecimalValue();
   }
 }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java
index 39b8226..78f7fa9 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java
@@ -41,7 +41,7 @@ public class IcebergTimestampObjectInspector extends AbstractPrimitiveJavaObject
 
   @Override
   public LocalDateTime convert(Object o) {
-    return o == null ? null : ((TimestampWritable) o).getTimestamp().toLocalDateTime();
+    return o == null ? null : ((Timestamp) o).toLocalDateTime();
   }
 
   @Override
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampWithZoneObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampWithZoneObjectInspector.java
index 2840024..b708e4e 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampWithZoneObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampWithZoneObjectInspector.java
@@ -42,8 +42,7 @@ public class IcebergTimestampWithZoneObjectInspector extends AbstractPrimitiveJa
 
   @Override
   public OffsetDateTime convert(Object o) {
-    return o == null ? null :
-        OffsetDateTime.ofInstant(((TimestampWritable) o).getTimestamp().toInstant(), ZoneOffset.UTC);
+    return o == null ? null : OffsetDateTime.ofInstant(((Timestamp) o).toInstant(), ZoneOffset.UTC);
   }
 
   @Override
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/WriteObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/WriteObjectInspector.java
index a7ac2ac..2092dc0 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/WriteObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/WriteObjectInspector.java
@@ -19,6 +19,11 @@
 
 package org.apache.iceberg.mr.hive.serde.objectinspector;
 
+/**
+ * Interface for converting Hive primitive objects to the objects which can be added to an Iceberg Record.
+ * If the IcebergObjectInspector does not implement this then the default Hive primitive objects will be used without
+ * conversion.
+ */
 public interface WriteObjectInspector {
   Object convert(Object value);
 }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
index 56f93e9..9114400 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
@@ -35,7 +35,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
-import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -59,12 +58,8 @@ import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergBinaryObjectInspector;
-import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergDecimalObjectInspector;
-import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
-import org.apache.iceberg.util.UUIDUtil;
 import org.junit.Assert;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
@@ -72,6 +67,7 @@ import static org.apache.iceberg.types.Types.NestedField.optional;
 public class HiveIcebergTestUtils {
   // TODO: Can this be a constant all around the Iceberg tests?
   public static final Schema FULL_SCHEMA = new Schema(
+      // TODO: Create tests for field case insensitivity.
       optional(1, "boolean_type", Types.BooleanType.get()),
       optional(2, "integer_type", Types.IntegerType.get()),
       optional(3, "long_type", Types.LongType.get()),
@@ -80,33 +76,30 @@ public class HiveIcebergTestUtils {
       optional(6, "date_type", Types.DateType.get()),
       // TimeType is not supported
       // required(7, "time_type", Types.TimeType.get()),
-      optional(7, "tsTz", Types.TimestampType.withZone()),
+      optional(7, "tstz", Types.TimestampType.withZone()),
       optional(8, "ts", Types.TimestampType.withoutZone()),
       optional(9, "string_type", Types.StringType.get()),
-      optional(10, "uuid_type", Types.UUIDType.get()),
-      optional(11, "fixed_type", Types.FixedType.ofLength(3)),
-      optional(12, "binary_type", Types.BinaryType.get()),
-      optional(13, "decimal_type", Types.DecimalType.of(38, 10)));
+      optional(10, "fixed_type", Types.FixedType.ofLength(3)),
+      optional(11, "binary_type", Types.BinaryType.get()),
+      optional(12, "decimal_type", Types.DecimalType.of(38, 10)));
 
   public static final StandardStructObjectInspector FULL_SCHEMA_OBJECT_INSPECTOR =
       ObjectInspectorFactory.getStandardStructObjectInspector(
-          // Capitalized `boolean_type` field to check for field case insensitivity.
-          Arrays.asList("Boolean_Type", "integer_type", "long_type", "float_type", "double_type",
-              "date_type", "tsTz", "ts", "string_type", "uuid_type", "fixed_type", "binary_type", "decimal_type"),
+          Arrays.asList("boolean_type", "integer_type", "long_type", "float_type", "double_type",
+              "date_type", "tstz", "ts", "string_type", "fixed_type", "binary_type", "decimal_type"),
           Arrays.asList(
               PrimitiveObjectInspectorFactory.writableBooleanObjectInspector,
               PrimitiveObjectInspectorFactory.writableIntObjectInspector,
               PrimitiveObjectInspectorFactory.writableLongObjectInspector,
               PrimitiveObjectInspectorFactory.writableFloatObjectInspector,
               PrimitiveObjectInspectorFactory.writableDoubleObjectInspector,
-              IcebergObjectInspector.DATE_INSPECTOR,
-              IcebergObjectInspector.TIMESTAMP_INSPECTOR_WITH_TZ,
-              IcebergObjectInspector.TIMESTAMP_INSPECTOR,
+              PrimitiveObjectInspectorFactory.writableDateObjectInspector,
+              PrimitiveObjectInspectorFactory.writableTimestampObjectInspector,
+              PrimitiveObjectInspectorFactory.writableTimestampObjectInspector,
               PrimitiveObjectInspectorFactory.writableStringObjectInspector,
-              PrimitiveObjectInspectorFactory.writableStringObjectInspector,
-              IcebergBinaryObjectInspector.byteArray(),
-              IcebergBinaryObjectInspector.byteBuffer(),
-              IcebergDecimalObjectInspector.get(38, 10)
+              PrimitiveObjectInspectorFactory.writableBinaryObjectInspector,
+              PrimitiveObjectInspectorFactory.writableBinaryObjectInspector,
+              PrimitiveObjectInspectorFactory.writableHiveDecimalObjectInspector
           ));
 
   private HiveIcebergTestUtils() {
@@ -115,10 +108,9 @@ public class HiveIcebergTestUtils {
 
   /**
    * Generates a test record where every field has a value.
-   * @param uuidAsByte As per #1881 Parquet needs byte[] value for UUID, other file formats need UUID object
    * @return Record with every field set
    */
-  public static Record getTestRecord(boolean uuidAsByte) {
+  public static Record getTestRecord() {
     Record record = GenericRecord.create(HiveIcebergTestUtils.FULL_SCHEMA);
     record.set(0, true);
     record.set(1, 1);
@@ -132,15 +124,9 @@ public class HiveIcebergTestUtils {
     record.set(6, OffsetDateTime.of(2017, 11, 22, 11, 30, 7, 0, ZoneOffset.ofHours(2)));
     record.set(7, LocalDateTime.of(2019, 2, 22, 9, 44, 54));
     record.set(8, "kilenc");
-    if (uuidAsByte) {
-      // TODO: Parquet UUID expect byte[], others are expecting UUID
-      record.set(9, UUIDUtil.convert(UUID.fromString("1-2-3-4-5")));
-    } else {
-      record.set(9, UUID.fromString("1-2-3-4-5"));
-    }
-    record.set(10, new byte[]{0, 1, 2});
-    record.set(11, ByteBuffer.wrap(new byte[]{0, 1, 2, 3}));
-    record.set(12, new BigDecimal("0.0000000013"));
+    record.set(9, new byte[]{0, 1, 2});
+    record.set(10, ByteBuffer.wrap(new byte[]{0, 1, 2, 3}));
+    record.set(11, new BigDecimal("0.0000000013"));
 
     return record;
   }
@@ -177,10 +163,9 @@ public class HiveIcebergTestUtils {
         new TimestampWritable(Timestamp.from(record.get(6, OffsetDateTime.class).toInstant())),
         new TimestampWritable(Timestamp.valueOf(record.get(7, LocalDateTime.class))),
         new Text(record.get(8, String.class)),
-        new Text(record.get(9, UUID.class).toString()),
-        new BytesWritable(record.get(10, byte[].class)),
-        new BytesWritable(ByteBuffers.toByteArray(record.get(11, ByteBuffer.class))),
-        new HiveDecimalWritable(HiveDecimal.create(record.get(12, BigDecimal.class)))
+        new BytesWritable(record.get(9, byte[].class)),
+        new BytesWritable(ByteBuffers.toByteArray(record.get(10, ByteBuffer.class))),
+        new HiveDecimalWritable(HiveDecimal.create(record.get(11, BigDecimal.class)))
     );
   }
 
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java
index 76f7cf5..d31a5d7 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java
@@ -21,9 +21,9 @@ package org.apache.iceberg.mr.hive;
 
 import java.util.Arrays;
 import java.util.Collections;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
@@ -33,6 +33,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -66,7 +67,8 @@ public class TestDeserializer {
 
     Deserializer deserializer = new Deserializer.Builder()
         .schema(CUSTOMER_SCHEMA)
-        .inspector(schemaObjectInspector)
+        .writerInspector((StructObjectInspector) IcebergObjectInspector.create(CUSTOMER_SCHEMA))
+        .sourceInspector(schemaObjectInspector)
         .build();
 
     Record expected = GenericRecord.create(CUSTOMER_SCHEMA);
@@ -82,7 +84,8 @@ public class TestDeserializer {
   public void testStructDeserialize() {
     Deserializer deserializer = new Deserializer.Builder()
         .schema(CUSTOMER_SCHEMA)
-        .inspector(CUSTOMER_OBJECT_INSPECTOR)
+        .writerInspector((StructObjectInspector) IcebergObjectInspector.create(CUSTOMER_SCHEMA))
+        .sourceInspector(CUSTOMER_OBJECT_INSPECTOR)
         .build();
 
     Record expected = GenericRecord.create(CUSTOMER_SCHEMA);
@@ -103,7 +106,7 @@ public class TestDeserializer {
         ))
     );
 
-    ObjectInspector inspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+    StructObjectInspector inspector = ObjectInspectorFactory.getStandardStructObjectInspector(
         Arrays.asList("map_type"),
         Arrays.asList(
             ObjectInspectorFactory.getStandardMapObjectInspector(
@@ -114,7 +117,8 @@ public class TestDeserializer {
 
     Deserializer deserializer = new Deserializer.Builder()
         .schema(schema)
-        .inspector(inspector)
+        .writerInspector((StructObjectInspector) IcebergObjectInspector.create(schema))
+        .sourceInspector(inspector)
         .build();
 
     Record expected = GenericRecord.create(schema);
@@ -134,7 +138,7 @@ public class TestDeserializer {
         optional(1, "list_type", Types.ListType.ofOptional(2, Types.LongType.get()))
     );
 
-    ObjectInspector inspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+    StructObjectInspector inspector = ObjectInspectorFactory.getStandardStructObjectInspector(
         Arrays.asList("list_type"),
         Arrays.asList(
             ObjectInspectorFactory.getStandardListObjectInspector(
@@ -143,7 +147,8 @@ public class TestDeserializer {
 
     Deserializer deserializer = new Deserializer.Builder()
         .schema(schema)
-        .inspector(inspector)
+        .writerInspector((StructObjectInspector) IcebergObjectInspector.create(schema))
+        .sourceInspector(inspector)
         .build();
 
     Record expected = GenericRecord.create(schema);
@@ -161,10 +166,11 @@ public class TestDeserializer {
 
     Deserializer deserializer = new Deserializer.Builder()
         .schema(HiveIcebergTestUtils.FULL_SCHEMA)
-        .inspector(HiveIcebergTestUtils.FULL_SCHEMA_OBJECT_INSPECTOR)
+        .writerInspector((StructObjectInspector) IcebergObjectInspector.create(HiveIcebergTestUtils.FULL_SCHEMA))
+        .sourceInspector(HiveIcebergTestUtils.FULL_SCHEMA_OBJECT_INSPECTOR)
         .build();
 
-    Record expected = HiveIcebergTestUtils.getTestRecord(false);
+    Record expected = HiveIcebergTestUtils.getTestRecord();
     Record actual = deserializer.deserialize(HiveIcebergTestUtils.valuesForTestRecord(expected));
 
     HiveIcebergTestUtils.assertEquals(expected, actual);
@@ -174,7 +180,8 @@ public class TestDeserializer {
   public void testNullDeserialize() {
     Deserializer deserializer = new Deserializer.Builder()
         .schema(HiveIcebergTestUtils.FULL_SCHEMA)
-        .inspector(HiveIcebergTestUtils.FULL_SCHEMA_OBJECT_INSPECTOR)
+        .writerInspector((StructObjectInspector) IcebergObjectInspector.create(HiveIcebergTestUtils.FULL_SCHEMA))
+        .sourceInspector(HiveIcebergTestUtils.FULL_SCHEMA_OBJECT_INSPECTOR)
         .build();
 
     Record expected = HiveIcebergTestUtils.getNullTestRecord();
@@ -202,10 +209,11 @@ public class TestDeserializer {
         ));
 
     AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
-        "Unsupported column type", () -> {
+        "type is not supported", () -> {
           new Deserializer.Builder()
               .schema(unsupported)
-              .inspector(objectInspector)
+              .writerInspector((StructObjectInspector) IcebergObjectInspector.create(unsupported))
+              .sourceInspector(objectInspector)
               .build();
         }
     );
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspector.java b/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspector.java
index 79629f0..85fe373 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspector.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspector.java
@@ -61,7 +61,7 @@ public class TestIcebergTimestampObjectInspector {
 
     Assert.assertFalse(oi.preferWritable());
 
-    Assert.assertEquals(local, oi.convert(new TimestampWritable(ts)));
+    Assert.assertEquals(local, oi.convert(ts));
   }
 
 }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampWithZoneObjectInspector.java b/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampWithZoneObjectInspector.java
index e4edeb9..2aef6d7 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampWithZoneObjectInspector.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampWithZoneObjectInspector.java
@@ -65,10 +65,10 @@ public class TestIcebergTimestampWithZoneObjectInspector {
     Assert.assertFalse(oi.preferWritable());
 
     Assert.assertEquals(OffsetDateTime.ofInstant(local.toInstant(ZoneOffset.ofHours(-5)), ZoneOffset.UTC),
-            oi.convert(new TimestampWritable(ts)));
+            oi.convert(ts));
 
     Assert.assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC),
-            oi.convert(new TimestampWritable(Timestamp.from(offsetDateTime.toInstant()))));
+            oi.convert(Timestamp.from(offsetDateTime.toInstant())));
   }
 
 }