You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ti...@apache.org on 2014/09/25 19:13:07 UTC
git commit: PARQUET-101: fix meta data lookup when not using task.side.metadata
Repository: incubator-parquet-mr
Updated Branches:
refs/heads/master 59c58d0b8 -> 0c4f13a84
PARQUET-101: fix meta data lookup when not using task.side.metadata
Author: julien <ju...@twitter.com>
Closes #64 from julienledem/PARQUET-101 and squashes the following commits:
54ffbc9 [julien] fix meta data lookup when not using task.side.metadata
Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/0c4f13a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/0c4f13a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/0c4f13a8
Branch: refs/heads/master
Commit: 0c4f13a846b458e31cfcaafd8e83f0f4c1d04237
Parents: 59c58d0
Author: julien <ju...@twitter.com>
Authored: Thu Sep 25 10:12:58 2014 -0700
Committer: Tianshuo Deng <td...@twitter.com>
Committed: Thu Sep 25 10:12:58 2014 -0700
----------------------------------------------------------------------
.../parquet/hadoop/ParquetRecordReader.java | 20 +++++++++++++-------
1 file changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/0c4f13a8/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
index 6b89e37..955a610 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
@@ -15,7 +15,11 @@
*/
package parquet.hadoop;
+import static parquet.filter2.compat.RowGroupFilter.filterRowGroups;
+import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static parquet.format.converter.ParquetMetadataConverter.range;
+import static parquet.hadoop.ParquetFileReader.readFooter;
+import static parquet.hadoop.ParquetInputFormat.getFilter;
import java.io.IOException;
import java.util.ArrayList;
@@ -36,8 +40,8 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import parquet.Log;
import parquet.filter.UnboundRecordFilter;
import parquet.filter2.compat.FilterCompat;
-import parquet.filter2.compat.RowGroupFilter;
import parquet.filter2.compat.FilterCompat.Filter;
+import parquet.filter2.compat.RowGroupFilter;
import parquet.hadoop.api.ReadSupport;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
@@ -142,18 +146,19 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
private void initializeInternalReader(ParquetInputSplit split, Configuration configuration) throws IOException {
Path path = split.getPath();
- ParquetMetadata footer = ParquetFileReader.readFooter(
- configuration, path, range(split.getStart(), split.getEnd()));
long[] rowGroupOffsets = split.getRowGroupOffsets();
List<BlockMetaData> filteredBlocks;
+ ParquetMetadata footer;
// if task.side.metadata is set, rowGroupOffsets is null
- MessageType fileSchema = footer.getFileMetaData().getSchema();
if (rowGroupOffsets == null) {
// then we need to apply the predicate push down filter
- Filter filter = ParquetInputFormat.getFilter(configuration);
- filteredBlocks = RowGroupFilter.filterRowGroups(filter, footer.getBlocks(), fileSchema);
+ footer = readFooter(configuration, path, range(split.getStart(), split.getEnd()));
+ MessageType fileSchema = footer.getFileMetaData().getSchema();
+ Filter filter = getFilter(configuration);
+ filteredBlocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
} else {
// otherwise we find the row groups that were selected on the client
+ footer = readFooter(configuration, path, NO_FILTER);
Set<Long> offsets = new HashSet<Long>();
for (long offset : rowGroupOffsets) {
offsets.add(offset);
@@ -180,11 +185,12 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
+ " in range " + split.getStart() + ", " + split.getEnd());
}
}
+ MessageType fileSchema = footer.getFileMetaData().getSchema();
MessageType requestedSchema = MessageTypeParser.parseMessageType(split.getRequestedSchema());
Map<String, String> fileMetaData = footer.getFileMetaData().getKeyValueMetaData();
Map<String, String> readSupportMetadata = split.getReadSupportMetadata();
internalReader.initialize(
- requestedSchema,fileSchema,
+ requestedSchema, fileSchema,
fileMetaData, readSupportMetadata,
path,
filteredBlocks, configuration);