Posted to commits@parquet.apache.org by ti...@apache.org on 2014/09/25 19:13:07 UTC

git commit: PARQUET-101: fix metadata lookup when not using task.side.metadata

Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 59c58d0b8 -> 0c4f13a84


PARQUET-101: fix metadata lookup when not using task.side.metadata

Author: julien <ju...@twitter.com>

Closes #64 from julienledem/PARQUET-101 and squashes the following commits:

54ffbc9 [julien] fix metadata lookup when not using task.side.metadata
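
Why: initializeInternalReader used to read the footer with
range(split.getStart(), split.getEnd()) unconditionally. That is correct for
task-side metadata (rowGroupOffsets == null), but when the row groups were
already selected on the client, a range-filtered footer can miss some of the
selected row groups and the lookup by offset fails. The fix reads the footer
inside each branch: range() for task-side metadata, NO_FILTER otherwise.

A minimal sketch of that decision, using the same calls the commit imports
statically (the helper and class names are hypothetical; the real logic is
inline in initializeInternalReader, see the diff below):

import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static parquet.format.converter.ParquetMetadataConverter.range;
import static parquet.hadoop.ParquetFileReader.readFooter;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import parquet.hadoop.metadata.ParquetMetadata;

class FooterLookupSketch {
  // Hypothetical helper mirroring the branch added in 0c4f13a8.
  static ParquetMetadata chooseFooter(Configuration conf, Path path,
      long start, long end, long[] rowGroupOffsets) throws IOException {
    if (rowGroupOffsets == null) {
      // task.side.metadata: this task filters row groups itself, so a footer
      // restricted to the split's byte range is sufficient (and cheaper).
      return readFooter(conf, path, range(start, end));
    }
    // client-side metadata: row groups were already chosen by offset on the
    // client; a range-filtered footer could miss some, so read it unfiltered.
    return readFooter(conf, path, NO_FILTER);
  }
}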


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/0c4f13a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/0c4f13a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/0c4f13a8

Branch: refs/heads/master
Commit: 0c4f13a846b458e31cfcaafd8e83f0f4c1d04237
Parents: 59c58d0
Author: julien <ju...@twitter.com>
Authored: Thu Sep 25 10:12:58 2014 -0700
Committer: Tianshuo Deng <td...@twitter.com>
Committed: Thu Sep 25 10:12:58 2014 -0700

----------------------------------------------------------------------
 .../parquet/hadoop/ParquetRecordReader.java     | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/0c4f13a8/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
index 6b89e37..955a610 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
@@ -15,7 +15,11 @@
  */
 package parquet.hadoop;
 
+import static parquet.filter2.compat.RowGroupFilter.filterRowGroups;
+import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static parquet.format.converter.ParquetMetadataConverter.range;
+import static parquet.hadoop.ParquetFileReader.readFooter;
+import static parquet.hadoop.ParquetInputFormat.getFilter;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -36,8 +40,8 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import parquet.Log;
 import parquet.filter.UnboundRecordFilter;
 import parquet.filter2.compat.FilterCompat;
-import parquet.filter2.compat.RowGroupFilter;
 import parquet.filter2.compat.FilterCompat.Filter;
+import parquet.filter2.compat.RowGroupFilter;
 import parquet.hadoop.api.ReadSupport;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
@@ -142,18 +146,19 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
 
   private void initializeInternalReader(ParquetInputSplit split, Configuration configuration) throws IOException {
     Path path = split.getPath();
-    ParquetMetadata footer = ParquetFileReader.readFooter(
-        configuration, path, range(split.getStart(), split.getEnd()));
     long[] rowGroupOffsets = split.getRowGroupOffsets();
     List<BlockMetaData> filteredBlocks;
+    ParquetMetadata footer;
     // if task.side.metadata is set, rowGroupOffsets is null
-    MessageType fileSchema = footer.getFileMetaData().getSchema();
     if (rowGroupOffsets == null) {
       // then we need to apply the predicate push down filter
-      Filter filter = ParquetInputFormat.getFilter(configuration);
-      filteredBlocks = RowGroupFilter.filterRowGroups(filter, footer.getBlocks(), fileSchema);
+      footer = readFooter(configuration, path, range(split.getStart(), split.getEnd()));
+      MessageType fileSchema = footer.getFileMetaData().getSchema();
+      Filter filter = getFilter(configuration);
+      filteredBlocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
     } else {
       // otherwise we find the row groups that were selected on the client
+      footer = readFooter(configuration, path, NO_FILTER);
       Set<Long> offsets = new HashSet<Long>();
       for (long offset : rowGroupOffsets) {
         offsets.add(offset);
@@ -180,11 +185,12 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
             + " in range " + split.getStart() + ", " + split.getEnd());
       }
     }
+    MessageType fileSchema = footer.getFileMetaData().getSchema();
     MessageType requestedSchema = MessageTypeParser.parseMessageType(split.getRequestedSchema());
     Map<String, String> fileMetaData = footer.getFileMetaData().getKeyValueMetaData();
     Map<String, String> readSupportMetadata = split.getReadSupportMetadata();
     internalReader.initialize(
-        requestedSchema,fileSchema,
+        requestedSchema, fileSchema,
         fileMetaData, readSupportMetadata,
         path,
         filteredBlocks, configuration);
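
Usage note: which branch runs is driven by the task.side.metadata switch from
the title. A sketch of toggling it on the job configuration (the key string is
taken from the commit title; treat the exact spelling as an assumption):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// true  -> splits carry no row group offsets; the task reads a footer filtered
//          by range() and applies the push-down filter itself (first branch)
// false -> row groups are selected on the client; the reader now reads the
//          footer with NO_FILTER so every selected offset can be resolved
conf.setBoolean("parquet.task.side.metadata", true);

With task-side metadata off, every offset listed in the split must be found in
the footer; reading the footer with NO_FILTER restores that guarantee.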