You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/07/11 16:42:27 UTC

[drill] branch master updated: DRILL-5797: Use Parquet new reader on all non-complex columns queries

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new a321bf0  DRILL-5797: Use Parquet new reader on all non-complex columns queries
a321bf0 is described below

commit a321bf0b9186ed25128346f0490b89ba0e2d0ddf
Author: Oleksandr Kalinin <al...@gmail.com>
AuthorDate: Tue Jul 10 09:41:48 2018 +0200

    DRILL-5797: Use Parquet new reader on all non-complex columns queries
---
 .../parquet/AbstractParquetScanBatchCreator.java   |  29 +---
 .../exec/store/parquet/ParquetReaderUtility.java   | 143 +++++++++++++++++-
 .../columnreaders/ParquetColumnMetadata.java       |   5 +-
 .../store/parquet/columnreaders/ParquetSchema.java |  11 +-
 .../store/parquet/TestComplexColumnInSchema.java   | 168 +++++++++++++++++++++
 .../store/parquet/TestParquetReaderUtility.java    |  71 +++++++++
 .../parquet/complex/complex_special_cases.parquet  | Bin 0 -> 1911 bytes
 7 files changed, 391 insertions(+), 36 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index dc09ce1..47f2e18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -25,22 +25,21 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.ParquetReadOptions;
