You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/11 16:42:26 UTC

[GitHub] arina-ielchiieva closed pull request #1370: DRILL-5797: Use Parquet new reader in all non-complex column queries

arina-ielchiieva closed pull request #1370: DRILL-5797: Use Parquet new reader in all non-complex column queries
URL: https://github.com/apache/drill/pull/1370
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index dc09ce1b695..47f2e18a9a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -25,22 +25,21 @@
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.ParquetReadOptions;
-import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -107,7 +106,10 @@ protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRow
           ParquetReaderUtility.detectCorruptDates(footer, rowGroupScan.getColumns(), autoCorrectCorruptDates);
         logger.debug("Contains corrupt dates: {}", containsCorruptDates);
 
-        if (!context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER) && !isComplex(footer)) {
+        if (!context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER)
+            && !ParquetReaderUtility.containsComplexColumn(footer, rowGroupScan.getColumns())) {
+          logger.debug("Query {} qualifies for new Parquet reader",
+              QueryIdHelper.getQueryId(oContext.getFragmentContext().getHandle().getQueryId()));
           readers.add(new ParquetRecordReader(context,
               rowGroup.getPath(),
               rowGroup.getRowGroupIndex(),
@@ -118,6 +120,8 @@ protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRow
               rowGroupScan.getColumns(),
               containsCorruptDates));
         } else {
+          logger.debug("Query {} doesn't qualify for new reader, using old one",
+              QueryIdHelper.getQueryId(oContext.getFragmentContext().getHandle().getQueryId()));
           readers.add(new DrillParquetReader(context,
               footer,
               rowGroup,
@@ -161,22 +165,6 @@ private ParquetMetadata readFooter(Configuration conf, String path) throws IOExc
     }
   }
 
-  private boolean isComplex(ParquetMetadata footer) {
-    MessageType schema = footer.getFileMetaData().getSchema();
-
-    for (Type type : schema.getFields()) {
-      if (!type.isPrimitive()) {
-        return true;
-      }
-    }
-    for (ColumnDescriptor col : schema.getColumns()) {
-      if (col.getMaxRepetitionLevel() > 0) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   /**
    * Helper class responsible for creating and managing DrillFileSystem.
    */
@@ -190,5 +178,4 @@ protected AbstractDrillFileSystemManager(OperatorContext operatorContext) {
 
     protected abstract DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException;
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index a7f78fb40c6..6960b35a031 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -20,7 +20,6 @@
 import com.google.common.collect.Sets;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
@@ -41,8 +40,10 @@
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
 import org.joda.time.Chronology;
 import org.joda.time.DateTimeConstants;
 import org.apache.parquet.example.data.simple.NanoTime;
@@ -51,6 +52,7 @@
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -140,15 +142,90 @@ public static int getIntFromLEBytes(byte[] input, int start) {
     return out;
   }
 
+  /**
+   * Map full schema paths in format `a`.`b`.`c` to respective SchemaElement objects.
+   *
+   * @param footer Parquet file metadata
+   * @return       schema full path to SchemaElement map
+   */
   public static Map<String, SchemaElement> getColNameToSchemaElementMapping(ParquetMetadata footer) {
-    HashMap<String, SchemaElement> schemaElements = new HashMap<>();
+    Map<String, SchemaElement> schemaElements = new HashMap<>();
     FileMetaData fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
-    for (SchemaElement se : fileMetaData.getSchema()) {
-      schemaElements.put(se.getName(), se);
+
+    Iterator<SchemaElement> iter = fileMetaData.getSchema().iterator();
+
+    // First element in collection is default `root` element. We skip it to maintain key in `a` format instead of `root`.`a`,
+    // and thus to avoid the need to cut it out again when comparing with SchemaPath string representation
+    if (iter.hasNext()) {
+      iter.next();
+    }
+    while (iter.hasNext()) {
+      addSchemaElementMapping(iter, new StringBuilder(), schemaElements);
     }
     return schemaElements;
   }
 
+  /**
+   * Populate full path to SchemaElement map by recursively traversing schema elements referenced by the given iterator
+   *
+   * @param iter file schema values iterator
+   * @param path parent schema element path
+   * @param schemaElements schema elements map to insert next iterator element into
+   */
+  private static void addSchemaElementMapping(Iterator<SchemaElement> iter, StringBuilder path,
+      Map<String, SchemaElement> schemaElements) {
+    SchemaElement schemaElement = iter.next();
+    path.append('`').append(schemaElement.getName().toLowerCase()).append('`');
+    schemaElements.put(path.toString(), schemaElement);
+
+    // for each element that has children we need to maintain remaining children count
+    // to exit current recursion level when no more children is left
+    int remainingChildren = schemaElement.getNum_children();
+
+    while (remainingChildren > 0 && iter.hasNext()) {
+      addSchemaElementMapping(iter, new StringBuilder(path).append('.'), schemaElements);
+      remainingChildren--;
+    }
+    return;
+  }
+
+  /**
+   * generate full path of the column in format `a`.`b`.`c`
+   *
+   * @param column ColumnDescriptor object
+   * @return       full path in format `a`.`b`.`c`
+   */
+  public static String getFullColumnPath(ColumnDescriptor column) {
+    StringBuilder sb = new StringBuilder();
+    String[] path = column.getPath();
+    for (int i = 0; i < path.length; i++) {
+      sb.append("`").append(path[i].toLowerCase()).append("`").append(".");
+    }
+
+    // remove trailing dot
+    if (sb.length() > 0) {
+      sb.deleteCharAt(sb.length() - 1);
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Map full column paths to all ColumnDescriptors in file schema
+   *
+   * @param footer Parquet file metadata
+   * @return       column full path to ColumnDescriptor object map
+   */
+  public static Map<String, ColumnDescriptor> getColNameToColumnDescriptorMapping(ParquetMetadata footer) {
+    Map<String, ColumnDescriptor> colDescMap = new HashMap<>();
+    List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
+
+    for (ColumnDescriptor column : columns) {
+      colDescMap.put(getFullColumnPath(column), column);
+    }
+    return colDescMap;
+  }
+
   public static int autoCorrectCorruptedDate(int corruptedDate) {
     return (int) (corruptedDate - CORRECT_CORRUPT_DATE_SHIFT);
   }
@@ -361,7 +438,6 @@ else if (Boolean.valueOf(isDateCorrect)) {
     }
   }
 
-
   /**
    * Detect corrupt date values by looking at the min/max values in the metadata.
    *
@@ -401,9 +477,9 @@ public static DateCorruptionStatus checkForCorruptDateValuesInStatistics(Parquet
         // creating a NameSegment makes sure we are using the standard code for comparing names,
         // currently it is all case-insensitive
         if (Utilities.isStarQuery(columns)
-            || new PathSegment.NameSegment(column.getPath()[0]).equals(schemaPath.getRootSegment())) {
+            || getFullColumnPath(column).equalsIgnoreCase(schemaPath.getUnIndexed().toString())) {
           int colIndex = -1;
-          ConvertedType convertedType = schemaElements.get(column.getPath()[0]).getConverted_type();
+          ConvertedType convertedType = schemaElements.get(getFullColumnPath(column)).getConverted_type();
           if (convertedType != null && convertedType.equals(ConvertedType.DATE)) {
             List<ColumnChunkMetaData> colChunkList = footer.getBlocks().get(rowGroupIndex).getColumns();
             for (int j = 0; j < colChunkList.size(); j++) {
@@ -525,4 +601,57 @@ public static long getDateTimeValueFromBinary(Binary binaryTimeStampValue, boole
     }
   }
 
+  /**
+   * Check whether any of columns in the given list is either nested or repetitive.
+   *
+   * @param footer  Parquet file schema
+   * @param columns list of query SchemaPath objects
+   */
+  public static boolean containsComplexColumn(ParquetMetadata footer, List<SchemaPath> columns) {
+
+    MessageType schema = footer.getFileMetaData().getSchema();
+
+    if (Utilities.isStarQuery(columns)) {
+      for (Type type : schema.getFields()) {
+        if (!type.isPrimitive()) {
+          return true;
+        }
+      }
+      for (ColumnDescriptor col : schema.getColumns()) {
+        if (col.getMaxRepetitionLevel() > 0) {
+          return true;
+        }
+      }
+      return false;
+    } else {
+      Map<String, ColumnDescriptor> colDescMap = ParquetReaderUtility.getColNameToColumnDescriptorMapping(footer);
+      Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
+
+      for (SchemaPath schemaPath : columns) {
+        // Schema path which is non-leaf is complex column
+        if (!schemaPath.isLeaf()) {
+          logger.trace("rowGroupScan contains complex column: {}", schemaPath.getUnIndexed().toString());
+          return true;
+        }
+
+        // following column descriptor lookup failure may mean two cases, depending on subsequent SchemaElement lookup:
+        // 1. success: queried column is complex, i.e. GroupType
+        // 2. failure: queried column is not in schema and thus is non-complex
+        ColumnDescriptor column = colDescMap.get(schemaPath.getUnIndexed().toString().toLowerCase());
+
+        if (column == null) {
+          SchemaElement schemaElement = schemaElements.get(schemaPath.getUnIndexed().toString().toLowerCase());
+          if (schemaElement != null) {
+            return true;
+          }
+        } else {
+          if (column.getMaxRepetitionLevel() > 0) {
+            logger.trace("rowGroupScan contains repetitive column: {}", schemaPath.getUnIndexed().toString());
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
index 147938d4b93..9c2444c2837 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
@@ -29,6 +29,7 @@
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -56,8 +57,8 @@ public ParquetColumnMetadata(ColumnDescriptor column) {
   }
 
   public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options) {
-    se = schemaElements.get(column.getPath()[0]);
-    type = ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(),
+    se = schemaElements.get(ParquetReaderUtility.getFullColumnPath(column));
+    type = ParquetToDrillTypeConverter.toMajorType(column.getType(), column.getTypeLength(),
         getDataMode(column), se, options);
     field = MaterializedField.create(toFieldName(column.getPath()).getLastSegment().getNameSegment().getPath(), type);
     length = getDataTypeLength();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
index 67174459e0a..c5f70f3c5b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
@@ -130,7 +130,7 @@ private void loadParquetSchema() {
     for (ColumnDescriptor column : footer.getFileMetaData().getSchema().getColumns()) {
       ParquetColumnMetadata columnMetadata = new ParquetColumnMetadata(column);
       columnMetadata.resolveDrillType(schemaElements, options);
-      if (! fieldSelected(columnMetadata.field)) {
+      if (! columnSelected(column)) {
         continue;
       }
       selectedColumnMetadata.add(columnMetadata);
@@ -174,23 +174,22 @@ public BlockMetaData getRowGroupMetadata() {
   }
 
   /**
-   * Determine if a Parquet field is selected for the query. It is selected
+   * Determine if a Parquet column is selected for the query. It is selected
    * either if this is a star query (we want all columns), or the column
    * appears in the select list.
    *
-   * @param field the Parquet column expressed as as Drill field.
+   * @param column the Parquet column expressed as column descriptor
    * @return true if the column is to be included in the scan, false
    * if not
    */
-
-  private boolean fieldSelected(MaterializedField field) {
+  private boolean columnSelected(ColumnDescriptor column) {
     if (isStarQuery()) {
       return true;
     }
 
     int i = 0;
     for (SchemaPath expr : selectedCols) {
-      if (field.getName().equalsIgnoreCase(expr.getRootSegmentPath())) {
+      if (ParquetReaderUtility.getFullColumnPath(column).equalsIgnoreCase(expr.getUnIndexed().toString())) {
         columnsFound[i] = true;
         return true;
       }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestComplexColumnInSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestComplexColumnInSchema.java
new file mode 100644
index 00000000000..d0977b8f38f
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestComplexColumnInSchema.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.ParquetFileReader;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import java.io.IOException;
+
+/**
+ * This test checks correctness of complex column detection in the Parquet file schema.
+ */
+public class TestComplexColumnInSchema {
+
+  /*
+  Parquet schema:
+    message root {
+      optional int64 id;
+      optional binary a (UTF8);
+      repeated int64 repeated;
+      optional binary VariableCase (UTF8);
+      optional group nested {
+        optional int64 id;
+        repeated int64 repeated;
+        optional binary VaRiAbLeCaSe (UTF8);
+      }
+    }
+
+   Data set:
+   complex_special_cases.parquet
+   {
+     "id": 1,
+     "a": "some string",
+     "repeated": [1, 2],
+     "VariableCase": "top level variable case column",
+     "nested": {
+       "id": 2,
+       "repeated": [3, 4],
+       "VaRiAbLeCaSe": "nested variable case column"
+     }
+   }
+   */
+  private static final String path = "src/test/resources/store/parquet/complex/complex_special_cases.parquet";
+  private static ParquetMetadata footer;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    Configuration conf = new Configuration();
+
+    footer = ParquetFileReader.readFooter(conf, new Path(path));
+  }
+
+  @Test
+  public void testGroupTypeColumn() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("nested"));
+    assertTrue("GroupType column must be detected as complex",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testNestedColumn() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("nested", "id"));
+    assertTrue("Nested column must be detected as complex",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testCombinedColumns() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("id"));
+    columns.add(SchemaPath.getCompoundPath("nested", "id"));
+    assertTrue("Nested column in the list list must be detected as complex",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testSimpleColumn() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("id"));
+    assertFalse("No complex column must be detected",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testSimpleColumns() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("id"));
+    columns.add(SchemaPath.getCompoundPath("a"));
+    assertFalse("No complex columns must be detected",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testNonexistentColumn() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("nonexistent"));
+    assertFalse("No complex column must be detected",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testVariableCaseColumn() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("variablecase"));
+    assertFalse("No complex column must be detected",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testVariableCaseSchemaPath() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("VaRiAbLeCaSe"));
+    assertFalse("No complex column must be detected",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testNestedVariableCaseColumn() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("nested", "variablecase"));
+    assertTrue("Nested variable case column must be detected as complex",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testRepeatedColumn() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("repeated"));
+    assertTrue("Repeated column must be detected as complex",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+
+  @Test
+  public void testNestedRepeatedColumn() {
+    List<SchemaPath> columns = new ArrayList<>();
+    columns.add(SchemaPath.getCompoundPath("nested", "repeated"));
+    assertTrue("Nested repeated column must be detected as complex",
+        ParquetReaderUtility.containsComplexColumn(footer, columns));
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderUtility.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderUtility.java
new file mode 100644
index 00000000000..4b24212c378
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderUtility.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.ParquetFileReader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class TestParquetReaderUtility {
+
+  private static final String path = "src/test/resources/store/parquet/complex/complex.parquet";
+  private static ParquetMetadata footer;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    Configuration conf = new Configuration();
+
+    footer = ParquetFileReader.readFooter(conf, new Path(path));
+  }
+
+  @Test
+  public void testSchemaElementsMap() {
+    Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
+    assertEquals("Schema elements map size must be 14", schemaElements.size(), 14);
+
+    SchemaElement schemaElement = schemaElements.get("`marketing_info`.`camp_id`");
+    assertNotNull("Schema element must be not null", schemaElement);
+    assertEquals("Schema element must be named 'camp_id'", schemaElement.getName(), "camp_id");
+
+    schemaElement = schemaElements.get("`marketing_info`");
+    assertNotNull("Schema element must be not null", schemaElement);
+    assertEquals("Schema element name match lookup key", schemaElement.getName(), "marketing_info");
+  }
+
+  @Test
+  public void testColumnDescriptorMap() {
+    Map<String, ColumnDescriptor> colDescMap = ParquetReaderUtility.getColNameToColumnDescriptorMapping(footer);
+    assertEquals("Column descriptors map size must be 11", colDescMap.size(), 11);
+
+    assertNotNull("column descriptor lookup must return not null", colDescMap.get("`marketing_info`.`camp_id`"));
+    assertNull("column descriptor lookup must return null on GroupType column", colDescMap.get("`marketing_info`"));
+  }
+}
diff --git a/exec/java-exec/src/test/resources/store/parquet/complex/complex_special_cases.parquet b/exec/java-exec/src/test/resources/store/parquet/complex/complex_special_cases.parquet
new file mode 100644
index 00000000000..bbfb6d4096c
Binary files /dev/null and b/exec/java-exec/src/test/resources/store/parquet/complex/complex_special_cases.parquet differ


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services