You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2018/04/18 02:54:28 UTC
hive git commit: HIVE-18410 : [Performance][Avro] Reading flat Avro
tables is very expensive in Hive (Ratandeep Ratti via Anthony Hsu,
Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 5b4c29d43 -> cacb1c095
HIVE-18410 : [Performance][Avro] Reading flat Avro tables is very expensive in Hive (Ratandeep Ratti via Anthony Hsu, Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cacb1c09
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cacb1c09
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cacb1c09
Branch: refs/heads/master
Commit: cacb1c09574c89ac07fcffc0b8c3fad18e283aec
Parents: 5b4c29d
Author: Ratandeep Ratti <rd...@gmail.com>
Authored: Mon Jan 8 16:47:00 2018 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Tue Apr 17 19:53:33 2018 -0700
----------------------------------------------------------------------
.../hive/serde2/avro/AvroDeserializer.java | 77 +++-----------------
.../hadoop/hive/serde2/avro/AvroSerDe.java | 25 ++-----
.../hadoop/hive/serde2/avro/AvroSerdeUtils.java | 23 ++++--
.../hive/serde2/avro/TestAvroDeserializer.java | 67 +++++++++++++++++
.../avro/TestAvroObjectInspectorGenerator.java | 11 +++
5 files changed, 114 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cacb1c09/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
index b7b3d12..34da50d 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
@@ -198,11 +198,18 @@ class AvroDeserializer {
private Object worker(Object datum, Schema fileSchema, Schema recordSchema, TypeInfo columnType)
throws AvroSerdeException {
- // Klaxon! Klaxon! Klaxon!
- // Avro requires NULLable types to be defined as unions of some type T
- // and NULL. This is annoying and we're going to hide it from the user.
+ if (datum == null) {
+ return null;
+ }
+
+ // Avro requires nullable types to be defined as unions of some type T
+ // and NULL. This is annoying and we're going to hide it from the user.
+
if (AvroSerdeUtils.isNullableType(recordSchema)) {
- return deserializeNullableUnion(datum, fileSchema, recordSchema, columnType);
+ recordSchema = AvroSerdeUtils.getOtherTypeFromNullableType(recordSchema);
+ }
+ if (fileSchema != null && AvroSerdeUtils.isNullableType(fileSchema)) {
+ fileSchema = AvroSerdeUtils.getOtherTypeFromNullableType(fileSchema);
}
switch(columnType.getCategory()) {
@@ -300,68 +307,6 @@ class AvroDeserializer {
}
}
- /**
- * Extract either a null or the correct type from a Nullable type.
- */
- private Object deserializeNullableUnion(Object datum, Schema fileSchema, Schema recordSchema, TypeInfo columnType)
- throws AvroSerdeException {
- if (recordSchema.getTypes().size() == 2) {
- // A type like [NULL, T]
- return deserializeSingleItemNullableUnion(datum, fileSchema, recordSchema, columnType);
- } else {
- // Types like [NULL, T1, T2, ...]
- if (datum == null) {
- return null;
- } else {
- Schema newRecordSchema = AvroSerdeUtils.getOtherTypeFromNullableType(recordSchema);
- return worker(datum, fileSchema, newRecordSchema, columnType);
- }
- }
- }
-
- private Object deserializeSingleItemNullableUnion(Object datum,
- Schema fileSchema,
- Schema recordSchema,
- TypeInfo columnType)
- throws AvroSerdeException {
- int tag = GenericData.get().resolveUnion(recordSchema, datum); // Determine index of value
- Schema schema = recordSchema.getTypes().get(tag);
- if (schema.getType().equals(Type.NULL)) {
- return null;
- }
-
- Schema currentFileSchema = null;
- if (fileSchema != null) {
- if (fileSchema.getType() == Type.UNION) {
- // The fileSchema may have the null value in a different position, so
- // we need to get the correct tag
- try {
- tag = GenericData.get().resolveUnion(fileSchema, datum);
- currentFileSchema = fileSchema.getTypes().get(tag);
- } catch (UnresolvedUnionException e) {
- if (LOG.isDebugEnabled()) {
- String datumClazz = null;
- if (datum != null) {
- datumClazz = datum.getClass().getName();
- }
- String msg = "File schema union could not resolve union. fileSchema = " + fileSchema +
- ", recordSchema = " + recordSchema + ", datum class = " + datumClazz + ": " + e;
- LOG.debug(msg, e);
- }
- // This occurs when the datum type is different between
- // the file and record schema. For example if datum is long
- // and the field in the file schema is int. See HIVE-9462.
- // in this case we will re-use the record schema as the file
- // schema, Ultimately we need to clean this code up and will
- // do as a follow-on to HIVE-9462.
- currentFileSchema = schema;
- }
- } else {
- currentFileSchema = fileSchema;
- }
- }
- return worker(datum, currentFileSchema, schema, columnType);
- }
private Object deserializeStruct(GenericData.Record datum, Schema fileSchema, StructTypeInfo columnType)
throws AvroSerdeException {
http://git-wip-us.apache.org/repos/asf/hive/blob/cacb1c09/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
index 1746a0f..3955611 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
@@ -136,6 +136,11 @@ public class AvroSerDe extends AbstractSerDe {
this.columnNames = StringInternUtils.internStringsInList(aoig.getColumnNames());
this.columnTypes = aoig.getColumnTypes();
this.oi = aoig.getObjectInspector();
+
+ if(!badSchema) {
+ this.avroSerializer = new AvroSerializer();
+ this.avroDeserializer = new AvroDeserializer();
+ }
}
private boolean hasExternalSchema(Properties properties) {
@@ -214,7 +219,7 @@ public class AvroSerDe extends AbstractSerDe {
if(badSchema) {
throw new BadSchemaException();
}
- return getSerializer().serialize(o, objectInspector, columnNames, columnTypes, schema);
+ return avroSerializer.serialize(o, objectInspector, columnNames, columnTypes, schema);
}
@Override
@@ -222,7 +227,7 @@ public class AvroSerDe extends AbstractSerDe {
if(badSchema) {
throw new BadSchemaException();
}
- return getDeserializer().deserialize(columnNames, columnTypes, writable, schema);
+ return avroDeserializer.deserialize(columnNames, columnTypes, writable, schema);
}
@Override
@@ -236,22 +241,6 @@ public class AvroSerDe extends AbstractSerDe {
return null;
}
- private AvroDeserializer getDeserializer() {
- if(avroDeserializer == null) {
- avroDeserializer = new AvroDeserializer();
- }
-
- return avroDeserializer;
- }
-
- private AvroSerializer getSerializer() {
- if(avroSerializer == null) {
- avroSerializer = new AvroSerializer();
- }
-
- return avroSerializer;
- }
-
@Override
public boolean shouldStoreFieldsInMetastore(Map<String, String> tableParams) {
return !hasExternalSchema(tableParams);
http://git-wip-us.apache.org/repos/asf/hive/blob/cacb1c09/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
index 391a300..d16abdb 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
@@ -205,12 +205,25 @@ public class AvroSerdeUtils {
}
/**
- * In a nullable type, get the schema for the non-nullable type. This method
- * does no checking that the provides Schema is nullable.
+ * If the union schema is a nullable union, return the schema of its non-null branch.
+ * The caller need not verify nullability first: a two-branch union that has no NULL
+ * branch is returned unchanged, while larger unions have their NULL branches removed.
*/
- public static Schema getOtherTypeFromNullableType(Schema schema) {
- List<Schema> itemSchemas = new ArrayList<>();
- for (Schema itemSchema : schema.getTypes()) {
+ public static Schema getOtherTypeFromNullableType(Schema unionSchema) {
+ final List<Schema> types = unionSchema.getTypes();
+ if (types.size() == 2) { // most common scenario
+ if (types.get(0).getType() == Schema.Type.NULL) {
+ return types.get(1);
+ }
+ if (types.get(1).getType() == Schema.Type.NULL) {
+ return types.get(0);
+ }
+ // not a nullable union
+ return unionSchema;
+ }
+
+ final List<Schema> itemSchemas = new ArrayList<>();
+ for (Schema itemSchema : types) {
if (!Schema.Type.NULL.equals(itemSchema.getType())) {
itemSchemas.add(itemSchema);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cacb1c09/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
----------------------------------------------------------------------
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
index 3dc3331..ef97d2d 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.junit.Assert;
import org.junit.Test;
public class TestAvroDeserializer {
@@ -256,6 +257,20 @@ public class TestAvroDeserializer {
}
@Test
+ public void canDeserializeSingleItemUnions() throws SerDeException, IOException {
+ Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.SINGLE_ITEM_UNION_SCHEMA);
+ GenericData.Record record = new GenericData.Record(s);
+
+ record.put("aUnion", "this is a string");
+
+ ResultPair result = unionTester(s, record);
+ assertTrue(result.value instanceof String);
+ assertEquals("this is a string", result.value);
+ UnionObjectInspector uoi = (UnionObjectInspector)result.oi;
+ assertEquals(0, uoi.getTag(result.unionObject));
+ }
+
+ @Test
public void canDeserializeUnions() throws SerDeException, IOException {
Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.UNION_SCHEMA);
GenericData.Record record = new GenericData.Record(s);
@@ -361,6 +376,58 @@ public class TestAvroDeserializer {
return new ResultPair(fieldObjectInspector, value, theUnion);
}
+ @Test
+ public void primitiveSchemaEvolution() throws Exception {
+ Schema fileSchema = AvroSerdeUtils.getSchemaFor(
+ "{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"r1\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"name\": \"int_field\",\n"
+ + " \"type\": \"int\"\n"
+ + " }\n"
+ + " ]\n"
+ + "}"
+ );
+ Schema readerSchema = AvroSerdeUtils.getSchemaFor(
+ "{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"r1\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"name\": \"int_field\",\n"
+ + " \"type\": \"int\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"dec_field\",\n"
+ + " \"type\": [\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\": \"bytes\",\n"
+ + " \"logicalType\": \"decimal\",\n"
+ + " \"precision\": 5,\n"
+ + " \"scale\": 4\n"
+ + " }\n"
+ + " ],\n"
+ + " \"default\": null\n"
+ + " }\n"
+ + " ]\n"
+ + "}"
+ );
+ GenericData.Record record = new GenericData.Record(fileSchema);
+
+ record.put("int_field", 1);
+ assertTrue(GENERIC_DATA.validate(fileSchema, record));
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(readerSchema);
+
+ AvroDeserializer de = new AvroDeserializer();
+ List<Object> row = (List<Object>) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, readerSchema);
+ Assert.assertEquals(1, row.get(0));
+ Assert.assertNull(row.get(1));
+ }
+
@Test // Enums are one of two types we fudge for Hive. Enums go in, Strings come out.
public void canDeserializeEnums() throws SerDeException, IOException {
Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.ENUM_SCHEMA);
http://git-wip-us.apache.org/repos/asf/hive/blob/cacb1c09/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
----------------------------------------------------------------------
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
index 3736a1f..ee83ba3 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
@@ -101,6 +101,17 @@ public class TestAvroObjectInspectorGenerator {
" ]\n" +
"}";
public static final String NULLABLE_RECORD_SCHEMA = "[\"null\", " + RECORD_SCHEMA + "]";
+ public static final String SINGLE_ITEM_UNION_SCHEMA = "{\n" +
+ " \"namespace\": \"test.a.rossa\",\n" +
+ " \"name\": \"oneUnion\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"aUnion\",\n" +
+ " \"type\":[\"string\"]\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
public static final String UNION_SCHEMA = "{\n" +
" \"namespace\": \"test.a.rossa\",\n" +
" \"name\": \"oneUnion\",\n" +