Posted to commits@hive.apache.org by ga...@apache.org on 2017/12/18 23:25:56 UTC

[13/50] [abbrv] hive git commit: HIVE-18211: Support to read multiple level definition for Map type in Parquet file (Colin Ma, reviewed by Ferdinand Xu)

HIVE-18211: Support to read multiple level definition for Map type in Parquet file (Colin Ma, reviewed by Ferdinand Xu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7acc4ce1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7acc4ce1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7acc4ce1

Branch: refs/heads/standalone-metastore
Commit: 7acc4ce1bbae060d890494c1499938c1eda5f3b6
Parents: 646ccce
Author: Ferdinand Xu <ch...@intel.com>
Authored: Mon Dec 18 09:35:16 2017 +0800
Committer: Ferdinand Xu <ch...@intel.com>
Committed: Mon Dec 18 09:35:16 2017 +0800

----------------------------------------------------------------------
 .../vector/VectorizedParquetRecordReader.java   | 27 +++++++++++++++++++-
 .../parquet/TestVectorizedMapColumnReader.java  | 26 ++++++++++++++++++-
 2 files changed, 51 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7acc4ce1/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 4303ca9..bffe008 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -61,6 +61,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopStreams;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
@@ -97,6 +98,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   private VectorizedRowBatchCtx rbCtx;
   private Object[] partitionValues;
   private Path cacheFsPath;
+  private static final int MAP_DEFINITION_LEVEL_MAX = 3;
 
   /**
    * For each request column, the reader to read this column. This is NULL if this column
@@ -507,7 +509,30 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
         throw new RuntimeException(
             "Failed to find related Parquet column descriptor with type " + type);
       }
-      List<Type> kvTypes = type.asGroupType().getFields();
+
+      // Handle the different Map definitions in Parquet, e.g.:
+      // definition has 1 group:
+      //   repeated group map (MAP_KEY_VALUE)
+      //     {required binary key (UTF8); optional binary value (UTF8);}
+      // definition has 2 groups:
+      //   optional group m1 (MAP) {
+      //     repeated group map (MAP_KEY_VALUE)
+      //       {required binary key (UTF8); optional binary value (UTF8);}
+      //   }
+      int nestGroup = 0;
+      GroupType groupType = type.asGroupType();
+      // If the field count is 2, the group holds the key and value types;
+      // otherwise, descend into the nested group, up to MAP_DEFINITION_LEVEL_MAX levels.
+      while (groupType.getFieldCount() < 2) {
+        if (nestGroup > MAP_DEFINITION_LEVEL_MAX) {
+          throw new RuntimeException(
+              "More than " + MAP_DEFINITION_LEVEL_MAX + " level is found in Map definition, " +
+                  "Failed to get the field types for Map with type " + type);
+        }
+        groupType = groupType.getFields().get(0).asGroupType();
+        nestGroup++;
+      }
+      List<Type> kvTypes = groupType.getFields();
       VectorizedListColumnReader keyListColumnReader = new VectorizedListColumnReader(
           descriptors.get(0), pages.getPageReader(descriptors.get(0)), skipTimestampConversion,
           kvTypes.get(0));

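The loop added above normalizes both Parquet map layouts to the group that
actually holds the key and value fields. A minimal standalone sketch of that
logic, assuming the parquet-mr schema API (GroupType, MessageTypeParser); the
class name MapSchemaUnwrapSketch and the helper findKeyValueGroup are
illustrative names, not part of this patch:

import java.util.List;

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.Type;

public class MapSchemaUnwrapSketch {
  private static final int MAP_DEFINITION_LEVEL_MAX = 3;

  // Descend through single-field wrapper groups until reaching the group
  // with two fields, i.e. the key and the value.
  static GroupType findKeyValueGroup(GroupType groupType) {
    int nestGroup = 0;
    while (groupType.getFieldCount() < 2) {
      if (nestGroup > MAP_DEFINITION_LEVEL_MAX) {
        throw new RuntimeException("Map definition nested deeper than "
            + MAP_DEFINITION_LEVEL_MAX + " levels: " + groupType);
      }
      groupType = groupType.getFields().get(0).asGroupType();
      nestGroup++;
    }
    return groupType;
  }

  public static void main(String[] args) {
    // Two-group variant: the repeated key/value group is wrapped in an
    // extra (MAP) group, as in the new test schema below.
    MessageType nested = MessageTypeParser.parseMessageType(
        "message m {"
            + " optional group map_field (MAP) {"
            + "   repeated group map (MAP_KEY_VALUE) {"
            + "     required binary key (UTF8);"
            + "     optional binary value (UTF8);"
            + "   }"
            + " }"
            + "}");
    GroupType kv = findKeyValueGroup(nested.getType("map_field").asGroupType());
    List<Type> kvTypes = kv.getFields();
    System.out.println("key=" + kvTypes.get(0) + " value=" + kvTypes.get(1));
  }
}
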
http://git-wip-us.apache.org/repos/asf/hive/blob/7acc4ce1/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java
index c33e8ab..185dfbb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java
@@ -56,6 +56,8 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas
 
       int mapSize = i % mapMaxSize + 1;
       if (!isNull) {
+        // map_field is used to test the multiple-level map definition
+        Group multipleLevelGroup = group.addGroup("map_field");
         for (int j = 0; j < mapSize; j++) {
           int intValForMap = getIntValue(isDictionaryEncoding, mapElementIndex);
           long longValForMap = getLongValue(isDictionaryEncoding, mapElementIndex);
@@ -74,6 +76,8 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas
               .append("value", binaryValForMap);
           group.addGroup("map_decimal").append("key", decimalValForMap)
               .append("value", decimalValForMap);
+          multipleLevelGroup.addGroup("map").append("key", binaryValForMap)
+              .append("value", binaryValForMap);
           mapElementIndex++;
         }
       }
@@ -160,6 +164,14 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas
     removeFile();
   }
 
+  @Test
+  public void testMultipleDefinitionMapRead() throws Exception {
+    removeFile();
+    writeMapData(initWriterFromFile(), false, 1023);
+    testMapRead(false, "multipleLevel", 1023);
+    removeFile();
+  }
+
   private void testMapReadAllType(boolean isDictionaryEncoding, int elementNum) throws Exception {
     testMapRead(isDictionaryEncoding, "int", elementNum);
     testMapRead(isDictionaryEncoding, "long", elementNum);
@@ -267,6 +279,9 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas
     } else if ("decimal".equals(type)) {
       conf.set(IOConstants.COLUMNS, "map_decimal");
       conf.set(IOConstants.COLUMNS_TYPES, "map<decimal(5,2),decimal(5,2)>");
+    } else if ("multipleLevel".equals(type)) {
+      conf.set(IOConstants.COLUMNS, "map_field");
+      conf.set(IOConstants.COLUMNS_TYPES, "map<string,string>");
     }
   }
 
@@ -291,6 +306,15 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas
       case "decimal":
         return String.format(schemaFormat, "decimal", "binary", "(DECIMAL(5,2))",
             "binary", "(DECIMAL(5,2))");
+      case "multipleLevel":
+        return "message hive_schema {\n"
+            + "optional group map_field (MAP) {\n"
+            + "  repeated group map (MAP_KEY_VALUE) {\n"
+            + "    required binary key;\n"
+            + "    optional binary value;\n"
+            + "  }\n"
+            + "}\n"
+            + "}\n";
       default:
         throw new RuntimeException("Unsupported type for TestVectorizedMapColumnReader!");
     }
@@ -310,7 +334,7 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas
     } else if ("float".equals(type)) {
       assertEquals(getFloatValue(isDictionaryEncoding, valueIndex),
           ((DoubleColumnVector)childVector).vector[position], 0);
-    } else if ("binary".equals(type)) {
+    } else if ("binary".equals(type) || "multipleLevel".equals(type)) {
       String actual = new String(ArrayUtils
           .subarray(((BytesColumnVector)childVector).vector[position],
               ((BytesColumnVector)childVector).start[position],
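
For contrast with the multi-level test schema above, the single-group variant
from the comment in VectorizedParquetRecordReader resolves without descending;
a brief usage of the illustrative findKeyValueGroup helper from the earlier
sketch:

// Single-group variant: the column itself is the repeated key/value group,
// so getFieldCount() == 2 and the while loop never runs.
MessageType flat = MessageTypeParser.parseMessageType(
    "message m {"
        + " repeated group map (MAP_KEY_VALUE) {"
        + "   required binary key (UTF8);"
        + "   optional binary value (UTF8);"
        + " }"
        + "}");
GroupType kv = findKeyValueGroup(flat.getType("map").asGroupType());
// kv.getFields() now holds the key and value types directly.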