-import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -107,7 +106,10 @@ public abstract class AbstractParquetScanBatchCreator {
           ParquetReaderUtility.detectCorruptDates(footer, rowGroupScan.getColumns(), autoCorrectCorruptDates);
         logger.debug("Contains corrupt dates: {}", containsCorruptDates);
 
-        if (!context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER) && !isComplex(footer)) {
+        if (!context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER)
+            && !ParquetReaderUtility.containsComplexColumn(footer, rowGroupScan.getColumns())) {
+          logger.debug("Query {} qualifies for new Parquet reader",
+              QueryIdHelper.getQueryId(oContext.getFragmentContext().getHandle().getQueryId()));
           readers.add(new ParquetRecordReader(context,
               rowGroup.getPath(),
               rowGroup.getRowGroupIndex(),
@@ -118,6 +120,8 @@ public abstract class AbstractParquetScanBatchCreator {
               rowGroupScan.getColumns(),
               containsCorruptDates));
         } else {
+          logger.debug("Query {} doesn't qualify for new reader, using old one",
+              QueryIdHelper.getQueryId(oContext.getFragmentContext().getHandle().getQueryId()));
           readers.add(new DrillParquetReader(context,
               footer,
               rowGroup,
@@ -161,22 +165,6 @@ public abstract class AbstractParquetScanBatchCreator {
     }
   }
 
-  private boolean isComplex(ParquetMetadata footer) {
-    MessageType schema = footer.getFileMetaData().getSchema();
-
-    for (Type type : schema.getFields()) {
-      if (!type.isPrimitive()) {
-        return true;
-      }
-    }
-    for (ColumnDescriptor col : schema.getColumns()) {
-      if (col.getMaxRepetitionLevel() > 0) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   /**
    * Helper class responsible for creating and managing DrillFileSystem.
    */
@@ -190,5 +178,4 @@ public abstract class AbstractParquetScanBatchCreator {
 
     protected abstract DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException;
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index a7f78fb..6960b35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.store.parquet;
 import com.google.common.collect.Sets;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
@@ -41,8 +40,10 @@ import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
 import org.joda.time.Chronology;
 import org.joda.time.DateTimeConstants;
 import org.apache.parquet.example.data.simple.NanoTime;
@@ -51,6 +52,7 @@ import org.joda.time.DateTimeZone;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -140,15 +142,90 @@ public class ParquetReaderUtility {
     return out;
   }
 
+  /**
+   * Map full schema paths in format `a`.`b`.`c` to respective SchemaElement objects.
+   *
+   * @param footer Parquet file metadata
+   * @return       schema full path to SchemaElement map
+   */
   public static Map<String, SchemaElement> getColNameToSchemaElementMapping(ParquetMetadata footer) {
-    HashMap<String, SchemaElement> schemaElements = new HashMap<>();
+    Map<String, SchemaElement> schemaElements = new HashMap<>();
     FileMetaData fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
-    for (SchemaElement se : fileMetaData.getSchema()) {
-      schemaElements.put(se.getName(), se);
+
+    Iterator<SchemaElement> iter = fileMetaData.getSchema().iterator();
+
+    // The first element in the collection is the default `root` element. We skip it to keep keys in `a` format
+    // instead of `root`.`a`, which avoids having to strip it again when comparing with the SchemaPath string representation
+    if (iter.hasNext()) {
+      iter.next();
+    }
+    while (iter.hasNext()) {
+      addSchemaElementMapping(iter, new StringBuilder(), schemaElements);
     }
     return schemaElements;
   }
 
+  /**
+   * Populate full path to SchemaElement map by recursively traversing schema elements referenced by the given iterator
+   *
+   * @param iter file schema values iterator
+   * @param path parent schema element path
+   * @param schemaElements schema elements map to insert next iterator element into
+   */
+  private static void addSchemaElementMapping(Iterator<SchemaElement> iter, StringBuilder path,
+      Map<String, SchemaElement> schemaElements) {
+    SchemaElement schemaElement = iter.next();
+    path.append('`').append(schemaElement.getName().toLowerCase()).append('`');
+    schemaElements.put(path.toString(), schemaElement);
+
+    // for each element that has children we need to track the remaining children count
+    // to exit the current recursion level when no children are left
+    int remainingChildren = schemaElement.getNum_children();
+
+    while (remainingChildren > 0 && iter.hasNext()) {
+      addSchemaElementMapping(iter, new StringBuilder(path).append('.'), schemaElements);
+      remainingChildren--;
+    }
+    return;
+  }
+
+  /**
+   * Generate the full path of the column in format `a`.`b`.`c`
+   *
+   * @param column ColumnDescriptor object
+   * @return       full path in format `a`.`b`.`c`
+   */
+  public static String getFullColumnPath(ColumnDescriptor column) {
+    StringBuilder sb = new StringBuilder();
+    String[] path = column.getPath();
+    for (int i = 0; i < path.length; i++) {
+      sb.append("`").append(path[i].toLowerCase()).append("`").append(".");
+    }
+
+    // remove trailing dot
+    if (sb.length() > 0) {
+      sb.deleteCharAt(sb.length() - 1);
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Map full column paths to all ColumnDescriptors in file schema
+   *
+   * @param footer Parquet file metadata
+   * @return       column full path to ColumnDescriptor object map
+   */
+  public static Map<String, ColumnDescriptor> getColNameToColumnDescriptorMapping(ParquetMetadata footer) {
+    Map<String, ColumnDescriptor> colDescMap = new HashMap<>();
+    List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
+
+    for (ColumnDescriptor column : columns) {
+      colDescMap.put(getFullColumnPath(column), column);
+    }
+    return colDescMap;
+  }
+
   public static int autoCorrectCorruptedDate(int corruptedDate) {
     return (int) (corruptedDate - CORRECT_CORRUPT_DATE_SHIFT);
   }
@@ -361,7 +438,6 @@ public class ParquetReaderUtility {
     }
   }
 
-
   /**
    * Detect corrupt date values by looking at the min/max values in the metadata.
    *
@@ -401,9 +477,9 @@ public class ParquetReaderUtility {
         // creating a NameSegment makes sure we are using the standard code for comparing names,
         // currently it is all case-insensitive
         if (Utilities.isStarQuery(columns)
-            || new PathSegment.NameSegment(column.getPath()[0]).equals(schemaPath.getRootSegment())) {
+            || getFullColumnPath(column).equalsIgnoreCase(schemaPath.getUnIndexed().toString())) {
           int colIndex = -1;
-          ConvertedType convertedType = schemaElements.get(column.getPath()[0]).getConverted_type();
+          ConvertedType convertedType = schemaElements.get(getFullColumnPath(column)).getConverted_type();
           if (convertedType != null && convertedType.equals(ConvertedType.DATE)) {
             List<ColumnChunkMetaData> colChunkList = footer.getBlocks().get(rowGroupIndex).getColumns();
             for (int j = 0; j < colChunkList.size(); j++) {
@@ -525,4 +601,57 @@ public class ParquetReaderUtility {
     }
   }
 
+  /**
+   * Check whether any of the columns in the given list is either nested or repeated.
+   *
+   * @param footer  Parquet file metadata
+   * @param columns list of query SchemaPath objects
+   */
+  public static boolean containsComplexColumn(ParquetMetadata footer, List<SchemaPath> columns) {
+
+    MessageType schema = footer.getFileMetaData().getSchema();
+
+    if (Utilities.isStarQuery(columns)) {
+      for (Type type : schema.getFields()) {
+        if (!type.isPrimitive()) {
+          return true;
+        }
+      }
+      for (ColumnDescriptor col : schema.getColumns()) {
+        if (col.getMaxRepetitionLevel() > 0) {
+          return true;
+        }
+      }
+      return false;
+    } else {
+      Map<String, ColumnDescriptor> colDescMap = ParquetReaderUtility.getColNameToColumnDescriptorMapping(footer);
+      Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
+
+      for (SchemaPath schemaPath : columns) {
+        // A non-leaf schema path denotes a complex column
+        if (!schemaPath.isLeaf()) {
+          logger.trace("rowGroupScan contains complex column: {}", schemaPath.getUnIndexed().toString());
+          return true;
+        }
+
+        // a failed column descriptor lookup can mean one of two things, depending on the subsequent SchemaElement lookup:
+        // 1. success: the queried column is complex, i.e. a GroupType
+        // 2. failure: the queried column is not in the schema and is thus non-complex
+        ColumnDescriptor column = colDescMap.get(schemaPath.getUnIndexed().toString().toLowerCase());
+
+        if (column == null) {
+          SchemaElement schemaElement = schemaElements.get(schemaPath.getUnIndexed().toString().toLowerCase());
+          if (schemaElement != null) {
+            return true;
+          }
+        } else {
+          if (column.getMaxRepetitionLevel() > 0) {
+            logger.trace("rowGroupScan contains repetitive column: {}", schemaPath.getUnIndexed().toString());
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
index 147938d..9c2444c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -56,8 +57,8 @@ public class ParquetColumnMetadata {
   }
 
   public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options) {
-    se = schemaElements.get(column.getPath()[0]);
-    type = ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(),
+    se = schemaElements.get(ParquetReaderUtility.getFullColumnPath(column));
+    type = ParquetToDrillTypeConverter.toMajorType(column.getType(), column.getTypeLength(),
         getDataMode(column), se, options);
     field = MaterializedField.create(toFieldName(column.getPath()).getLastSegment().getNameSegment().getPath(), type);
     length = getDataTypeLength();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
index 6717445..c5f70f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
@@ -130,7 +130,7 @@ public final class ParquetSchema {
     for (ColumnDescriptor column : footer.getFileMetaData().getSchema().getColumns()) {
       ParquetColumnMetadata columnMetadata = new ParquetColumnMetadata(column);
       columnMetadata.resolveDrillType(schemaElements, options);
-      if (! fieldSelected(columnMetadata.field)) {
+      if (! columnSelected(column)) {
         continue;
       }
       selectedColumnMetadata.add(columnMetadata);
@@ -174,23 +174,22 @@ public final class ParquetSchema {
   }
 
   /**
-   * Determine if a Parquet field is selected for the query. It is selected
+   * Determine if a Parquet column is selected for the query. It is selected
    * either if this is a star query (we want all columns), or the column
    * appears in the select list.
    *
-   * @param field the Parquet column expressed as as Drill field.
+   * @param column the Parquet column expressed as column descriptor
    * @return true if the column is to be included in the scan, false
    * if not
    */
-
-  private boolean fieldSelected(MaterializedField field) {
+  private boolean columnSelected(ColumnDescriptor column) {
     if (isStarQuery()) {
       return true;
     }
 
     int i = 0;
     for (SchemaPath expr : selectedCols) {
-      if (field.getName().equalsIgnoreCase(expr.getRootSegmentPath())) {
+      if (ParquetReaderUtility.getFullColumnPath(column).equalsIgnoreCase(expr.getUnIndexed().toString())) {
         columnsFound[i] = true;
         return true;
       }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestComplexColumnInSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestComplexColumnInSchema.java
new file mode 100644
index 0000000..d0977b8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestComplexColumnInSchema.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.ParquetFileReader;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import java.io.IOException;
+
+/**
+ * This test checks correctness of complex column detection in the Parquet file schema.
+ */
+public class TestComplexColumnInSchema {
+
+  /*
+  Parquet schema:
+    message root {
+      optional int64 id;
+      optional binary a (UTF8);
+      repeated int64 repeated;
+      optional binary VariableCase (UTF8);
+      optional group nested {
+        optional int64 id;
+        repeated int64 repeated;
+        optional binary VaRiAbLeCaSe (UTF8);
+      }
+    }
+
+   Data set:
+   complex_special_cases.parquet
+   {
+     "id": 1,
+     "a": "some string",
+     "repeated": [1, 2],
+     "VariableCase": "top level variable case column",
+     "nested": {
+       "id": 2,
+       "repeated": [3, 4],
+       "VaRiAbLeCaSe": "nested variable case column"
+     }
+   }
+   */
+  private static final String path = "src/test/resources/store/parquet/complex/complex_special_cases.parquet";
+  private static ParquetMetadata footer;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    Configuration conf = new Configuration();
+
+    footer = ParquetFileReader.readFooter(conf, new Path(path));
+  }
+
+  @Test
+  public void testGroupTypeColumn() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("nested"));
+    assertTrue("GroupType column must be detected as complex",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testNestedColumn() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("nested", "id"));
+    assertTrue("Nested column must be detected as complex",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testCombinedColumns() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("id"));
+    columns.add(SchemaPath.getCompoundPath("nested", "id"));
+    assertTrue("Nested column in the list list must be detected as complex",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testSimpleColumn() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("id"));
+    assertFalse("No complex column must be detected",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testSimpleColumns() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("id"));
+    columns.add(SchemaPath.getCompoundPath("a"));
+    assertFalse("No complex columns must be detected",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testNonexistentColumn() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("nonexistent"));
+    assertFalse("No complex column must be detected",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testVariableCaseColumn() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("variablecase"));
+    assertFalse("No complex column must be detected",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testVariableCaseSchemaPath() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("VaRiAbLeCaSe"));
+    assertFalse("No complex column must be detected",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testNestedVariableCaseColumn() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("nested", "variablecase"));
+    assertTrue("Nested variable case column must be detected as complex",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testRepeatedColumn() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("repeated"));
+    assertTrue("Repeated column must be detected as complex",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testNestedRepeatedColumn() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("nested", "repeated"));
+    assertTrue("Nested repeated column must be detected as complex",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderUtility.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderUtility.java
new file mode 100644
index 0000000..4b24212
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderUtility.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.ParquetFileReader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class TestParquetReaderUtility {
+
+  private static final String path = "src/test/resources/store/parquet/complex/complex.parquet";
+  private static ParquetMetadata footer;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    Configuration conf = new Configuration();
+
+    footer = ParquetFileReader.readFooter(conf, new Path(path));
+  }
+
+  @Test
+  public void testSchemaElementsMap() {
+    Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
+    assertEquals("Schema elements map size must be 14", schemaElements.size(), 14);
+
+    SchemaElement schemaElement = schemaElements.get("`marketing_info`.`camp_id`");
+    assertNotNull("Schema element must be not null", schemaElement);
+    assertEquals("Schema element must be named 'camp_id'", schemaElement.getName(), "camp_id");
+
+    schemaElement = schemaElements.get("`marketing_info`");
+    assertNotNull("Schema element must be not null", schemaElement);
+    assertEquals("Schema element name match lookup key", schemaElement.getName(), "marketing_info");
+  }
+
+  @Test
+  public void testColumnDescriptorMap() {
+    Map<String, ColumnDescriptor> colDescMap = ParquetReaderUtility.getColNameToColumnDescriptorMapping(footer);
+    assertEquals("Column descriptors map size must be 11", colDescMap.size(), 11);
+
+    assertNotNull("column descriptor lookup must return not null", colDescMap.get("`marketing_info`.`camp_id`"));
+    assertNull("column descriptor lookup must return null on GroupType column", colDescMap.get("`marketing_info`"));
+  }
+}
diff --git a/exec/java-exec/src/test/resources/store/parquet/complex/complex_special_cases.parquet b/exec/java-exec/src/test/resources/store/parquet/complex/complex_special_cases.parquet
new file mode 100644
index 0000000..bbfb6d4
Binary files /dev/null and b/exec/java-exec/src/test/resources/store/parquet/complex/complex_special_cases.parquet differ