You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/10/29 17:05:29 UTC

svn commit: r1635187 - in /hive/branches/branch-0.14: ./ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java

Author: brock
Date: Wed Oct 29 16:05:28 2014
New Revision: 1635187

URL: http://svn.apache.org/r1635187
Log:
Merge HIVE-7800 - Parquet Column Index Access Schema Size Checking from trunk

Modified:
    hive/branches/branch-0.14/   (props changed)
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java

Propchange: hive/branches/branch-0.14/
------------------------------------------------------------------------------
  Merged /hive/trunk:r1629752

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java?rev=1635187&r1=1635186&r2=1635187&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java Wed Oct 29 16:05:28 2014
@@ -75,6 +75,7 @@ public class DataWritableReadSupport ext
       final Map<String, String> keyValueMetaData, final MessageType fileSchema) {
     final String columns = configuration.get(IOConstants.COLUMNS);
     final Map<String, String> contextMetadata = new HashMap<String, String>();
+    final boolean indexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
     if (columns != null) {
       final List<String> listColumns = getColumns(columns);
       final Map<String, String> lowerCaseFileSchemaColumns = new HashMap<String,String>();
@@ -82,45 +83,50 @@ public class DataWritableReadSupport ext
         lowerCaseFileSchemaColumns.put(c.getPath()[0].toLowerCase(), c.getPath()[0]);
       }
       final List<Type> typeListTable = new ArrayList<Type>();
-      for (String col : listColumns) {
-        col = col.toLowerCase();
-        // listColumns contains partition columns which are metadata only
-        if (lowerCaseFileSchemaColumns.containsKey(col)) {
-          typeListTable.add(fileSchema.getType(lowerCaseFileSchemaColumns.get(col)));
-        } else {
-          // below allows schema evolution
-          typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, col));
+      if(indexAccess) {
+        for (int index = 0; index < listColumns.size(); index++) {
+          //Take columns based on index or pad the field
+          if(index < fileSchema.getFieldCount()) {
+            typeListTable.add(fileSchema.getType(index));
+          } else {
+            //prefixing with '_mask_' to ensure no conflict with named
+            //columns in the file schema
+            typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_mask_"+listColumns.get(index)));
+          }
+        }
+      } else {
+        for (String col : listColumns) {
+          col = col.toLowerCase();
+          // listColumns contains partition columns which are metadata only
+          if (lowerCaseFileSchemaColumns.containsKey(col)) {
+            typeListTable.add(fileSchema.getType(lowerCaseFileSchemaColumns.get(col)));
+          } else {
+            // below allows schema evolution
+            typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, col));
+          }
         }
       }
       MessageType tableSchema = new MessageType(TABLE_SCHEMA, typeListTable);
       contextMetadata.put(HIVE_SCHEMA_KEY, tableSchema.toString());
 
-      MessageType requestedSchemaByUser = tableSchema;
       final List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
 
       final List<Type> typeListWanted = new ArrayList<Type>();
-      final boolean indexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
+
       for (final Integer idx : indexColumnsWanted) {
         if (idx < listColumns.size()) {
           String col = listColumns.get(idx);
           if (indexAccess) {
-            typeListWanted.add(tableSchema.getType(col));
+              typeListWanted.add(fileSchema.getFields().get(idx));
           } else {
             col = col.toLowerCase();
             if (lowerCaseFileSchemaColumns.containsKey(col)) {
               typeListWanted.add(tableSchema.getType(lowerCaseFileSchemaColumns.get(col)));
-            } else {
-              // should never occur?
-              String msg = "Column " + col + " at index " + idx + " does not exist in " +
-              lowerCaseFileSchemaColumns;
-              throw new IllegalStateException(msg);
             }
           }
         }
       }
-      requestedSchemaByUser = resolveSchemaAccess(new MessageType(fileSchema.getName(),
-              typeListWanted), fileSchema, configuration);
-
+      MessageType requestedSchemaByUser = new MessageType(fileSchema.getName(), typeListWanted);
       return new ReadContext(requestedSchemaByUser, contextMetadata);
     } else {
       contextMetadata.put(HIVE_SCHEMA_KEY, fileSchema.toString());
@@ -147,26 +153,7 @@ public class DataWritableReadSupport ext
       throw new IllegalStateException("ReadContext not initialized properly. " +
         "Don't know the Hive Schema.");
     }
-    final MessageType tableSchema = resolveSchemaAccess(MessageTypeParser.
-        parseMessageType(metadata.get(HIVE_SCHEMA_KEY)), fileSchema, configuration);
+    final MessageType tableSchema = MessageTypeParser.parseMessageType(metadata.get(HIVE_SCHEMA_KEY));
     return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema);
   }
-
-  /**
-  * Determine the file column names based on the position within the requested columns and
-  * use that as the requested schema.
-  */
-  private MessageType resolveSchemaAccess(MessageType requestedSchema, MessageType fileSchema,
-          Configuration configuration) {
-    if (configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false)) {
-      final List<String> listColumns = getColumns(configuration.get(IOConstants.COLUMNS));
-      List<Type> requestedTypes = new ArrayList<Type>();
-      for(Type t : requestedSchema.getFields()) {
-        int index = listColumns.indexOf(t.getName());
-        requestedTypes.add(fileSchema.getType(index));
-      }
-      requestedSchema = new MessageType(requestedSchema.getName(), requestedTypes);
-    }
-    return requestedSchema;
-  }
-}
\ No newline at end of file
+}