You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by sh...@apache.org on 2022/03/20 00:00:30 UTC
[parquet-mr] branch master updated: PARQUET-2117: Expose Row Index via ParquetReader and ParquetRecordReader (#945)
This is an automated email from the ASF dual-hosted git repository.
shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new c7bff51 PARQUET-2117: Expose Row Index via ParquetReader and ParquetRecordReader (#945)
c7bff51 is described below
commit c7bff519094920a8609df6cbd98821a43ed779e3
Author: Prakhar Jain <pr...@gmail.com>
AuthorDate: Sat Mar 19 17:00:24 2022 -0700
PARQUET-2117: Expose Row Index via ParquetReader and ParquetRecordReader (#945)
* PARQUET-2117: Changes to generate row index in InternalParquetRecordReader, also expose the row index via ParquetReader or ParquetRecordReader
- Add and populate rowIndexOffset field in BlockMetaData
- Changes to generate row index in InternalParquetRecordReader, also expose the row index via ParquetReader or ParquetRecordReader
- Add new unit tests and extend all the ColumnIndexFiltering and BloomFiltering unit tests to validate row indexes also.
* address review comments
* add test based on old parquet file without column indexes
* address review comments - Return -1 when row index info is not available (and document this); likewise return -1 when rowIndexOffset info is not available in BlockMetadata
* address review comments - Fix java doc style
* address review comments from ggershinsky - early return and reduce indentation
* fix build
---
.../apache/parquet/column/page/PageReadStore.java | 8 +
.../format/converter/ParquetMetadataConverter.java | 63 +++++--
.../parquet/hadoop/ColumnChunkPageReadStore.java | 18 +-
.../hadoop/InternalParquetRecordReader.java | 55 ++++++-
.../apache/parquet/hadoop/ParquetFileReader.java | 4 +-
.../org/apache/parquet/hadoop/ParquetReader.java | 10 ++
.../apache/parquet/hadoop/ParquetRecordReader.java | 7 +
.../parquet/hadoop/metadata/BlockMetaData.java | 19 ++-
.../filter2/recordlevel/PhoneBookWriter.java | 19 ++-
.../apache/parquet/hadoop/TestBloomFiltering.java | 2 +-
.../parquet/hadoop/TestColumnIndexFiltering.java | 4 +-
.../apache/parquet/hadoop/TestParquetReader.java | 181 +++++++++++++++++++++
.../test-file-with-no-column-indexes-1.parquet | Bin 0 -> 35855 bytes
13 files changed, 370 insertions(+), 20 deletions(-)
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
index 753bda8..796cf17 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
@@ -44,6 +44,14 @@ public interface PageReadStore {
long getRowCount();
/**
+ * @return the optional of the long representing the row index offset of this row group or an empty optional if the
+ * related data is not available
+ */
+ default Optional<Long> getRowIndexOffset() {
+ return Optional.empty();
+ }
+
+ /**
* Returns the indexes of the rows to be read/built if the related data is available. All the rows which index is not
* returned shall be skipped.
*
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 96980a4..0ea75f3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1400,6 +1400,31 @@ public class ParquetMetadataConverter {
return readParquetMetadata(from, filter, null, false, 0);
}
+ private Map<RowGroup, Long> generateRowGroupOffsets(FileMetaData metaData) {
+ Map<RowGroup, Long> rowGroupOrdinalToRowIdx = new HashMap<>();
+ List<RowGroup> rowGroups = metaData.getRow_groups();
+ if (rowGroups != null) {
+ long rowIdxSum = 0;
+ for (int i = 0; i < rowGroups.size(); i++) {
+ rowGroupOrdinalToRowIdx.put(rowGroups.get(i), rowIdxSum);
+ rowIdxSum += rowGroups.get(i).getNum_rows();
+ }
+ }
+ return rowGroupOrdinalToRowIdx;
+ }
+
+ /**
+ * A container for [[FileMetaData]] and [[RowGroup]] to ROW_INDEX offset map.
+ */
+ private class FileMetaDataAndRowGroupOffsetInfo {
+ final FileMetaData fileMetadata;
+ final Map<RowGroup, Long> rowGroupToRowIndexOffsetMap;
+ public FileMetaDataAndRowGroupOffsetInfo(FileMetaData fileMetadata, Map<RowGroup, Long> rowGroupToRowIndexOffsetMap) {
+ this.fileMetadata = fileMetadata;
+ this.rowGroupToRowIndexOffsetMap = rowGroupToRowIndexOffsetMap;
+ }
+ }
+
public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter,
final InternalFileDecryptor fileDecryptor, final boolean encryptedFooter,
final int combinedFooterLength) throws IOException {
@@ -1407,27 +1432,35 @@ public class ParquetMetadataConverter {
final BlockCipher.Decryptor footerDecryptor = (encryptedFooter? fileDecryptor.fetchFooterDecryptor() : null);
final byte[] encryptedFooterAAD = (encryptedFooter? AesCipher.createFooterAAD(fileDecryptor.getFileAAD()) : null);
- FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
+ FileMetaDataAndRowGroupOffsetInfo fileMetaDataAndRowGroupInfo = filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
@Override
- public FileMetaData visit(NoFilter filter) throws IOException {
- return readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+ public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {
+ FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+ return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));
}
@Override
- public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
- return readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
+ public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {
+ FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
+ return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));
}
@Override
- public FileMetaData visit(OffsetMetadataFilter filter) throws IOException {
- return filterFileMetaDataByStart(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter);
+ public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {
+ FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+ FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);
+ return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, generateRowGroupOffsets(fileMetadata));
}
@Override
- public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
- return filterFileMetaDataByMidpoint(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter);
+ public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {
+ FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+ FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);
+ return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, generateRowGroupOffsets(fileMetadata));
}
});
+ FileMetaData fileMetaData = fileMetaDataAndRowGroupInfo.fileMetadata;
+ Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = fileMetaDataAndRowGroupInfo.rowGroupToRowIndexOffsetMap;
LOG.debug("{}", fileMetaData);
if (!encryptedFooter && null != fileDecryptor) {
@@ -1447,7 +1480,7 @@ public class ParquetMetadataConverter {
}
}
- ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, fileDecryptor, encryptedFooter);
+ ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, fileDecryptor, encryptedFooter, rowGroupToRowIndexOffsetMap);
if (LOG.isDebugEnabled()) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
return parquetMetadata;
}
@@ -1476,6 +1509,13 @@ public class ParquetMetadataConverter {
public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
InternalFileDecryptor fileDecryptor, boolean encryptedFooter) throws IOException {
+ return fromParquetMetadata(parquetMetadata, fileDecryptor, encryptedFooter, new HashMap<RowGroup, Long>());
+ }
+
+ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
+ InternalFileDecryptor fileDecryptor,
+ boolean encryptedFooter,
+ Map<RowGroup, Long> rowGroupToRowIndexOffsetMap) throws IOException {
MessageType messageType = fromParquetSchema(parquetMetadata.getSchema(), parquetMetadata.getColumn_orders());
List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
List<RowGroup> row_groups = parquetMetadata.getRow_groups();
@@ -1485,6 +1525,9 @@ public class ParquetMetadataConverter {
BlockMetaData blockMetaData = new BlockMetaData();
blockMetaData.setRowCount(rowGroup.getNum_rows());
blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
+ if (rowGroupToRowIndexOffsetMap.containsKey(rowGroup)) {
+ blockMetaData.setRowIndexOffset(rowGroupToRowIndexOffsetMap.get(rowGroup));
+ }
// not set in legacy files
if (rowGroup.isSetOrdinal()) {
blockMetaData.setOrdinal(rowGroup.getOrdinal());
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 3d1bafe..85ba98c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -248,15 +248,26 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
private final Map<ColumnDescriptor, ColumnChunkPageReader> readers = new HashMap<ColumnDescriptor, ColumnChunkPageReader>();
private final long rowCount;
+ private final long rowIndexOffset;
private final RowRanges rowRanges;
public ColumnChunkPageReadStore(long rowCount) {
+ this(rowCount, -1);
+ }
+
+ ColumnChunkPageReadStore(RowRanges rowRanges) {
+ this(rowRanges, -1);
+ }
+
+ ColumnChunkPageReadStore(long rowCount, long rowIndexOffset) {
this.rowCount = rowCount;
+ this.rowIndexOffset = rowIndexOffset;
rowRanges = null;
}
- ColumnChunkPageReadStore(RowRanges rowRanges) {
+ ColumnChunkPageReadStore(RowRanges rowRanges, long rowIndexOffset) {
this.rowRanges = rowRanges;
+ this.rowIndexOffset = rowIndexOffset;
rowCount = rowRanges.rowCount();
}
@@ -266,6 +277,11 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
}
@Override
+ public Optional<Long> getRowIndexOffset() {
+ return rowIndexOffset < 0 ? Optional.empty() : Optional.of(rowIndexOffset);
+ }
+
+ @Override
public PageReader getPageReader(ColumnDescriptor path) {
final PageReader pageReader = readers.get(path);
if (pageReader == null) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index 8ffe19f..8203e90 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -22,8 +22,10 @@ import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Objects;
+import java.util.Optional;
+import java.util.PrimitiveIterator;
import java.util.Set;
+import java.util.stream.LongStream;
import org.apache.hadoop.conf.Configuration;
@@ -69,6 +71,8 @@ class InternalParquetRecordReader<T> {
private long current = 0;
private int currentBlock = -1;
private ParquetFileReader reader;
+ private long currentRowIdx = -1;
+ private PrimitiveIterator.OfLong rowIdxInFileItr;
private org.apache.parquet.io.RecordReader<T> recordReader;
private boolean strictTypeChecking;
@@ -127,6 +131,7 @@ class InternalParquetRecordReader<T> {
if (pages == null) {
throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
}
+ resetRowIndexIterator(pages);
long timeSpentReading = System.currentTimeMillis() - t0;
totalTimeSpentReadingBytes += timeSpentReading;
BenchmarkCounter.incrementTime(timeSpentReading);
@@ -227,6 +232,11 @@ class InternalParquetRecordReader<T> {
try {
currentValue = recordReader.read();
+ if (rowIdxInFileItr != null && rowIdxInFileItr.hasNext()) {
+ currentRowIdx = rowIdxInFileItr.next();
+ } else {
+ currentRowIdx = -1;
+ }
} catch (RecordMaterializationException e) {
// this might throw, but it's fatal if it does.
unmaterializableRecordCounter.incErrors(e);
@@ -265,4 +275,47 @@ class InternalParquetRecordReader<T> {
return Collections.unmodifiableMap(setMultiMap);
}
+ /**
+ * Returns the row index of the current row. If no row has been processed or if the
+ * row index information is unavailable from the underlying @{@link PageReadStore}, returns -1.
+ */
+ public long getCurrentRowIndex() {
+ if (current == 0L || rowIdxInFileItr == null) {
+ return -1;
+ }
+ return currentRowIdx;
+ }
+
+ /**
+ * Resets the row index iterator based on the current processed row group.
+ */
+ private void resetRowIndexIterator(PageReadStore pages) {
+ Optional<Long> rowGroupRowIdxOffset = pages.getRowIndexOffset();
+ if (!rowGroupRowIdxOffset.isPresent()) {
+ this.rowIdxInFileItr = null;
+ return;
+ }
+
+ currentRowIdx = -1;
+ final PrimitiveIterator.OfLong rowIdxInRowGroupItr;
+ if (pages.getRowIndexes().isPresent()) {
+ rowIdxInRowGroupItr = pages.getRowIndexes().get();
+ } else {
+ rowIdxInRowGroupItr = LongStream.range(0, pages.getRowCount()).iterator();
+ }
+ // Adjust the row group offset in the `rowIndexWithinRowGroupIterator` iterator.
+ this.rowIdxInFileItr = new PrimitiveIterator.OfLong() {
+ public long nextLong() {
+ return rowGroupRowIdxOffset.get() + rowIdxInRowGroupItr.nextLong();
+ }
+
+ public boolean hasNext() {
+ return rowIdxInRowGroupItr.hasNext();
+ }
+
+ public Long next() {
+ return rowGroupRowIdxOffset.get() + rowIdxInRowGroupItr.next();
+ }
+ };
+ }
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 63a22d1..97fe86d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -929,7 +929,7 @@ public class ParquetFileReader implements Closeable {
if (block.getRowCount() == 0) {
throw new RuntimeException("Illegal row group of 0 rows");
}
- ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(block.getRowCount());
+ ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(block.getRowCount(), block.getRowIndexOffset());
// prepare the list of consecutive parts to read them in one scan
List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
ConsecutivePartList currentParts = null;
@@ -1044,7 +1044,7 @@ public class ParquetFileReader implements Closeable {
}
private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData block, RowRanges rowRanges, ColumnIndexStore ciStore) throws IOException {
- ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges);
+ ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges, block.getRowIndexOffset());
// prepare the list of consecutive parts to read them in one scan
ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
List<ConsecutivePartList> allParts = new ArrayList<>();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index c215f5e..6d76723 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -140,6 +140,16 @@ public class ParquetReader<T> implements Closeable {
}
}
+ /**
+ * @return the row index of the last read row. If no row has been processed, returns -1.
+ */
+ public long getCurrentRowIndex() {
+ if (reader == null) {
+ return -1;
+ }
+ return reader.getCurrentRowIndex();
+ }
+
private void initReader() throws IOException {
if (reader != null) {
reader.close();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
index 4653410..e46ccdd 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -207,6 +207,13 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
return internalReader.nextKeyValue();
}
+ /**
+ * @return the row index of the current row. If no row has been processed, returns -1.
+ */
+ public long getCurrentRowIndex() throws IOException {
+ return internalReader.getCurrentRowIndex();
+ }
+
private ParquetInputSplit toParquetSplit(InputSplit split) throws IOException {
if (split instanceof ParquetInputSplit) {
return (ParquetInputSplit) split;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
index ce204dc..4f9fd14 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
@@ -33,6 +33,7 @@ public class BlockMetaData {
private long totalByteSize;
private String path;
private int ordinal;
+ private long rowIndexOffset = -1;
public BlockMetaData() {
}
@@ -66,6 +67,18 @@ public class BlockMetaData {
}
/**
+ * @return -1 if the rowIndexOffset for the {@link BlockMetaData} is unavailable else returns the actual rowIndexOffset
+ */
+ public long getRowIndexOffset() { return rowIndexOffset; }
+
+ /**
+ * @param rowIndexOffset the rowIndexOffset to set
+ */
+ public void setRowIndexOffset(long rowIndexOffset) {
+ this.rowIndexOffset = rowIndexOffset;
+ }
+
+ /**
* @return the totalByteSize
*/
public long getTotalByteSize() {
@@ -105,7 +118,11 @@ public class BlockMetaData {
@Override
public String toString() {
- return "BlockMetaData{" + rowCount + ", " + totalByteSize + " " + columns + "}";
+ String rowIndexOffsetStr = "";
+ if (rowIndexOffset != -1) {
+ rowIndexOffsetStr = ", rowIndexOffset = " + rowIndexOffset;
+ }
+ return "BlockMetaData{" + rowCount + ", " + totalByteSize + rowIndexOffsetStr + " " + columns + "}";
}
/**
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
index 6355f35..1e74353 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
@@ -18,6 +18,8 @@
*/
package org.apache.parquet.filter2.recordlevel;
+import static org.junit.Assert.assertEquals;
+
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -315,7 +317,7 @@ public class PhoneBookWriter {
}
}
- private static ParquetReader<Group> createReader(Path file, Filter filter) throws IOException {
+ public static ParquetReader<Group> createReader(Path file, Filter filter) throws IOException {
Configuration conf = new Configuration();
GroupWriteSupport.setSchema(schema, conf);
@@ -341,11 +343,24 @@ public class PhoneBookWriter {
}
public static List<User> readUsers(ParquetReader.Builder<Group> builder) throws IOException {
+ return readUsers(builder, false);
+ }
+
+ /**
+ * Returns a list of users from the underlying [[ParquetReader]] builder.
+ * If `validateRowIndexes` is set to true, this method will also validate the ROW_INDEXes for the
+ * rows read from ParquetReader - ROW_INDEX for a row should be same as underlying user id.
+ */
+ public static List<User> readUsers(ParquetReader.Builder<Group> builder, boolean validateRowIndexes) throws IOException {
ParquetReader<Group> reader = builder.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString()).build();
List<User> users = new ArrayList<>();
for (Group group = reader.read(); group != null; group = reader.read()) {
- users.add(userFromGroup(group));
+ User u = userFromGroup(group);
+ users.add(u);
+ if (validateRowIndexes) {
+ assertEquals(reader.getCurrentRowIndex(), u.id);
+ }
}
return users;
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
index b07fccd..68a4e34 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
@@ -200,7 +200,7 @@ public class TestBloomFiltering {
.useStatsFilter(useOtherFiltering)
.useRecordFilter(useOtherFiltering)
.useBloomFilter(useBloomFilter)
- .useColumnIndexFilter(useOtherFiltering));
+ .useColumnIndexFilter(useOtherFiltering), true);
}
// Assumes that both lists are in the same order
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
index 5e18105..0678cbf 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
@@ -248,7 +248,7 @@ public class TestColumnIndexFiltering {
.useDictionaryFilter(useOtherFiltering)
.useStatsFilter(useOtherFiltering)
.useRecordFilter(useOtherFiltering)
- .useColumnIndexFilter(useColumnIndexFilter));
+ .useColumnIndexFilter(useColumnIndexFilter), true);
}
private List<User> readUsersWithProjection(Filter filter, MessageType schema, boolean useOtherFiltering,
@@ -261,7 +261,7 @@ public class TestColumnIndexFiltering {
.useStatsFilter(useOtherFiltering)
.useRecordFilter(useOtherFiltering)
.useColumnIndexFilter(useColumnIndexFilter)
- .set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()));
+ .set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()), true);
}
private FileDecryptionProperties getFileDecryptionProperties() {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
new file mode 100644
index 0000000..86f14a8
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.in;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestParquetReader {
+
+ private static final Path FILE_V1 = createTempFile();
+ private static final Path FILE_V2 = createTempFile();
+ private static final Path STATIC_FILE_WITHOUT_COL_INDEXES = createPathFromCP("/test-file-with-no-column-indexes-1.parquet");
+ private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(makeUsers(1000));
+
+ private final Path file;
+
+ private static Path createPathFromCP(String path) {
+ try {
+ return new Path(TestParquetReader.class.getResource(path).toURI());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public TestParquetReader(Path file) {
+ this.file = file;
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ Object[][] data = new Object[][] {
+ { FILE_V1 },
+ { FILE_V2 },
+ { STATIC_FILE_WITHOUT_COL_INDEXES } };
+ return Arrays.asList(data);
+ }
+
+ @BeforeClass
+ public static void createFiles() throws IOException {
+ writePhoneBookToFile(FILE_V1, ParquetProperties.WriterVersion.PARQUET_1_0);
+ writePhoneBookToFile(FILE_V2, ParquetProperties.WriterVersion.PARQUET_2_0);
+ }
+
+ @AfterClass
+ public static void deleteFiles() throws IOException {
+ deleteFile(FILE_V1);
+ deleteFile(FILE_V2);
+ }
+
+ private static void deleteFile(Path file) throws IOException {
+ file.getFileSystem(new Configuration()).delete(file, false);
+ }
+
+ public static List<PhoneBookWriter.User> makeUsers(int rowCount) {
+ List<PhoneBookWriter.User> users = new ArrayList<>();
+ for (int i = 0; i < rowCount; i++) {
+ PhoneBookWriter.Location location = null;
+ if (i % 3 == 1) {
+ location = new PhoneBookWriter.Location((double)i, (double) i * 2);
+ }
+ if (i % 3 == 2) {
+ location = new PhoneBookWriter.Location((double)i, null);
+ }
+ // row index of each row in the file is same as the user id.
+ users.add(new PhoneBookWriter.User(i, "p" + i, Arrays.asList(new PhoneBookWriter.PhoneNumber(i, "cell")), location));
+ }
+ return users;
+ }
+
+ private static Path createTempFile() {
+ try {
+ return new Path(Files.createTempFile("test-ci_", ".parquet").toAbsolutePath().toString());
+ } catch (IOException e) {
+ throw new AssertionError("Unable to create temporary file", e);
+ }
+ }
+
+ private static void writePhoneBookToFile(Path file, ParquetProperties.WriterVersion parquetVersion) throws IOException {
+ int pageSize = DATA.size() / 10; // Ensure that several pages will be created
+ int rowGroupSize = pageSize * 6 * 5; // Ensure that there are more row-groups created
+
+ PhoneBookWriter.write(ExampleParquetWriter.builder(file)
+ .withWriteMode(OVERWRITE)
+ .withRowGroupSize(rowGroupSize)
+ .withPageSize(pageSize)
+ .withWriterVersion(parquetVersion),
+ DATA);
+ }
+
+ private List<PhoneBookWriter.User> readUsers(FilterCompat.Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
+ throws IOException {
+ return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
+ .withFilter(filter)
+ .useDictionaryFilter(useOtherFiltering)
+ .useStatsFilter(useOtherFiltering)
+ .useRecordFilter(useOtherFiltering)
+ .useColumnIndexFilter(useColumnIndexFilter), true);
+ }
+
+ @Test
+ public void testCurrentRowIndex() throws Exception {
+ ParquetReader<Group> reader = PhoneBookWriter.createReader(file, FilterCompat.NOOP);
+ // Fetch row index without processing any row.
+ assertEquals(reader.getCurrentRowIndex(), -1);
+ reader.read();
+ assertEquals(reader.getCurrentRowIndex(), 0);
+ // calling the same API again and again should return same result.
+ assertEquals(reader.getCurrentRowIndex(), 0);
+
+ reader.read();
+ assertEquals(reader.getCurrentRowIndex(), 1);
+ assertEquals(reader.getCurrentRowIndex(), 1);
+ long expectedCurrentRowIndex = 2L;
+ while(reader.read() != null) {
+ assertEquals(reader.getCurrentRowIndex(), expectedCurrentRowIndex);
+ expectedCurrentRowIndex++;
+ }
+ // reader.read() returned null and so reader doesn't have any more rows.
+ assertEquals(reader.getCurrentRowIndex(), -1);
+ }
+
+ @Test
+ public void testSimpleFiltering() throws Exception {
+ Set<Long> idSet = new HashSet<>();
+ idSet.add(123l);
+ idSet.add(567l);
+ // The readUsers also validates the rowIndex for each returned row.
+ List<PhoneBookWriter.User> filteredUsers1 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, true);
+ assertEquals(filteredUsers1.size(), 2L);
+ List<PhoneBookWriter.User> filteredUsers2 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, false);
+ assertEquals(filteredUsers2.size(), 2L);
+ List<PhoneBookWriter.User> filteredUsers3 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), false, false);
+ assertEquals(filteredUsers3.size(), 1000L);
+ }
+
+ @Test
+ public void testNoFiltering() throws Exception {
+ assertEquals(DATA, readUsers(FilterCompat.NOOP, false, false));
+ assertEquals(DATA, readUsers(FilterCompat.NOOP, true, false));
+ assertEquals(DATA, readUsers(FilterCompat.NOOP, false, true));
+ assertEquals(DATA, readUsers(FilterCompat.NOOP, true, true));
+ }
+}
diff --git a/parquet-hadoop/src/test/resources/test-file-with-no-column-indexes-1.parquet b/parquet-hadoop/src/test/resources/test-file-with-no-column-indexes-1.parquet
new file mode 100644
index 0000000..722e687
Binary files /dev/null and b/parquet-hadoop/src/test/resources/test-file-with-no-column-indexes-1.parquet differ