You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/01/16 01:48:23 UTC
[iceberg] branch master updated: Hive: Fix Deserializer to use source deserializer (#2078)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7292c51 Hive: Fix Deserializer to use source deserializer (#2078)
7292c51 is described below
commit 7292c513ee3fa3fa654dc22df8d7cf6e7005b119
Author: pvary <pv...@cloudera.com>
AuthorDate: Sat Jan 16 02:48:15 2021 +0100
Hive: Fix Deserializer to use source deserializer (#2078)
---
.../IcebergTimestampObjectInspectorHive3.java | 2 +-
...ebergTimestampWithZoneObjectInspectorHive3.java | 2 +-
.../TestIcebergTimestampObjectInspectorHive3.java | 2 +-
...ebergTimestampWithZoneObjectInspectorHive3.java | 2 +-
.../org/apache/iceberg/mr/hive/Deserializer.java | 176 +++++++++++----------
.../IcebergBinaryObjectInspector.java | 4 +-
.../IcebergDateObjectInspector.java | 2 +-
.../IcebergDecimalObjectInspector.java | 2 +-
.../IcebergTimestampObjectInspector.java | 2 +-
.../IcebergTimestampWithZoneObjectInspector.java | 3 +-
.../objectinspector/WriteObjectInspector.java | 5 +
.../iceberg/mr/hive/HiveIcebergTestUtils.java | 55 +++----
.../apache/iceberg/mr/hive/TestDeserializer.java | 32 ++--
.../TestIcebergTimestampObjectInspector.java | 2 +-
...estIcebergTimestampWithZoneObjectInspector.java | 4 +-
15 files changed, 150 insertions(+), 145 deletions(-)
diff --git a/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspectorHive3.java b/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspectorHive3.java
index 652cf13..be33870 100644
--- a/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspectorHive3.java
+++ b/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspectorHive3.java
@@ -45,7 +45,7 @@ public class IcebergTimestampObjectInspectorHive3 extends AbstractPrimitiveJavaO
if (o == null) {
return null;
}
- Timestamp timestamp = ((TimestampWritableV2) o).getTimestamp();
+ Timestamp timestamp = (Timestamp) o;
return LocalDateTime.ofEpochSecond(timestamp.toEpochSecond(), timestamp.getNanos(), ZoneOffset.UTC);
}
diff --git a/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampWithZoneObjectInspectorHive3.java b/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampWithZoneObjectInspectorHive3.java
index 417864d..199ea36 100644
--- a/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampWithZoneObjectInspectorHive3.java
+++ b/hive3/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampWithZoneObjectInspectorHive3.java
@@ -47,7 +47,7 @@ public class IcebergTimestampWithZoneObjectInspectorHive3 extends AbstractPrimit
if (o == null) {
return null;
}
- ZonedDateTime zdt = ((TimestampLocalTZWritable) o).getTimestampTZ().getZonedDateTime();
+ ZonedDateTime zdt = ((TimestampTZ) o).getZonedDateTime();
return OffsetDateTime.of(zdt.toLocalDateTime(), zdt.getOffset());
}
diff --git a/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspectorHive3.java b/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspectorHive3.java
index e22f71f..89ee0cd 100644
--- a/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspectorHive3.java
+++ b/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspectorHive3.java
@@ -65,7 +65,7 @@ public class TestIcebergTimestampObjectInspectorHive3 {
Assert.assertFalse(oi.preferWritable());
- Assert.assertEquals(local, oi.convert(new TimestampWritableV2(ts)));
+ Assert.assertEquals(local, oi.convert(ts));
}
}
diff --git a/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampWithZoneObjectInspectorHive3.java b/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampWithZoneObjectInspectorHive3.java
index 9aacdb9..bc49245 100644
--- a/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampWithZoneObjectInspectorHive3.java
+++ b/hive3/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampWithZoneObjectInspectorHive3.java
@@ -70,7 +70,7 @@ public class TestIcebergTimestampWithZoneObjectInspectorHive3 {
Assert.assertFalse(oi.preferWritable());
- Assert.assertEquals(OffsetDateTime.of(dateTimeAtUTC, ZoneOffset.UTC), oi.convert(new TimestampLocalTZWritable(ts)));
+ Assert.assertEquals(OffsetDateTime.of(dateTimeAtUTC, ZoneOffset.UTC), oi.convert(ts));
}
}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
index b59db22..458affd 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
@@ -23,14 +23,12 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
@@ -52,20 +50,26 @@ class Deserializer {
*/
static class Builder {
private Schema schema;
- private ObjectInspector inspector;
+ private StructObjectInspector writerInspector;
+ private StructObjectInspector sourceInspector;
Builder schema(Schema mainSchema) {
this.schema = mainSchema;
return this;
}
- Builder inspector(ObjectInspector mainInspector) {
- this.inspector = mainInspector;
+ Builder writerInspector(StructObjectInspector inspector) {
+ this.writerInspector = inspector;
+ return this;
+ }
+
+ Builder sourceInspector(StructObjectInspector inspector) {
+ this.sourceInspector = inspector;
return this;
}
Deserializer build() {
- return new Deserializer(schema, inspector);
+ return new Deserializer(schema, new ObjectInspectorPair(writerInspector, sourceInspector));
}
}
@@ -78,62 +82,55 @@ class Deserializer {
return (Record) fieldDeserializer.value(data);
}
- private Deserializer(Schema schema, ObjectInspector fieldInspector) {
- this.fieldDeserializer = DeserializerVisitor.visit(schema, fieldInspector);
+ private Deserializer(Schema schema, ObjectInspectorPair pair) {
+ this.fieldDeserializer = DeserializerVisitor.visit(schema, pair);
}
- private static class DeserializerVisitor extends SchemaWithPartnerVisitor<ObjectInspector, FieldDeserializer> {
+ private static class DeserializerVisitor extends SchemaWithPartnerVisitor<ObjectInspectorPair, FieldDeserializer> {
- public static FieldDeserializer visit(Schema schema, ObjectInspector objectInspector) {
- return visit(schema, new FixNameMappingObjectInspector(schema, objectInspector), new DeserializerVisitor(),
+ public static FieldDeserializer visit(Schema schema, ObjectInspectorPair pair) {
+ return visit(schema, new FixNameMappingObjectInspectorPair(schema, pair), new DeserializerVisitor(),
new PartnerObjectInspectorByNameAccessors());
}
@Override
- public FieldDeserializer schema(Schema schema, ObjectInspector inspector, FieldDeserializer deserializer) {
+ public FieldDeserializer schema(Schema schema, ObjectInspectorPair pair, FieldDeserializer deserializer) {
return deserializer;
}
@Override
- public FieldDeserializer field(NestedField field, ObjectInspector inspector, FieldDeserializer deserializer) {
+ public FieldDeserializer field(NestedField field, ObjectInspectorPair pair, FieldDeserializer deserializer) {
return deserializer;
}
@Override
- public FieldDeserializer primitive(PrimitiveType type, ObjectInspector inspector) {
- switch (type.typeId()) {
- case BOOLEAN:
- case INTEGER:
- case LONG:
- case FLOAT:
- case DOUBLE:
- case STRING:
- // Generic conversions where Iceberg and Hive are using the same java object
- return o -> ((PrimitiveObjectInspector) inspector).getPrimitiveJavaObject(o);
- case UUID:
- // TODO: This will not work with Parquet. Parquet UUID expect byte[], others are expecting UUID
- return o -> UUID.fromString(((StringObjectInspector) inspector).getPrimitiveJavaObject(o));
- case DATE:
- case TIMESTAMP:
- case FIXED:
- case BINARY:
- case DECIMAL:
- // Iceberg specific conversions
- return o -> ((WriteObjectInspector) inspector).convert(o);
- case TIME:
- default:
- throw new IllegalArgumentException("Unsupported column type: " + type);
- }
+ public FieldDeserializer primitive(PrimitiveType type, ObjectInspectorPair pair) {
+ return o -> {
+ if (o == null) {
+ return null;
+ }
+
+ ObjectInspector writerFieldInspector = pair.writerInspector();
+ ObjectInspector sourceFieldInspector = pair.sourceInspector();
+
+ Object result = ((PrimitiveObjectInspector) sourceFieldInspector).getPrimitiveJavaObject(o);
+ if (writerFieldInspector instanceof WriteObjectInspector) {
+ // If we have a conversion method defined for the ObjectInspector then convert
+ result = ((WriteObjectInspector) writerFieldInspector).convert(result);
+ }
+
+ return result;
+ };
}
@Override
- public FieldDeserializer struct(StructType type, ObjectInspector inspector, List<FieldDeserializer> deserializers) {
+ public FieldDeserializer struct(StructType type, ObjectInspectorPair pair, List<FieldDeserializer> deserializers) {
return o -> {
if (o == null) {
return null;
}
- List<Object> data = ((StructObjectInspector) inspector).getStructFieldsDataAsList(o);
+ List<Object> data = ((StructObjectInspector) pair.sourceInspector()).getStructFieldsDataAsList(o);
Record result = GenericRecord.create(type);
for (int i = 0; i < deserializers.size(); i++) {
@@ -150,14 +147,14 @@ class Deserializer {
}
@Override
- public FieldDeserializer list(ListType listTypeInfo, ObjectInspector inspector, FieldDeserializer deserializer) {
+ public FieldDeserializer list(ListType listTypeInfo, ObjectInspectorPair pair, FieldDeserializer deserializer) {
return o -> {
if (o == null) {
return null;
}
List<Object> result = new ArrayList<>();
- ListObjectInspector listInspector = (ListObjectInspector) inspector;
+ ListObjectInspector listInspector = (ListObjectInspector) pair.sourceInspector();
for (Object val : listInspector.getList(o)) {
result.add(deserializer.value(val));
@@ -168,7 +165,7 @@ class Deserializer {
}
@Override
- public FieldDeserializer map(MapType mapType, ObjectInspector inspector, FieldDeserializer keyDeserializer,
+ public FieldDeserializer map(MapType mapType, ObjectInspectorPair pair, FieldDeserializer keyDeserializer,
FieldDeserializer valueDeserializer) {
return o -> {
if (o == null) {
@@ -176,7 +173,7 @@ class Deserializer {
}
Map<Object, Object> result = new HashMap<>();
- MapObjectInspector mapObjectInspector = (MapObjectInspector) inspector;
+ MapObjectInspector mapObjectInspector = (MapObjectInspector) pair.sourceInspector();
for (Map.Entry<?, ?> entry : mapObjectInspector.getMap(o).entrySet()) {
result.put(keyDeserializer.value(entry.getKey()), valueDeserializer.value(entry.getValue()));
@@ -187,30 +184,35 @@ class Deserializer {
}
private static class PartnerObjectInspectorByNameAccessors
- implements SchemaWithPartnerVisitor.PartnerAccessors<ObjectInspector> {
+ implements SchemaWithPartnerVisitor.PartnerAccessors<ObjectInspectorPair> {
@Override
- public ObjectInspector fieldPartner(ObjectInspector inspector, int fieldId, String name) {
- StructObjectInspector fieldInspector = (StructObjectInspector) inspector;
- return fieldInspector.getStructFieldRef(name).getFieldObjectInspector();
+ public ObjectInspectorPair fieldPartner(ObjectInspectorPair pair, int fieldId, String name) {
+ String sourceName = pair.sourceName(name);
+ return new ObjectInspectorPair(
+ ((StructObjectInspector) pair.writerInspector()).getStructFieldRef(name).getFieldObjectInspector(),
+ ((StructObjectInspector) pair.sourceInspector()).getStructFieldRef(sourceName).getFieldObjectInspector());
}
@Override
- public ObjectInspector mapKeyPartner(ObjectInspector inspector) {
- MapObjectInspector fieldInspector = (MapObjectInspector) inspector;
- return fieldInspector.getMapKeyObjectInspector();
+ public ObjectInspectorPair mapKeyPartner(ObjectInspectorPair pair) {
+ return new ObjectInspectorPair(
+ ((MapObjectInspector) pair.writerInspector()).getMapKeyObjectInspector(),
+ ((MapObjectInspector) pair.sourceInspector()).getMapKeyObjectInspector());
}
@Override
- public ObjectInspector mapValuePartner(ObjectInspector inspector) {
- MapObjectInspector fieldInspector = (MapObjectInspector) inspector;
- return fieldInspector.getMapValueObjectInspector();
+ public ObjectInspectorPair mapValuePartner(ObjectInspectorPair pair) {
+ return new ObjectInspectorPair(
+ ((MapObjectInspector) pair.writerInspector()).getMapValueObjectInspector(),
+ ((MapObjectInspector) pair.sourceInspector()).getMapValueObjectInspector());
}
@Override
- public ObjectInspector listElementPartner(ObjectInspector inspector) {
- ListObjectInspector fieldInspector = (ListObjectInspector) inspector;
- return fieldInspector.getListElementObjectInspector();
+ public ObjectInspectorPair listElementPartner(ObjectInspectorPair pair) {
+ return new ObjectInspectorPair(
+ ((ListObjectInspector) pair.writerInspector()).getListElementObjectInspector(),
+ ((ListObjectInspector) pair.sourceInspector()).getListElementObjectInspector());
}
}
@@ -221,51 +223,57 @@ class Deserializer {
/**
* Hive query results schema column names do not match the target Iceberg column names.
* Instead we have to rely on the column order. To keep the other parts of the code generic we fix this with a
- * wrapper around the ObjectInspector. This wrapper uses the Iceberg schema column names instead of the Hive column
- * names for {@link #getStructFieldRef(String) getStructFieldRef}
+ * wrapper around the ObjectInspectorPair. This wrapper maps the Iceberg schema column names instead of the Hive
+ * column names.
*/
- private static class FixNameMappingObjectInspector extends StructObjectInspector {
- private final StructObjectInspector innerInspector;
- private final Map<String, StructField> nameMap;
+ private static class FixNameMappingObjectInspectorPair extends ObjectInspectorPair {
+ private final Map<String, String> sourceNameMap;
+
+ FixNameMappingObjectInspectorPair(Schema schema, ObjectInspectorPair pair) {
+ super(pair.writerInspector(), pair.sourceInspector());
- private FixNameMappingObjectInspector(Schema schema, ObjectInspector inspector) {
- this.nameMap = new HashMap<>(schema.columns().size());
- this.innerInspector = (StructObjectInspector) inspector;
- List<? extends StructField> fields = innerInspector.getAllStructFieldRefs();
+ this.sourceNameMap = new HashMap<>(schema.columns().size());
+ List<? extends StructField> fields = ((StructObjectInspector) sourceInspector()).getAllStructFieldRefs();
for (int i = 0; i < schema.columns().size(); ++i) {
- nameMap.put(schema.columns().get(i).name(), fields.get(i));
+ sourceNameMap.put(schema.columns().get(i).name(), fields.get(i).getFieldName());
}
}
@Override
- public List<? extends StructField> getAllStructFieldRefs() {
- return innerInspector.getAllStructFieldRefs();
+ String sourceName(String originalName) {
+ return sourceNameMap.get(originalName);
}
+ }
- @Override
- public StructField getStructFieldRef(String fieldName) {
- return nameMap.get(fieldName);
- }
+ /**
+ * To get the data for Iceberg {@link Record}s we have to use both ObjectInspectors.
+ * <p>
+ * We use the Hive ObjectInspectors (sourceInspector) to get the Hive primitive types.
+ * <p>
+ * We use the Iceberg ObjectInspectors (writerInspector) only if conversion is needed for
+ * generating the correct type for Iceberg Records. See: {@link WriteObjectInspector} interface on the provided
+ * writerInspector.
+ */
+ private static class ObjectInspectorPair {
+ private ObjectInspector writerInspector;
+ private ObjectInspector sourceInspector;
- @Override
- public Object getStructFieldData(Object data, StructField fieldRef) {
- return innerInspector.getStructFieldData(data, fieldRef);
+ ObjectInspectorPair(ObjectInspector writerInspector, ObjectInspector sourceInspector) {
+ this.writerInspector = writerInspector;
+ this.sourceInspector = sourceInspector;
}
- @Override
- public List<Object> getStructFieldsDataAsList(Object data) {
- return innerInspector.getStructFieldsDataAsList(data);
+ ObjectInspector writerInspector() {
+ return writerInspector;
}
- @Override
- public String getTypeName() {
- return innerInspector.getTypeName();
+ ObjectInspector sourceInspector() {
+ return sourceInspector;
}
- @Override
- public Category getCategory() {
- return innerInspector.getCategory();
+ String sourceName(String originalName) {
+ return originalName;
}
}
}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java
index de6a26f..5fa0181 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergBinaryObjectInspector.java
@@ -38,7 +38,7 @@ public abstract class IcebergBinaryObjectInspector extends AbstractPrimitiveJava
@Override
public byte[] convert(Object o) {
- return o == null ? null : ((BytesWritable) o).getBytes();
+ return (byte[]) o;
}
};
@@ -50,7 +50,7 @@ public abstract class IcebergBinaryObjectInspector extends AbstractPrimitiveJava
@Override
public ByteBuffer convert(Object o) {
- return o == null ? null : ByteBuffer.wrap(((BytesWritable) o).getBytes());
+ return o == null ? null : ByteBuffer.wrap((byte[]) o);
}
};
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java
index 8959780..63af550 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDateObjectInspector.java
@@ -67,6 +67,6 @@ public final class IcebergDateObjectInspector extends AbstractPrimitiveJavaObjec
@Override
public LocalDate convert(Object o) {
- return o == null ? null : ((DateWritable) o).get().toLocalDate();
+ return o == null ? null : ((Date) o).toLocalDate();
}
}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java
index fa023d2..3a16833 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergDecimalObjectInspector.java
@@ -80,6 +80,6 @@ public final class IcebergDecimalObjectInspector extends AbstractPrimitiveJavaOb
@Override
public BigDecimal convert(Object o) {
- return o == null ? null : ((HiveDecimalWritable) o).getHiveDecimal().bigDecimalValue();
+ return o == null ? null : ((HiveDecimal) o).bigDecimalValue();
}
}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java
index 39b8226..78f7fa9 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampObjectInspector.java
@@ -41,7 +41,7 @@ public class IcebergTimestampObjectInspector extends AbstractPrimitiveJavaObject
@Override
public LocalDateTime convert(Object o) {
- return o == null ? null : ((TimestampWritable) o).getTimestamp().toLocalDateTime();
+ return o == null ? null : ((Timestamp) o).toLocalDateTime();
}
@Override
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampWithZoneObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampWithZoneObjectInspector.java
index 2840024..b708e4e 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampWithZoneObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergTimestampWithZoneObjectInspector.java
@@ -42,8 +42,7 @@ public class IcebergTimestampWithZoneObjectInspector extends AbstractPrimitiveJa
@Override
public OffsetDateTime convert(Object o) {
- return o == null ? null :
- OffsetDateTime.ofInstant(((TimestampWritable) o).getTimestamp().toInstant(), ZoneOffset.UTC);
+ return o == null ? null : OffsetDateTime.ofInstant(((Timestamp) o).toInstant(), ZoneOffset.UTC);
}
@Override
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/WriteObjectInspector.java b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/WriteObjectInspector.java
index a7ac2ac..2092dc0 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/WriteObjectInspector.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/WriteObjectInspector.java
@@ -19,6 +19,11 @@
package org.apache.iceberg.mr.hive.serde.objectinspector;
+/**
+ * Interface for converting the Hive primitive objects for the objects which could be added to an Iceberg Record.
+ * If the IcebergObjectInspector does not implement this then the default Hive primitive objects will be used without
+ * conversion.
+ */
public interface WriteObjectInspector {
Object convert(Object value);
}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
index 56f93e9..9114400 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
@@ -35,7 +35,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
-import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -59,12 +58,8 @@ import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergBinaryObjectInspector;
-import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergDecimalObjectInspector;
-import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
-import org.apache.iceberg.util.UUIDUtil;
import org.junit.Assert;
import static org.apache.iceberg.types.Types.NestedField.optional;
@@ -72,6 +67,7 @@ import static org.apache.iceberg.types.Types.NestedField.optional;
public class HiveIcebergTestUtils {
// TODO: Can this be a constant all around the Iceberg tests?
public static final Schema FULL_SCHEMA = new Schema(
+ // TODO: Create tests for field case insensitivity.
optional(1, "boolean_type", Types.BooleanType.get()),
optional(2, "integer_type", Types.IntegerType.get()),
optional(3, "long_type", Types.LongType.get()),
@@ -80,33 +76,30 @@ public class HiveIcebergTestUtils {
optional(6, "date_type", Types.DateType.get()),
// TimeType is not supported
// required(7, "time_type", Types.TimeType.get()),
- optional(7, "tsTz", Types.TimestampType.withZone()),
+ optional(7, "tstz", Types.TimestampType.withZone()),
optional(8, "ts", Types.TimestampType.withoutZone()),
optional(9, "string_type", Types.StringType.get()),
- optional(10, "uuid_type", Types.UUIDType.get()),
- optional(11, "fixed_type", Types.FixedType.ofLength(3)),
- optional(12, "binary_type", Types.BinaryType.get()),
- optional(13, "decimal_type", Types.DecimalType.of(38, 10)));
+ optional(10, "fixed_type", Types.FixedType.ofLength(3)),
+ optional(11, "binary_type", Types.BinaryType.get()),
+ optional(12, "decimal_type", Types.DecimalType.of(38, 10)));
public static final StandardStructObjectInspector FULL_SCHEMA_OBJECT_INSPECTOR =
ObjectInspectorFactory.getStandardStructObjectInspector(
- // Capitalized `boolean_type` field to check for field case insensitivity.
- Arrays.asList("Boolean_Type", "integer_type", "long_type", "float_type", "double_type",
- "date_type", "tsTz", "ts", "string_type", "uuid_type", "fixed_type", "binary_type", "decimal_type"),
+ Arrays.asList("boolean_type", "integer_type", "long_type", "float_type", "double_type",
+ "date_type", "tstz", "ts", "string_type", "fixed_type", "binary_type", "decimal_type"),
Arrays.asList(
PrimitiveObjectInspectorFactory.writableBooleanObjectInspector,
PrimitiveObjectInspectorFactory.writableIntObjectInspector,
PrimitiveObjectInspectorFactory.writableLongObjectInspector,
PrimitiveObjectInspectorFactory.writableFloatObjectInspector,
PrimitiveObjectInspectorFactory.writableDoubleObjectInspector,
- IcebergObjectInspector.DATE_INSPECTOR,
- IcebergObjectInspector.TIMESTAMP_INSPECTOR_WITH_TZ,
- IcebergObjectInspector.TIMESTAMP_INSPECTOR,
+ PrimitiveObjectInspectorFactory.writableDateObjectInspector,
+ PrimitiveObjectInspectorFactory.writableTimestampObjectInspector,
+ PrimitiveObjectInspectorFactory.writableTimestampObjectInspector,
PrimitiveObjectInspectorFactory.writableStringObjectInspector,
- PrimitiveObjectInspectorFactory.writableStringObjectInspector,
- IcebergBinaryObjectInspector.byteArray(),
- IcebergBinaryObjectInspector.byteBuffer(),
- IcebergDecimalObjectInspector.get(38, 10)
+ PrimitiveObjectInspectorFactory.writableBinaryObjectInspector,
+ PrimitiveObjectInspectorFactory.writableBinaryObjectInspector,
+ PrimitiveObjectInspectorFactory.writableHiveDecimalObjectInspector
));
private HiveIcebergTestUtils() {
@@ -115,10 +108,9 @@ public class HiveIcebergTestUtils {
/**
* Generates a test record where every field has a value.
- * @param uuidAsByte As per #1881 Parquet needs byte[] value for UUID, other file formats need UUID object
* @return Record with every field set
*/
- public static Record getTestRecord(boolean uuidAsByte) {
+ public static Record getTestRecord() {
Record record = GenericRecord.create(HiveIcebergTestUtils.FULL_SCHEMA);
record.set(0, true);
record.set(1, 1);
@@ -132,15 +124,9 @@ public class HiveIcebergTestUtils {
record.set(6, OffsetDateTime.of(2017, 11, 22, 11, 30, 7, 0, ZoneOffset.ofHours(2)));
record.set(7, LocalDateTime.of(2019, 2, 22, 9, 44, 54));
record.set(8, "kilenc");
- if (uuidAsByte) {
- // TODO: Parquet UUID expect byte[], others are expecting UUID
- record.set(9, UUIDUtil.convert(UUID.fromString("1-2-3-4-5")));
- } else {
- record.set(9, UUID.fromString("1-2-3-4-5"));
- }
- record.set(10, new byte[]{0, 1, 2});
- record.set(11, ByteBuffer.wrap(new byte[]{0, 1, 2, 3}));
- record.set(12, new BigDecimal("0.0000000013"));
+ record.set(9, new byte[]{0, 1, 2});
+ record.set(10, ByteBuffer.wrap(new byte[]{0, 1, 2, 3}));
+ record.set(11, new BigDecimal("0.0000000013"));
return record;
}
@@ -177,10 +163,9 @@ public class HiveIcebergTestUtils {
new TimestampWritable(Timestamp.from(record.get(6, OffsetDateTime.class).toInstant())),
new TimestampWritable(Timestamp.valueOf(record.get(7, LocalDateTime.class))),
new Text(record.get(8, String.class)),
- new Text(record.get(9, UUID.class).toString()),
- new BytesWritable(record.get(10, byte[].class)),
- new BytesWritable(ByteBuffers.toByteArray(record.get(11, ByteBuffer.class))),
- new HiveDecimalWritable(HiveDecimal.create(record.get(12, BigDecimal.class)))
+ new BytesWritable(record.get(9, byte[].class)),
+ new BytesWritable(ByteBuffers.toByteArray(record.get(10, ByteBuffer.class))),
+ new HiveDecimalWritable(HiveDecimal.create(record.get(11, BigDecimal.class)))
);
}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java
index 76f7cf5..d31a5d7 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestDeserializer.java
@@ -21,9 +21,9 @@ package org.apache.iceberg.mr.hive;
import java.util.Arrays;
import java.util.Collections;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
@@ -33,6 +33,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Assume;
@@ -66,7 +67,8 @@ public class TestDeserializer {
Deserializer deserializer = new Deserializer.Builder()
.schema(CUSTOMER_SCHEMA)
- .inspector(schemaObjectInspector)
+ .writerInspector((StructObjectInspector) IcebergObjectInspector.create(CUSTOMER_SCHEMA))
+ .sourceInspector(schemaObjectInspector)
.build();
Record expected = GenericRecord.create(CUSTOMER_SCHEMA);
@@ -82,7 +84,8 @@ public class TestDeserializer {
public void testStructDeserialize() {
Deserializer deserializer = new Deserializer.Builder()
.schema(CUSTOMER_SCHEMA)
- .inspector(CUSTOMER_OBJECT_INSPECTOR)
+ .writerInspector((StructObjectInspector) IcebergObjectInspector.create(CUSTOMER_SCHEMA))
+ .sourceInspector(CUSTOMER_OBJECT_INSPECTOR)
.build();
Record expected = GenericRecord.create(CUSTOMER_SCHEMA);
@@ -103,7 +106,7 @@ public class TestDeserializer {
))
);
- ObjectInspector inspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+ StructObjectInspector inspector = ObjectInspectorFactory.getStandardStructObjectInspector(
Arrays.asList("map_type"),
Arrays.asList(
ObjectInspectorFactory.getStandardMapObjectInspector(
@@ -114,7 +117,8 @@ public class TestDeserializer {
Deserializer deserializer = new Deserializer.Builder()
.schema(schema)
- .inspector(inspector)
+ .writerInspector((StructObjectInspector) IcebergObjectInspector.create(schema))
+ .sourceInspector(inspector)
.build();
Record expected = GenericRecord.create(schema);
@@ -134,7 +138,7 @@ public class TestDeserializer {
optional(1, "list_type", Types.ListType.ofOptional(2, Types.LongType.get()))
);
- ObjectInspector inspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+ StructObjectInspector inspector = ObjectInspectorFactory.getStandardStructObjectInspector(
Arrays.asList("list_type"),
Arrays.asList(
ObjectInspectorFactory.getStandardListObjectInspector(
@@ -143,7 +147,8 @@ public class TestDeserializer {
Deserializer deserializer = new Deserializer.Builder()
.schema(schema)
- .inspector(inspector)
+ .writerInspector((StructObjectInspector) IcebergObjectInspector.create(schema))
+ .sourceInspector(inspector)
.build();
Record expected = GenericRecord.create(schema);
@@ -161,10 +166,11 @@ public class TestDeserializer {
Deserializer deserializer = new Deserializer.Builder()
.schema(HiveIcebergTestUtils.FULL_SCHEMA)
- .inspector(HiveIcebergTestUtils.FULL_SCHEMA_OBJECT_INSPECTOR)
+ .writerInspector((StructObjectInspector) IcebergObjectInspector.create(HiveIcebergTestUtils.FULL_SCHEMA))
+ .sourceInspector(HiveIcebergTestUtils.FULL_SCHEMA_OBJECT_INSPECTOR)
.build();
- Record expected = HiveIcebergTestUtils.getTestRecord(false);
+ Record expected = HiveIcebergTestUtils.getTestRecord();
Record actual = deserializer.deserialize(HiveIcebergTestUtils.valuesForTestRecord(expected));
HiveIcebergTestUtils.assertEquals(expected, actual);
@@ -174,7 +180,8 @@ public class TestDeserializer {
public void testNullDeserialize() {
Deserializer deserializer = new Deserializer.Builder()
.schema(HiveIcebergTestUtils.FULL_SCHEMA)
- .inspector(HiveIcebergTestUtils.FULL_SCHEMA_OBJECT_INSPECTOR)
+ .writerInspector((StructObjectInspector) IcebergObjectInspector.create(HiveIcebergTestUtils.FULL_SCHEMA))
+ .sourceInspector(HiveIcebergTestUtils.FULL_SCHEMA_OBJECT_INSPECTOR)
.build();
Record expected = HiveIcebergTestUtils.getNullTestRecord();
@@ -202,10 +209,11 @@ public class TestDeserializer {
));
AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
- "Unsupported column type", () -> {
+ "type is not supported", () -> {
new Deserializer.Builder()
.schema(unsupported)
- .inspector(objectInspector)
+ .writerInspector((StructObjectInspector) IcebergObjectInspector.create(unsupported))
+ .sourceInspector(objectInspector)
.build();
}
);
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspector.java b/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspector.java
index 79629f0..85fe373 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspector.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampObjectInspector.java
@@ -61,7 +61,7 @@ public class TestIcebergTimestampObjectInspector {
Assert.assertFalse(oi.preferWritable());
- Assert.assertEquals(local, oi.convert(new TimestampWritable(ts)));
+ Assert.assertEquals(local, oi.convert(ts));
}
}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampWithZoneObjectInspector.java b/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampWithZoneObjectInspector.java
index e4edeb9..2aef6d7 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampWithZoneObjectInspector.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/serde/objectinspector/TestIcebergTimestampWithZoneObjectInspector.java
@@ -65,10 +65,10 @@ public class TestIcebergTimestampWithZoneObjectInspector {
Assert.assertFalse(oi.preferWritable());
Assert.assertEquals(OffsetDateTime.ofInstant(local.toInstant(ZoneOffset.ofHours(-5)), ZoneOffset.UTC),
- oi.convert(new TimestampWritable(ts)));
+ oi.convert(ts));
Assert.assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC),
- oi.convert(new TimestampWritable(Timestamp.from(offsetDateTime.toInstant()))));
+ oi.convert(Timestamp.from(offsetDateTime.toInstant())));
}
}