You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@hive.apache.org by ha...@apache.org on 2018/04/18 02:54:28 UTC

hive git commit: HIVE-18410 : [Performance][Avro] Reading flat Avro tables is very expensive in Hive (Ratandeep Ratti via Anthony Hsu, Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 5b4c29d43 -> cacb1c095


HIVE-18410 : [Performance][Avro] Reading flat Avro tables is very expensive in Hive (Ratandeep Ratti via Anthony Hsu, Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cacb1c09
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cacb1c09
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cacb1c09

Branch: refs/heads/master
Commit: cacb1c09574c89ac07fcffc0b8c3fad18e283aec
Parents: 5b4c29d
Author: Ratandeep Ratti <rd...@gmail.com>
Authored: Mon Jan 8 16:47:00 2018 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Tue Apr 17 19:53:33 2018 -0700

----------------------------------------------------------------------
 .../hive/serde2/avro/AvroDeserializer.java      | 77 +++-----------------
 .../hadoop/hive/serde2/avro/AvroSerDe.java      | 25 ++-----
 .../hadoop/hive/serde2/avro/AvroSerdeUtils.java | 23 ++++--
 .../hive/serde2/avro/TestAvroDeserializer.java  | 67 +++++++++++++++++
 .../avro/TestAvroObjectInspectorGenerator.java  | 11 +++
 5 files changed, 114 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cacb1c09/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
index b7b3d12..34da50d 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
@@ -198,11 +198,18 @@ class AvroDeserializer {
 
   private Object worker(Object datum, Schema fileSchema, Schema recordSchema, TypeInfo columnType)
           throws AvroSerdeException {
-    // Klaxon! Klaxon! Klaxon!
-    // Avro requires NULLable types to be defined as unions of some type T
-    // and NULL.  This is annoying and we're going to hide it from the user.
+    if (datum == null) {
+      return null;
+    }
+
+    // Avro requires nullable types to be defined as unions of some type T
+    // and NULL. This is annoying and we're going to hide it from the user.
+
     if (AvroSerdeUtils.isNullableType(recordSchema)) {
-      return deserializeNullableUnion(datum, fileSchema, recordSchema, columnType);
+      recordSchema = AvroSerdeUtils.getOtherTypeFromNullableType(recordSchema);
+    }
+    if (fileSchema != null && AvroSerdeUtils.isNullableType(fileSchema)) {
+      fileSchema = AvroSerdeUtils.getOtherTypeFromNullableType(fileSchema);
     }
 
     switch(columnType.getCategory()) {
@@ -300,68 +307,6 @@ class AvroDeserializer {
     }
   }
 
-  /**
-   * Extract either a null or the correct type from a Nullable type.
-   */
-  private Object deserializeNullableUnion(Object datum, Schema fileSchema, Schema recordSchema, TypeInfo columnType)
-                                            throws AvroSerdeException {
-    if (recordSchema.getTypes().size() == 2) {
-      // A type like [NULL, T]
-      return deserializeSingleItemNullableUnion(datum, fileSchema, recordSchema, columnType);
-    } else {
-      // Types like [NULL, T1, T2, ...]
-      if (datum == null) {
-        return null;
-      } else {
-        Schema newRecordSchema = AvroSerdeUtils.getOtherTypeFromNullableType(recordSchema);
-        return worker(datum, fileSchema, newRecordSchema, columnType);
-      }
-    }
-  }
-
-  private Object deserializeSingleItemNullableUnion(Object datum,
-                                                    Schema fileSchema,
-                                                    Schema recordSchema,
-                                                    TypeInfo columnType)
-      throws AvroSerdeException {
-    int tag = GenericData.get().resolveUnion(recordSchema, datum); // Determine index of value
-    Schema schema = recordSchema.getTypes().get(tag);
-    if (schema.getType().equals(Type.NULL)) {
-      return null;
-    }
-
-    Schema currentFileSchema = null;
-    if (fileSchema != null) {
-      if (fileSchema.getType() == Type.UNION) {
-        // The fileSchema may have the null value in a different position, so
-        // we need to get the correct tag
-        try {
-          tag = GenericData.get().resolveUnion(fileSchema, datum);
-          currentFileSchema = fileSchema.getTypes().get(tag);
-        } catch (UnresolvedUnionException e) {
-          if (LOG.isDebugEnabled()) {
-            String datumClazz = null;
-            if (datum != null) {
-              datumClazz = datum.getClass().getName();
-            }
-            String msg = "File schema union could not resolve union. fileSchema = " + fileSchema +
-              ", recordSchema = " + recordSchema + ", datum class = " + datumClazz + ": " + e;
-            LOG.debug(msg, e);
-          }
-          // This occurs when the datum type is different between
-          // the file and record schema. For example if datum is long
-          // and the field in the file schema is int. See HIVE-9462.
-          // in this case we will re-use the record schema as the file
-          // schema, Ultimately we need to clean this code up and will
-          // do as a follow-on to HIVE-9462.
-          currentFileSchema = schema;
-        }
-      } else {
-        currentFileSchema = fileSchema;
-      }
-    }
-    return worker(datum, currentFileSchema, schema, columnType);
-  }
 
   private Object deserializeStruct(GenericData.Record datum, Schema fileSchema, StructTypeInfo columnType)
           throws AvroSerdeException {

http://git-wip-us.apache.org/repos/asf/hive/blob/cacb1c09/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
index 1746a0f..3955611 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
@@ -136,6 +136,11 @@ public class AvroSerDe extends AbstractSerDe {
     this.columnNames = StringInternUtils.internStringsInList(aoig.getColumnNames());
     this.columnTypes = aoig.getColumnTypes();
     this.oi = aoig.getObjectInspector();
+
+    if(!badSchema) {
+      this.avroSerializer = new AvroSerializer();
+      this.avroDeserializer = new AvroDeserializer();
+    }
   }
 
   private boolean hasExternalSchema(Properties properties) {
@@ -214,7 +219,7 @@ public class AvroSerDe extends AbstractSerDe {
     if(badSchema) {
       throw new BadSchemaException();
     }
-    return getSerializer().serialize(o, objectInspector, columnNames, columnTypes, schema);
+    return avroSerializer.serialize(o, objectInspector, columnNames, columnTypes, schema);
   }
 
   @Override
@@ -222,7 +227,7 @@ public class AvroSerDe extends AbstractSerDe {
     if(badSchema) {
       throw new BadSchemaException();
     }
-    return getDeserializer().deserialize(columnNames, columnTypes, writable, schema);
+    return avroDeserializer.deserialize(columnNames, columnTypes, writable, schema);
   }
 
   @Override
@@ -236,22 +241,6 @@ public class AvroSerDe extends AbstractSerDe {
     return null;
   }
 
-  private AvroDeserializer getDeserializer() {
-    if(avroDeserializer == null) {
-      avroDeserializer = new AvroDeserializer();
-    }
-
-    return avroDeserializer;
-  }
-
-  private AvroSerializer getSerializer() {
-    if(avroSerializer == null) {
-      avroSerializer = new AvroSerializer();
-    }
-
-    return avroSerializer;
-  }
-
   @Override
   public boolean shouldStoreFieldsInMetastore(Map<String, String> tableParams) {
     return !hasExternalSchema(tableParams);

http://git-wip-us.apache.org/repos/asf/hive/blob/cacb1c09/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
index 391a300..d16abdb 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
@@ -205,12 +205,25 @@ public class AvroSerdeUtils {
   }
 
   /**
-   * In a nullable type, get the schema for the non-nullable type.  This method
-   * does no checking that the provides Schema is nullable.
+   * If the union schema is a nullable union, get the schema for the non-nullable type.
+   * This method does no checking that the provided Schema is nullable. If the provided
+   * union schema is non-nullable, it simply returns the union schema
    */
-  public static Schema getOtherTypeFromNullableType(Schema schema) {
-    List<Schema> itemSchemas = new ArrayList<>();
-    for (Schema itemSchema : schema.getTypes()) {
+  public static Schema getOtherTypeFromNullableType(Schema unionSchema) {
+    final List<Schema> types = unionSchema.getTypes();
+    if (types.size() == 2) { // most common scenario
+      if (types.get(0).getType() == Schema.Type.NULL) {
+        return types.get(1);
+      }
+      if (types.get(1).getType() == Schema.Type.NULL) {
+        return types.get(0);
+      }
+      // not a nullable union
+      return unionSchema;
+    }
+
+    final List<Schema> itemSchemas = new ArrayList<>();
+    for (Schema itemSchema : types) {
       if (!Schema.Type.NULL.equals(itemSchema.getType())) {
         itemSchemas.add(itemSchema);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/cacb1c09/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
----------------------------------------------------------------------
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
index 3dc3331..ef97d2d 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestAvroDeserializer {
@@ -256,6 +257,20 @@ public class TestAvroDeserializer {
   }
 
   @Test
+  public void canDeserializeSingleItemUnions() throws SerDeException, IOException {
+    Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.SINGLE_ITEM_UNION_SCHEMA);
+    GenericData.Record record = new GenericData.Record(s);
+
+    record.put("aUnion", "this is a string");
+
+    ResultPair result = unionTester(s, record);
+    assertTrue(result.value instanceof String);
+    assertEquals("this is a string", result.value);
+    UnionObjectInspector uoi = (UnionObjectInspector)result.oi;
+    assertEquals(0, uoi.getTag(result.unionObject));
+  }
+
+  @Test
   public void canDeserializeUnions() throws SerDeException, IOException {
     Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.UNION_SCHEMA);
     GenericData.Record record = new GenericData.Record(s);
@@ -361,6 +376,58 @@ public class TestAvroDeserializer {
     return new ResultPair(fieldObjectInspector, value, theUnion);
   }
 
+  @Test
+  public void primitiveSchemaEvolution() throws Exception {
+    Schema fileSchema = AvroSerdeUtils.getSchemaFor(
+     "{\n"
+         + "  \"type\": \"record\",\n"
+         + "  \"name\": \"r1\",\n"
+         + "  \"fields\": [\n"
+         + "    {\n"
+         + "      \"name\": \"int_field\",\n"
+         + "      \"type\": \"int\"\n"
+         + "    }\n"
+         + "  ]\n"
+         + "}"
+    );
+    Schema readerSchema = AvroSerdeUtils.getSchemaFor(
+       "{\n"
+           + "  \"type\": \"record\",\n"
+           + "  \"name\": \"r1\",\n"
+           + "  \"fields\": [\n"
+           + "    {\n"
+           + "      \"name\": \"int_field\",\n"
+           + "      \"type\": \"int\"\n"
+           + "    },\n"
+           + "    {\n"
+           + "      \"name\": \"dec_field\",\n"
+           + "      \"type\": [\n"
+           + "        \"null\",\n"
+           + "        {\n"
+           + "          \"type\": \"bytes\",\n"
+           + "          \"logicalType\": \"decimal\",\n"
+           + "          \"precision\": 5,\n"
+           + "          \"scale\": 4\n"
+           + "        }\n"
+           + "      ],\n"
+           + "      \"default\": null\n"
+           + "    }\n"
+           + "  ]\n"
+           + "}"
+    );
+    GenericData.Record record = new GenericData.Record(fileSchema);
+
+    record.put("int_field", 1);
+    assertTrue(GENERIC_DATA.validate(fileSchema, record));
+    AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+    AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(readerSchema);
+
+    AvroDeserializer de = new AvroDeserializer();
+    List<Object> row = (List<Object>) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, readerSchema);
+    Assert.assertEquals(1, row.get(0));
+    Assert.assertNull(row.get(1));
+  }
+
   @Test // Enums are one of two types we fudge for Hive. Enums go in, Strings come out.
   public void canDeserializeEnums() throws SerDeException, IOException {
     Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.ENUM_SCHEMA);

http://git-wip-us.apache.org/repos/asf/hive/blob/cacb1c09/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
----------------------------------------------------------------------
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
index 3736a1f..ee83ba3 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
@@ -101,6 +101,17 @@ public class TestAvroObjectInspectorGenerator {
       "  ]\n" +
       "}";
   public static final String NULLABLE_RECORD_SCHEMA = "[\"null\", " + RECORD_SCHEMA + "]";
+  public static final String SINGLE_ITEM_UNION_SCHEMA = "{\n" +
+      "  \"namespace\": \"test.a.rossa\",\n" +
+      "  \"name\": \"oneUnion\",\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"fields\": [\n" +
+      "    {\n" +
+      "      \"name\":\"aUnion\",\n" +
+      "      \"type\":[\"string\"]\n" +
+      "    }\n" +
+      "  ]\n" +
+      "}";
   public static final String UNION_SCHEMA = "{\n" +
       "  \"namespace\": \"test.a.rossa\",\n" +
       "  \"name\": \"oneUnion\",\n" +