Posted to commits@drill.apache.org by ar...@apache.org on 2018/07/11 16:42:27 UTC
[drill] branch master updated: DRILL-5797: Use Parquet new reader on all non-complex columns queries
This is an automated email from the ASF dual-hosted git repository.
arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new a321bf0 DRILL-5797: Use Parquet new reader on all non-complex columns queries
a321bf0 is described below
commit a321bf0b9186ed25128346f0490b89ba0e2d0ddf
Author: Oleksandr Kalinin <al...@gmail.com>
AuthorDate: Tue Jul 10 09:41:48 2018 +0200
DRILL-5797: Use Parquet new reader on all non-complex columns queries
---
.../parquet/AbstractParquetScanBatchCreator.java | 29 +---
.../exec/store/parquet/ParquetReaderUtility.java | 143 +++++++++++++++++-
.../columnreaders/ParquetColumnMetadata.java | 5 +-
.../store/parquet/columnreaders/ParquetSchema.java | 11 +-
.../store/parquet/TestComplexColumnInSchema.java | 168 +++++++++++++++++++++
.../store/parquet/TestParquetReaderUtility.java | 71 +++++++++
.../parquet/complex/complex_special_cases.parquet | Bin 0 -> 1911 bytes
7 files changed, 391 insertions(+), 36 deletions(-)
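The core of the change: the old per-file isComplex() check (removed below) forced the slower DrillParquetReader whenever the file contained any nested group or repeated column, even if the query projected only flat columns. The new check inspects just the projected columns. A minimal sketch of the resulting condition, assuming the context, footer and rowGroupScan objects from AbstractParquetScanBatchCreator:

    // Sketch only: the reader choice after this patch.
    boolean useFastReader =
        !context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER)
        && !ParquetReaderUtility.containsComplexColumn(footer, rowGroupScan.getColumns());
    // true  -> ParquetRecordReader (fast columnar reader)
    // false -> DrillParquetReader (handles nested / repeated columns)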
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index dc09ce1..47f2e18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -25,22 +25,21 @@ import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
import org.apache.drill.exec.store.parquet2.DrillParquetReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
-import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
import java.io.IOException;
import java.util.ArrayList;
@@ -107,7 +106,10 @@ public abstract class AbstractParquetScanBatchCreator {
ParquetReaderUtility.detectCorruptDates(footer, rowGroupScan.getColumns(), autoCorrectCorruptDates);
logger.debug("Contains corrupt dates: {}", containsCorruptDates);
- if (!context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER) && !isComplex(footer)) {
+ if (!context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER)
+ && !ParquetReaderUtility.containsComplexColumn(footer, rowGroupScan.getColumns())) {
+ logger.debug("Query {} qualifies for new Parquet reader",
+ QueryIdHelper.getQueryId(oContext.getFragmentContext().getHandle().getQueryId()));
readers.add(new ParquetRecordReader(context,
rowGroup.getPath(),
rowGroup.getRowGroupIndex(),
@@ -118,6 +120,8 @@ public abstract class AbstractParquetScanBatchCreator {
rowGroupScan.getColumns(),
containsCorruptDates));
} else {
+ logger.debug("Query {} doesn't qualify for new reader, using old one",
+ QueryIdHelper.getQueryId(oContext.getFragmentContext().getHandle().getQueryId()));
readers.add(new DrillParquetReader(context,
footer,
rowGroup,
@@ -161,22 +165,6 @@ public abstract class AbstractParquetScanBatchCreator {
}
}
- private boolean isComplex(ParquetMetadata footer) {
- MessageType schema = footer.getFileMetaData().getSchema();
-
- for (Type type : schema.getFields()) {
- if (!type.isPrimitive()) {
- return true;
- }
- }
- for (ColumnDescriptor col : schema.getColumns()) {
- if (col.getMaxRepetitionLevel() > 0) {
- return true;
- }
- }
- return false;
- }
-
/**
* Helper class responsible for creating and managing DrillFileSystem.
*/
@@ -190,5 +178,4 @@ public abstract class AbstractParquetScanBatchCreator {
protected abstract DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException;
}
-
}
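For a concrete feel of the new qualification rule, here is a hedged standalone sketch; the class name and the args[0] file path are illustrative only, and the footer is read the same way the new tests further below read it:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;

    public class ReaderChoiceSketch {
      public static void main(String[] args) throws Exception {
        // Read the footer of the Parquet file given on the command line.
        ParquetMetadata footer =
            ParquetFileReader.readFooter(new Configuration(), new Path(args[0]));

        // Only flat columns projected: the fast reader qualifies.
        List<SchemaPath> flat = Arrays.asList(
            SchemaPath.getCompoundPath("id"),
            SchemaPath.getCompoundPath("a"));
        System.out.println(ParquetReaderUtility.containsComplexColumn(footer, flat));   // false

        // A nested column in the projection disqualifies the fast reader.
        List<SchemaPath> nested = Arrays.asList(SchemaPath.getCompoundPath("nested", "id"));
        System.out.println(ParquetReaderUtility.containsComplexColumn(footer, nested)); // true
      }
    }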
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index a7f78fb..6960b35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.store.parquet;
import com.google.common.collect.Sets;
import org.apache.commons.codec.binary.Base64;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
@@ -41,8 +40,10 @@ import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
import org.joda.time.Chronology;
import org.joda.time.DateTimeConstants;
import org.apache.parquet.example.data.simple.NanoTime;
@@ -51,6 +52,7 @@ import org.joda.time.DateTimeZone;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -140,15 +142,90 @@ public class ParquetReaderUtility {
return out;
}
+ /**
+ * Map full schema paths in the format `a`.`b`.`c` to their respective SchemaElement objects.
+ *
+ * @param footer Parquet file metadata
+ * @return schema full path to SchemaElement map
+ */
public static Map<String, SchemaElement> getColNameToSchemaElementMapping(ParquetMetadata footer) {
- HashMap<String, SchemaElement> schemaElements = new HashMap<>();
+ Map<String, SchemaElement> schemaElements = new HashMap<>();
FileMetaData fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
- for (SchemaElement se : fileMetaData.getSchema()) {
- schemaElements.put(se.getName(), se);
+
+ Iterator<SchemaElement> iter = fileMetaData.getSchema().iterator();
+
+ // The first element in the collection is the default `root` element. We skip it to keep keys in the `a` format instead of `root`.`a`,
+ // and thus avoid the need to cut the root out again when comparing with the SchemaPath string representation.
+ if (iter.hasNext()) {
+ iter.next();
+ }
+ while (iter.hasNext()) {
+ addSchemaElementMapping(iter, new StringBuilder(), schemaElements);
}
return schemaElements;
}
+ /**
+ * Populate the full-path to SchemaElement map by recursively traversing the schema elements referenced by the given iterator.
+ *
+ * @param iter file schema values iterator
+ * @param path parent schema element path
+ * @param schemaElements schema elements map to insert next iterator element into
+ */
+ private static void addSchemaElementMapping(Iterator<SchemaElement> iter, StringBuilder path,
+ Map<String, SchemaElement> schemaElements) {
+ SchemaElement schemaElement = iter.next();
+ path.append('`').append(schemaElement.getName().toLowerCase()).append('`');
+ schemaElements.put(path.toString(), schemaElement);
+
+ // for each element that has children we need to maintain the remaining children count
+ // to exit the current recursion level when no more children are left
+ int remainingChildren = schemaElement.getNum_children();
+
+ while (remainingChildren > 0 && iter.hasNext()) {
+ addSchemaElementMapping(iter, new StringBuilder(path).append('.'), schemaElements);
+ remainingChildren--;
+ }
+ }
+
+ /**
+ * Generate the full path of the column in the format `a`.`b`.`c`.
+ *
+ * @param column ColumnDescriptor object
+ * @return the full path in the format `a`.`b`.`c`
+ */
+ public static String getFullColumnPath(ColumnDescriptor column) {
+ StringBuilder sb = new StringBuilder();
+ String[] path = column.getPath();
+ for (int i = 0; i < path.length; i++) {
+ sb.append("`").append(path[i].toLowerCase()).append("`").append(".");
+ }
+
+ // remove trailing dot
+ if (sb.length() > 0) {
+ sb.deleteCharAt(sb.length() - 1);
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * Map full column paths to all ColumnDescriptors in the file schema.
+ *
+ * @param footer Parquet file metadata
+ * @return column full path to ColumnDescriptor object map
+ */
+ public static Map<String, ColumnDescriptor> getColNameToColumnDescriptorMapping(ParquetMetadata footer) {
+ Map<String, ColumnDescriptor> colDescMap = new HashMap<>();
+ List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
+
+ for (ColumnDescriptor column : columns) {
+ colDescMap.put(getFullColumnPath(column), column);
+ }
+ return colDescMap;
+ }
+
public static int autoCorrectCorruptedDate(int corruptedDate) {
return (int) (corruptedDate - CORRECT_CORRUPT_DATE_SHIFT);
}
@@ -361,7 +438,6 @@ public class ParquetReaderUtility {
}
}
-
/**
* Detect corrupt date values by looking at the min/max values in the metadata.
*
@@ -401,9 +477,9 @@ public class ParquetReaderUtility {
// creating a NameSegment makes sure we are using the standard code for comparing names,
// currently it is all case-insensitive
if (Utilities.isStarQuery(columns)
- || new PathSegment.NameSegment(column.getPath()[0]).equals(schemaPath.getRootSegment())) {
+ || getFullColumnPath(column).equalsIgnoreCase(schemaPath.getUnIndexed().toString())) {
int colIndex = -1;
- ConvertedType convertedType = schemaElements.get(column.getPath()[0]).getConverted_type();
+ ConvertedType convertedType = schemaElements.get(getFullColumnPath(column)).getConverted_type();
if (convertedType != null && convertedType.equals(ConvertedType.DATE)) {
List<ColumnChunkMetaData> colChunkList = footer.getBlocks().get(rowGroupIndex).getColumns();
for (int j = 0; j < colChunkList.size(); j++) {
@@ -525,4 +601,57 @@ public class ParquetReaderUtility {
}
}
+ /**
+ * Check whether any of the columns in the given list is either nested or repeated.
+ *
+ * @param footer Parquet file metadata
+ * @param columns list of query SchemaPath objects
+ * @return true if any projected column is nested or repeated, false otherwise
+ */
+ public static boolean containsComplexColumn(ParquetMetadata footer, List<SchemaPath> columns) {
+
+ MessageType schema = footer.getFileMetaData().getSchema();
+
+ if (Utilities.isStarQuery(columns)) {
+ for (Type type : schema.getFields()) {
+ if (!type.isPrimitive()) {
+ return true;
+ }
+ }
+ for (ColumnDescriptor col : schema.getColumns()) {
+ if (col.getMaxRepetitionLevel() > 0) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ Map<String, ColumnDescriptor> colDescMap = ParquetReaderUtility.getColNameToColumnDescriptorMapping(footer);
+ Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
+
+ for (SchemaPath schemaPath : columns) {
+ // A schema path that is not a leaf is a complex column
+ if (!schemaPath.isLeaf()) {
+ logger.trace("rowGroupScan contains complex column: {}", schemaPath.getUnIndexed().toString());
+ return true;
+ }
+
+ // A failure of the following column descriptor lookup may mean one of two cases, depending on the subsequent SchemaElement lookup:
+ // 1. success: the queried column is complex, i.e. a GroupType
+ // 2. failure: the queried column is not in the schema and thus is non-complex
+ ColumnDescriptor column = colDescMap.get(schemaPath.getUnIndexed().toString().toLowerCase());
+
+ if (column == null) {
+ SchemaElement schemaElement = schemaElements.get(schemaPath.getUnIndexed().toString().toLowerCase());
+ if (schemaElement != null) {
+ return true;
+ }
+ } else {
+ if (column.getMaxRepetitionLevel() > 0) {
+ logger.trace("rowGroupScan contains repetitive column: {}", schemaPath.getUnIndexed().toString());
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
}
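Everything above keys lookups by the lower-cased, back-quoted full path that getFullColumnPath() produces, which is what makes nested and mixed-case projections resolve correctly. A short sketch of the resulting map keys, assuming the complex_special_cases.parquet schema listed in the new test below (variable names are illustrative; imports as in ParquetReaderUtility above):

    // For a ColumnDescriptor with path {"nested", "VaRiAbLeCaSe"}:
    //   ParquetReaderUtility.getFullColumnPath(column) -> "`nested`.`variablecase`"
    Map<String, ColumnDescriptor> columnsByPath =
        ParquetReaderUtility.getColNameToColumnDescriptorMapping(footer);
    ColumnDescriptor leaf  = columnsByPath.get("`nested`.`variablecase`"); // non-null: leaf column
    ColumnDescriptor group = columnsByPath.get("`nested`");                // null: groups have no descriptor

    Map<String, SchemaElement> elementsByPath =
        ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
    SchemaElement nestedGroup = elementsByPath.get("`nested`");            // non-null: groups do appear here

This asymmetry between the two maps is exactly what containsComplexColumn() exploits: a path that misses in the descriptor map but hits in the schema element map must be a GroupType column.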
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
index 147938d..9c2444c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
import org.apache.parquet.column.ColumnDescriptor;
@@ -56,8 +57,8 @@ public class ParquetColumnMetadata {
}
public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options) {
- se = schemaElements.get(column.getPath()[0]);
- type = ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(),
+ se = schemaElements.get(ParquetReaderUtility.getFullColumnPath(column));
+ type = ParquetToDrillTypeConverter.toMajorType(column.getType(), column.getTypeLength(),
getDataMode(column), se, options);
field = MaterializedField.create(toFieldName(column.getPath()).getLastSegment().getNameSegment().getPath(), type);
length = getDataTypeLength();
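The switch from column.getPath()[0] to the full path also removes an ambiguity: with single-segment keys, a top-level `id` and a nested `nested`.`id` collide in the schema element map. A sketch under the test schema below, where schemaElements comes from getColNameToSchemaElementMapping():

    SchemaElement topLevelId = schemaElements.get("`id`");          // the root-level id
    SchemaElement nestedId   = schemaElements.get("`nested`.`id`"); // the nested id
    // With the old key column.getPath()[0] == "id", both columns resolved to
    // whichever element happened to be stored last under that key.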
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
index 6717445..c5f70f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
@@ -130,7 +130,7 @@ public final class ParquetSchema {
for (ColumnDescriptor column : footer.getFileMetaData().getSchema().getColumns()) {
ParquetColumnMetadata columnMetadata = new ParquetColumnMetadata(column);
columnMetadata.resolveDrillType(schemaElements, options);
- if (! fieldSelected(columnMetadata.field)) {
+ if (! columnSelected(column)) {
continue;
}
selectedColumnMetadata.add(columnMetadata);
@@ -174,23 +174,22 @@ public final class ParquetSchema {
}
/**
- * Determine if a Parquet field is selected for the query. It is selected
+ * Determine if a Parquet column is selected for the query. It is selected
* either if this is a star query (we want all columns), or the column
* appears in the select list.
*
- * @param field the Parquet column expressed as as Drill field.
+ * @param column the Parquet column expressed as a column descriptor
* @return true if the column is to be included in the scan, false
* if not
*/
-
- private boolean fieldSelected(MaterializedField field) {
+ private boolean columnSelected(ColumnDescriptor column) {
if (isStarQuery()) {
return true;
}
int i = 0;
for (SchemaPath expr : selectedCols) {
- if (field.getName().equalsIgnoreCase(expr.getRootSegmentPath())) {
+ if (ParquetReaderUtility.getFullColumnPath(column).equalsIgnoreCase(expr.getUnIndexed().toString())) {
columnsFound[i] = true;
return true;
}
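columnSelected() now compares the descriptor's normalized full path against the projected SchemaPath case-insensitively, so nested and variable-case projections match the right leaf. A sketch of the comparison for one projected column, assuming column is a ColumnDescriptor from the file schema:

    SchemaPath expr = SchemaPath.getCompoundPath("nested", "repeated");
    // getFullColumnPath(column) yields "`nested`.`repeated`" for that descriptor, and
    // SchemaPath.getUnIndexed().toString() yields the same back-quoted dotted form.
    boolean selected = ParquetReaderUtility.getFullColumnPath(column)
        .equalsIgnoreCase(expr.getUnIndexed().toString());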
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestComplexColumnInSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestComplexColumnInSchema.java
new file mode 100644
index 0000000..d0977b8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestComplexColumnInSchema.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.ParquetFileReader;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import java.io.IOException;
+
+/**
+ * This test checks correctness of complex column detection in the Parquet file schema.
+ */
+public class TestComplexColumnInSchema {
+
+ /*
+ Parquet schema:
+ message root {
+ optional int64 id;
+ optional binary a (UTF8);
+ repeated int64 repeated;
+ optional binary VariableCase (UTF8);
+ optional group nested {
+ optional int64 id;
+ repeated int64 repeated;
+ optional binary VaRiAbLeCaSe (UTF8);
+ }
+ }
+
+ Data set:
+ complex_special_cases.parquet
+ {
+ "id": 1,
+ "a": "some string",
+ "repeated": [1, 2],
+ "VariableCase": "top level variable case column",
+ "nested": {
+ "id": 2,
+ "repeated": [3, 4],
+ "VaRiAbLeCaSe": "nested variable case column"
+ }
+ }
+ */
+ private static final String path = "src/test/resources/store/parquet/complex/complex_special_cases.parquet";
+ private static ParquetMetadata footer;
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ Configuration conf = new Configuration();
+
+ footer = ParquetFileReader.readFooter(conf, new Path(path));
+ }
+
+ @Test
+ public void testGroupTypeColumn() {
+ List<SchemaPath> columns = new ArrayList<>();
+ columns.add(SchemaPath.getCompoundPath("nested"));
+ assertTrue("GroupType column must be detected as complex",
+ ParquetReaderUtility.containsComplexColumn(footer, columns));
+ }
+
+ @Test
+ public void testNestedColumn() {
+ List<SchemaPath> columns = new ArrayList<>();
+ columns.add(SchemaPath.getCompoundPath("nested", "id"));
+ assertTrue("Nested column must be detected as complex",
+ ParquetReaderUtility.containsComplexColumn(footer, columns));
+ }
+
+ @Test
+ public void testCombinedColumns() {
+ List<SchemaPath> columns = new ArrayList<>();
+ columns.add(SchemaPath.getCompoundPath("id"));
+ columns.add(SchemaPath.getCompoundPath("nested", "id"));
+ assertTrue("Nested column in the list list must be detected as complex",
+ ParquetReaderUtility.containsComplexColumn(footer, columns));
+ }
+
+ @Test
+ public void testSimpleColumn() {
+ List<SchemaPath> columns = new ArrayList<>();
+ columns.add(SchemaPath.getCompoundPath("id"));
+ assertFalse("No complex column must be detected",
+ ParquetReaderUtility.containsComplexColumn(footer, columns));
+ }
+
+ @Test
+ public void testSimpleColumns() {
+ List<SchemaPath> columns = new ArrayList<>();
+ columns.add(SchemaPath.getCompoundPath("id"));
+ columns.add(SchemaPath.getCompoundPath("a"));
+ assertFalse("No complex columns must be detected",
+ ParquetReaderUtility.containsComplexColumn(footer, columns));
+ }
+
+ @Test
+ public void testNonexistentColumn() {
+ List<SchemaPath> columns = new ArrayList<>();
+ columns.add(SchemaPath.getCompoundPath("nonexistent"));
+ assertFalse("No complex column must be detected",
+ ParquetReaderUtility.containsComplexColumn(footer, columns));
+ }
+
+ @Test
+ public void testVariableCaseColumn() {
+ List<SchemaPath> columns = new ArrayList<>();
+ columns.add(SchemaPath.getCompoundPath("variablecase"));
+ assertFalse("No complex column must be detected",
+ ParquetReaderUtility.containsComplexColumn(footer, columns));
+ }
+
+ @Test
+ public void testVariableCaseSchemaPath() {
+ List<SchemaPath> columns = new ArrayList<>();
+ columns.add(SchemaPath.getCompoundPath("VaRiAbLeCaSe"));
+ assertFalse("No complex column must be detected",
+ ParquetReaderUtility.containsComplexColumn(footer, columns));
+ }
+
+ @Test
+ public void testNestedVariableCaseColumn() {
+ List<SchemaPath> columns = new ArrayList<>();
+ columns.add(SchemaPath.getCompoundPath("nested", "variablecase"));
+ assertTrue("Nested variable case column must be detected as complex",
+ ParquetReaderUtility.containsComplexColumn(footer, columns));
+ }
+
+ @Test
+ public void testRepeatedColumn() {
+ List<SchemaPath> columns = new ArrayList<>();
+ columns.add(SchemaPath.getCompoundPath("repeated"));
+ assertTrue("Repeated column must be detected as complex",
+ ParquetReaderUtility.containsComplexColumn(footer, columns));
+ }
+
+ @Test
+ public void testNestedRepeatedColumn() {
+ List<SchemaPath> columns = new ArrayList<>();
+ columns.add(SchemaPath.getCompoundPath("nested", "repeated"));
+ assertTrue("Nested repeated column must be detected as complex",
+ ParquetReaderUtility.containsComplexColumn(footer, columns));
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderUtility.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderUtility.java
new file mode 100644
index 0000000..4b24212
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderUtility.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.ParquetFileReader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class TestParquetReaderUtility {
+
+ private static final String path = "src/test/resources/store/parquet/complex/complex.parquet";
+ private static ParquetMetadata footer;
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ Configuration conf = new Configuration();
+
+ footer = ParquetFileReader.readFooter(conf, new Path(path));
+ }
+
+ @Test
+ public void testSchemaElementsMap() {
+ Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
+ assertEquals("Schema elements map size must be 14", schemaElements.size(), 14);
+
+ SchemaElement schemaElement = schemaElements.get("`marketing_info`.`camp_id`");
+ assertNotNull("Schema element must be not null", schemaElement);
+ assertEquals("Schema element must be named 'camp_id'", schemaElement.getName(), "camp_id");
+
+ schemaElement = schemaElements.get("`marketing_info`");
+ assertNotNull("Schema element must be not null", schemaElement);
+ assertEquals("Schema element name match lookup key", schemaElement.getName(), "marketing_info");
+ }
+
+ @Test
+ public void testColumnDescriptorMap() {
+ Map<String, ColumnDescriptor> colDescMap = ParquetReaderUtility.getColNameToColumnDescriptorMapping(footer);
+ assertEquals("Column descriptors map size must be 11", colDescMap.size(), 11);
+
+ assertNotNull("column descriptor lookup must return not null", colDescMap.get("`marketing_info`.`camp_id`"));
+ assertNull("column descriptor lookup must return null on GroupType column", colDescMap.get("`marketing_info`"));
+ }
+}
diff --git a/exec/java-exec/src/test/resources/store/parquet/complex/complex_special_cases.parquet b/exec/java-exec/src/test/resources/store/parquet/complex/complex_special_cases.parquet
new file mode 100644
index 0000000..bbfb6d4
Binary files /dev/null and b/exec/java-exec/src/test/resources/store/parquet/complex/complex_special_cases.parquet differ