You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2017/12/18 23:25:56 UTC
[13/50] [abbrv] hive git commit: HIVE-18211: Support to read multiple level definition for Map type in Parquet file (Colin Ma, reviewed by Ferdinand Xu)
HIVE-18211: Support to read multiple level definition for Map type in Parquet file (Colin Ma, reviewed by Ferdinand Xu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7acc4ce1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7acc4ce1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7acc4ce1
Branch: refs/heads/standalone-metastore
Commit: 7acc4ce1bbae060d890494c1499938c1eda5f3b6
Parents: 646ccce
Author: Ferdinand Xu <ch...@intel.com>
Authored: Mon Dec 18 09:35:16 2017 +0800
Committer: Ferdinand Xu <ch...@intel.com>
Committed: Mon Dec 18 09:35:16 2017 +0800
----------------------------------------------------------------------
.../vector/VectorizedParquetRecordReader.java | 27 +++++++++++++++++++-
.../parquet/TestVectorizedMapColumnReader.java | 26 ++++++++++++++++++-
2 files changed, 51 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/7acc4ce1/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 4303ca9..bffe008 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -61,6 +61,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
@@ -97,6 +98,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
private VectorizedRowBatchCtx rbCtx;
private Object[] partitionValues;
private Path cacheFsPath;
+ private static final int MAP_DEFINITION_LEVEL_MAX = 3;
/**
* For each request column, the reader to read this column. This is NULL if this column
@@ -507,7 +509,30 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
throw new RuntimeException(
"Failed to find related Parquet column descriptor with type " + type);
}
- List<Type> kvTypes = type.asGroupType().getFields();
+
+ // to handle the different Map definition in Parquet, eg:
+ // definition has 1 group:
+ // repeated group map (MAP_KEY_VALUE)
+ // {required binary key (UTF8); optional binary value (UTF8);}
+ // definition has 2 groups:
+ // optional group m1 (MAP) {
+ // repeated group map (MAP_KEY_VALUE)
+ // {required binary key (UTF8); optional binary value (UTF8);}
+ // }
+ int nestGroup = 0;
+ GroupType groupType = type.asGroupType();
+ // if FieldCount == 2, get types for key & value,
+ // otherwise, continue to get the group type until MAP_DEFINITION_LEVEL_MAX.
+ while (groupType.getFieldCount() < 2) {
+ if (nestGroup > MAP_DEFINITION_LEVEL_MAX) {
+ throw new RuntimeException(
+ "More than " + MAP_DEFINITION_LEVEL_MAX + " level is found in Map definition, " +
+ "Failed to get the field types for Map with type " + type);
+ }
+ groupType = groupType.getFields().get(0).asGroupType();
+ nestGroup++;
+ }
+ List<Type> kvTypes = groupType.getFields();
VectorizedListColumnReader keyListColumnReader = new VectorizedListColumnReader(
descriptors.get(0), pages.getPageReader(descriptors.get(0)), skipTimestampConversion,
kvTypes.get(0));
http://git-wip-us.apache.org/repos/asf/hive/blob/7acc4ce1/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java
index c33e8ab..185dfbb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedMapColumnReader.java
@@ -56,6 +56,8 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas
int mapSize = i % mapMaxSize + 1;
if (!isNull) {
+ // the map_field is to test multiple level map definition
+ Group multipleLevelGroup = group.addGroup("map_field");
for (int j = 0; j < mapSize; j++) {
int intValForMap = getIntValue(isDictionaryEncoding, mapElementIndex);
long longValForMap = getLongValue(isDictionaryEncoding, mapElementIndex);
@@ -74,6 +76,8 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas
.append("value", binaryValForMap);
group.addGroup("map_decimal").append("key", decimalValForMap)
.append("value", decimalValForMap);
+ multipleLevelGroup.addGroup("map").append("key", binaryValForMap)
+ .append("value", binaryValForMap);
mapElementIndex++;
}
}
@@ -160,6 +164,14 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas
removeFile();
}
+ @Test
+ public void testMultipleDefinitionMapRead() throws Exception {
+ removeFile();
+ writeMapData(initWriterFromFile(), false, 1023);
+ testMapRead(false, "multipleLevel", 1023);
+ removeFile();
+ }
+
private void testMapReadAllType(boolean isDictionaryEncoding, int elementNum) throws Exception {
testMapRead(isDictionaryEncoding, "int", elementNum);
testMapRead(isDictionaryEncoding, "long", elementNum);
@@ -267,6 +279,9 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas
} else if ("decimal".equals(type)) {
conf.set(IOConstants.COLUMNS, "map_decimal");
conf.set(IOConstants.COLUMNS_TYPES, "map<decimal(5,2),decimal(5,2)>");
+ } else if ("multipleLevel".equals(type)) {
+ conf.set(IOConstants.COLUMNS, "map_field");
+ conf.set(IOConstants.COLUMNS_TYPES, "map<string,string>");
}
}
@@ -291,6 +306,15 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas
case "decimal":
return String.format(schemaFormat, "decimal", "binary", "(DECIMAL(5,2))",
"binary", "(DECIMAL(5,2))");
+ case "multipleLevel":
+ return "message hive_schema {\n"
+ + "optional group map_field (MAP) {\n"
+ + " repeated group map (MAP_KEY_VALUE) {\n"
+ + " required binary key;\n"
+ + " optional binary value;\n"
+ + " }\n"
+ + "}\n"
+ + "}\n";
default:
throw new RuntimeException("Unsupported type for TestVectorizedMapColumnReader!");
}
@@ -310,7 +334,7 @@ public class TestVectorizedMapColumnReader extends VectorizedColumnReaderTestBas
} else if ("float".equals(type)) {
assertEquals(getFloatValue(isDictionaryEncoding, valueIndex),
((DoubleColumnVector)childVector).vector[position], 0);
- } else if ("binary".equals(type)) {
+ } else if ("binary".equals(type) || "multipleLevel".equals(type)) {
String actual = new String(ArrayUtils
.subarray(((BytesColumnVector)childVector).vector[position],
((BytesColumnVector)childVector).start[position],