You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ga...@apache.org on 2018/08/17 15:06:52 UTC
[parquet-mr] branch column-indexes updated: PARQUET-1310: Column indexes: Filtering (#509)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch column-indexes
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/column-indexes by this push:
new d8e78eb PARQUET-1310: Column indexes: Filtering (#509)
d8e78eb is described below
commit d8e78ebd8ea6fe9fc2a9a7e61d1eaa7121637ebd
Author: Gabor Szadovszky <ga...@apache.org>
AuthorDate: Fri Aug 17 17:06:48 2018 +0200
PARQUET-1310: Column indexes: Filtering (#509)
---
.../org/apache/parquet/column/ColumnReader.java | 3 +
.../parquet/column/impl/ColumnReadStoreImpl.java | 11 +-
...ColumnReaderImpl.java => ColumnReaderBase.java} | 74 ++-
.../parquet/column/impl/ColumnReaderImpl.java | 676 +--------------------
.../column/impl/SynchronizingColumnReader.java | 93 +++
.../org/apache/parquet/column/page/DataPage.java | 35 ++
.../org/apache/parquet/column/page/DataPageV1.java | 21 +
.../org/apache/parquet/column/page/DataPageV2.java | 48 +-
...e.java => NotInPageFilteringModeException.java} | 34 +-
.../apache/parquet/column/page/PageReadStore.java | 25 +-
.../columnindex/BinaryColumnIndexBuilder.java | 31 +-
.../columnindex/BooleanColumnIndexBuilder.java | 36 +-
.../internal/column/columnindex/BoundaryOrder.java | 330 +++++++++-
.../internal/column/columnindex/ColumnIndex.java | 9 +-
.../column/columnindex/ColumnIndexBuilder.java | 290 +++++++--
.../columnindex/DoubleColumnIndexBuilder.java | 35 +-
.../columnindex/FloatColumnIndexBuilder.java | 35 +-
.../internal/column/columnindex/IndexIterator.java | 98 +++
.../column/columnindex/IntColumnIndexBuilder.java | 35 +-
.../column/columnindex/LongColumnIndexBuilder.java | 35 +-
.../internal/column/columnindex/OffsetIndex.java | 12 +
.../filter2/columnindex/ColumnIndexFilter.java | 194 ++++++
.../filter2/columnindex/ColumnIndexStore.java | 55 ++
.../internal/filter2/columnindex/RowRanges.java | 251 ++++++++
.../column/columnindex/TestBoundaryOrder.java | 487 +++++++++++++++
.../column/columnindex/TestColumnIndexBuilder.java | 488 ++++++++++++++-
.../column/columnindex/TestIndexIterator.java | 63 ++
.../column/columnindex/TestOffsetIndexBuilder.java | 2 +
.../filter2/columnindex/TestColumnIndexFilter.java | 464 ++++++++++++++
.../filter2/columnindex/TestRowRanges.java | 155 +++++
.../java/org/apache/parquet/HadoopReadOptions.java | 9 +-
.../org/apache/parquet/ParquetReadOptions.java | 20 +-
.../parquet/hadoop/ColumnChunkPageReadStore.java | 123 +++-
.../parquet/hadoop/ColumnIndexFilterUtils.java | 157 +++++
.../parquet/hadoop/ColumnIndexStoreImpl.java | 155 +++++
.../hadoop/InternalParquetRecordReader.java | 6 +-
.../apache/parquet/hadoop/ParquetFileReader.java | 293 +++++++--
.../apache/parquet/hadoop/ParquetInputFormat.java | 5 +
.../org/apache/parquet/hadoop/ParquetReader.java | 10 +
.../hadoop/metadata/ColumnChunkMetaData.java | 2 -
.../filter2/recordlevel/PhoneBookWriter.java | 105 +++-
.../parquet/hadoop/TestColumnIndexFiltering.java | 442 ++++++++++++++
42 files changed, 4539 insertions(+), 913 deletions(-)
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java
index 52d269e..6d93eee 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java
@@ -41,7 +41,10 @@ public interface ColumnReader {
/**
* @return the totalCount of values to be consumed
+ * @deprecated will be removed in 2.0.0; Total values might not be able to be counted before reading the values (e.g.
+ * in case of column index based filtering)
*/
+ @Deprecated
long getTotalValueCount();
/**
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
index 3784596..8066564 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
@@ -72,12 +72,13 @@ public class ColumnReadStoreImpl implements ColumnReadStore {
@Override
public ColumnReader getColumnReader(ColumnDescriptor path) {
- return newMemColumnReader(path, pageReadStore.getPageReader(path));
- }
-
- private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
PrimitiveConverter converter = getPrimitiveConverter(path);
- return new ColumnReaderImpl(path, pageReader, converter, writerVersion);
+ PageReader pageReader = pageReadStore.getPageReader(path);
+ if (pageReadStore.isInPageFilteringMode()) {
+ return new SynchronizingColumnReader(path, pageReader, converter, writerVersion, pageReadStore.getRowIndexes());
+ } else {
+ return new ColumnReaderImpl(path, pageReader, converter, writerVersion);
+ }
}
private PrimitiveConverter getPrimitiveConverter(ColumnDescriptor path) {
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
similarity index 92%
copy from parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
copy to parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
index 8c85b37..0af85c7 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
@@ -52,10 +52,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * ColumnReader implementation
+ * Base superclass for {@link ColumnReader} implementations.
*/
-public class ColumnReaderImpl implements ColumnReader {
- private static final Logger LOG = LoggerFactory.getLogger(ColumnReaderImpl.class);
+abstract class ColumnReaderBase implements ColumnReader {
+ private static final Logger LOG = LoggerFactory.getLogger(ColumnReaderBase.class);
/**
* binds the lower level page decoder to the record converter materializing the records
@@ -148,6 +148,7 @@ public class ColumnReaderImpl implements ColumnReader {
private final PrimitiveConverter converter;
private Binding binding;
+ private final int maxDefinitionLevel;
// this is needed because we will attempt to read the value twice when filtering
// TODO: rework that
@@ -328,11 +329,12 @@ public class ColumnReaderImpl implements ColumnReader {
* @param converter a converter that materializes the values in this column in the current record
* @param writerVersion writer version string from the Parquet file being read
*/
- public ColumnReaderImpl(ColumnDescriptor path, PageReader pageReader, PrimitiveConverter converter, ParsedVersion writerVersion) {
+ ColumnReaderBase(ColumnDescriptor path, PageReader pageReader, PrimitiveConverter converter, ParsedVersion writerVersion) {
this.path = checkNotNull(path, "path");
this.pageReader = checkNotNull(pageReader, "pageReader");
this.converter = checkNotNull(converter, "converter");
this.writerVersion = writerVersion;
+ this.maxDefinitionLevel = path.getMaxDefinitionLevel();
DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
if (dictionaryPage != null) {
try {
@@ -350,10 +352,9 @@ public class ColumnReaderImpl implements ColumnReader {
if (totalValueCount <= 0) {
throw new ParquetDecodingException("totalValueCount '" + totalValueCount + "' <= 0");
}
- consume();
}
- private boolean isFullyConsumed() {
+ boolean isFullyConsumed() {
return readValues >= totalValueCount;
}
@@ -508,25 +509,33 @@ public class ColumnReaderImpl implements ColumnReader {
return definitionLevel;
}
- // TODO: change the logic around read() to not tie together reading from the 3 columns
- private void readRepetitionAndDefinitionLevels() {
- repetitionLevel = repetitionLevelColumn.nextInt();
- definitionLevel = definitionLevelColumn.nextInt();
- ++readValues;
- }
-
private void checkRead() {
- if (isPageFullyConsumed()) {
- if (isFullyConsumed()) {
- LOG.debug("end reached");
- repetitionLevel = 0; // the next repetition level
- return;
+ int rl, dl;
+ for (;;) {
+ if (isPageFullyConsumed()) {
+ if (isFullyConsumed()) {
+ LOG.debug("end reached");
+ repetitionLevel = 0; // the next repetition level
+ return;
+ }
+ readPage();
+ }
+ rl = repetitionLevelColumn.nextInt();
+ dl = definitionLevelColumn.nextInt();
+ ++readValues;
+ if (!skipLevels(rl, dl)) {
+ break;
+ }
+ if (dl == maxDefinitionLevel) {
+ binding.skip();
}
- readPage();
}
- readRepetitionAndDefinitionLevels();
+ repetitionLevel = rl;
+ definitionLevel = dl;
}
+ abstract boolean skipLevels(int rl, int dl);
+
private void readPage() {
LOG.debug("loading page");
DataPage page = pageReader.readPage();
@@ -585,32 +594,42 @@ public class ColumnReaderImpl implements ColumnReader {
ValuesReader dlReader = page.getDlEncoding().getValuesReader(path, DEFINITION_LEVEL);
this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
+ int valueCount = page.getValueCount();
try {
BytesInput bytes = page.getBytes();
- LOG.debug("page size {} bytes and {} records", bytes.size(), pageValueCount);
+ LOG.debug("page size {} bytes and {} values", bytes.size(), valueCount);
LOG.debug("reading repetition levels at 0");
ByteBufferInputStream in = bytes.toInputStream();
- rlReader.initFromPage(pageValueCount, in);
+ rlReader.initFromPage(valueCount, in);
LOG.debug("reading definition levels at {}", in.position());
- dlReader.initFromPage(pageValueCount, in);
+ dlReader.initFromPage(valueCount, in);
LOG.debug("reading data at {}", in.position());
- initDataReader(page.getValueEncoding(), in, page.getValueCount());
+ initDataReader(page.getValueEncoding(), in, valueCount);
} catch (IOException e) {
throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
}
+ newPageInitialized(page);
}
private void readPageV2(DataPageV2 page) {
this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels());
this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
- LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount);
+ int valueCount = page.getValueCount();
+ LOG.debug("page data size {} bytes and {} values", page.getData().size(), valueCount);
try {
- initDataReader(page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount());
+ initDataReader(page.getDataEncoding(), page.getData().toInputStream(), valueCount);
} catch (IOException e) {
throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
}
+ newPageInitialized(page);
+ }
+
+ final int getPageValueCount() {
+ return pageValueCount;
}
+ abstract void newPageInitialized(DataPage page);
+
private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
try {
if (maxLevel == 0) {
@@ -625,7 +644,7 @@ public class ColumnReaderImpl implements ColumnReader {
}
}
- private boolean isPageFullyConsumed() {
+ boolean isPageFullyConsumed() {
return readValues >= endOfPageValueCount;
}
@@ -643,6 +662,7 @@ public class ColumnReaderImpl implements ColumnReader {
* {@inheritDoc}
* @see org.apache.parquet.column.ColumnReader#getTotalValueCount()
*/
+ @Deprecated
@Override
public long getTotalValueCount() {
return totalValueCount;
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
index 8c85b37..b32fb0c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,675 +18,41 @@
*/
package org.apache.parquet.column.impl;
-import static java.lang.String.format;
-import static org.apache.parquet.Preconditions.checkNotNull;
-import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.VALUES;
-
-import java.io.IOException;
-
-import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.VersionParser.ParsedVersion;
-import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ColumnReader;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DataPage;
-import org.apache.parquet.column.page.DataPageV1;
-import org.apache.parquet.column.page.DataPageV2;
-import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.column.values.RequiresPreviousReader;
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.PrimitiveConverter;
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeNameConverter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * ColumnReader implementation
+ * ColumnReader implementation for the scenario when column indexes are not used (all values are read)
*/
-public class ColumnReaderImpl implements ColumnReader {
- private static final Logger LOG = LoggerFactory.getLogger(ColumnReaderImpl.class);
-
- /**
- * binds the lower level page decoder to the record converter materializing the records
- */
- private static abstract class Binding {
-
- /**
- * read one value from the underlying page
- */
- abstract void read();
-
- /**
- * skip one value from the underlying page
- */
- abstract void skip();
-
- /**
- * write current value to converter
- */
- abstract void writeValue();
-
- /**
- * @return current value
- */
- public int getDictionaryId() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * @return current value
- */
- public int getInteger() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * @return current value
- */
- public boolean getBoolean() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * @return current value
- */
- public long getLong() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * @return current value
- */
- public Binary getBinary() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * @return current value
- */
- public float getFloat() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * @return current value
- */
- public double getDouble() {
- throw new UnsupportedOperationException();
- }
- }
-
- private final ParsedVersion writerVersion;
- private final ColumnDescriptor path;
- private final long totalValueCount;
- private final PageReader pageReader;
- private final Dictionary dictionary;
-
- private IntIterator repetitionLevelColumn;
- private IntIterator definitionLevelColumn;
- protected ValuesReader dataColumn;
- private Encoding currentEncoding;
-
- private int repetitionLevel;
- private int definitionLevel;
- private int dictionaryId;
-
- private long endOfPageValueCount;
- private long readValues = 0;
- private int pageValueCount = 0;
-
- private final PrimitiveConverter converter;
- private Binding binding;
-
- // this is needed because we will attempt to read the value twice when filtering
- // TODO: rework that
- private boolean valueRead;
-
- private void bindToDictionary(final Dictionary dictionary) {
- binding =
- new Binding() {
- void read() {
- dictionaryId = dataColumn.readValueDictionaryId();
- }
- public void skip() {
- dataColumn.skip();
- }
- public int getDictionaryId() {
- return dictionaryId;
- }
- void writeValue() {
- converter.addValueFromDictionary(dictionaryId);
- }
- public int getInteger() {
- return dictionary.decodeToInt(dictionaryId);
- }
- public boolean getBoolean() {
- return dictionary.decodeToBoolean(dictionaryId);
- }
- public long getLong() {
- return dictionary.decodeToLong(dictionaryId);
- }
- public Binary getBinary() {
- return dictionary.decodeToBinary(dictionaryId);
- }
- public float getFloat() {
- return dictionary.decodeToFloat(dictionaryId);
- }
- public double getDouble() {
- return dictionary.decodeToDouble(dictionaryId);
- }
- };
- }
-
- private void bind(PrimitiveTypeName type) {
- binding = type.convert(new PrimitiveTypeNameConverter<Binding, RuntimeException>() {
- @Override
- public Binding convertFLOAT(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
- return new Binding() {
- float current;
- void read() {
- current = dataColumn.readFloat();
- }
- public void skip() {
- current = 0;
- dataColumn.skip();
- }
- public float getFloat() {
- return current;
- }
- void writeValue() {
- converter.addFloat(current);
- }
- };
- }
- @Override
- public Binding convertDOUBLE(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
- return new Binding() {
- double current;
- void read() {
- current = dataColumn.readDouble();
- }
- public void skip() {
- current = 0;
- dataColumn.skip();
- }
- public double getDouble() {
- return current;
- }
- void writeValue() {
- converter.addDouble(current);
- }
- };
- }
- @Override
- public Binding convertINT32(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
- return new Binding() {
- int current;
- void read() {
- current = dataColumn.readInteger();
- }
- public void skip() {
- current = 0;
- dataColumn.skip();
- }
- @Override
- public int getInteger() {
- return current;
- }
- void writeValue() {
- converter.addInt(current);
- }
- };
- }
- @Override
- public Binding convertINT64(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
- return new Binding() {
- long current;
- void read() {
- current = dataColumn.readLong();
- }
- public void skip() {
- current = 0;
- dataColumn.skip();
- }
- @Override
- public long getLong() {
- return current;
- }
- void writeValue() {
- converter.addLong(current);
- }
- };
- }
- @Override
- public Binding convertINT96(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
- return this.convertBINARY(primitiveTypeName);
- }
- @Override
- public Binding convertFIXED_LEN_BYTE_ARRAY(
- PrimitiveTypeName primitiveTypeName) throws RuntimeException {
- return this.convertBINARY(primitiveTypeName);
- }
- @Override
- public Binding convertBOOLEAN(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
- return new Binding() {
- boolean current;
- void read() {
- current = dataColumn.readBoolean();
- }
- public void skip() {
- current = false;
- dataColumn.skip();
- }
- @Override
- public boolean getBoolean() {
- return current;
- }
- void writeValue() {
- converter.addBoolean(current);
- }
- };
- }
- @Override
- public Binding convertBINARY(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
- return new Binding() {
- Binary current;
- void read() {
- current = dataColumn.readBytes();
- }
- public void skip() {
- current = null;
- dataColumn.skip();
- }
- @Override
- public Binary getBinary() {
- return current;
- }
- void writeValue() {
- converter.addBinary(current);
- }
- };
- }
- });
- }
+public class ColumnReaderImpl extends ColumnReaderBase {
/**
* creates a reader for triplets
- * @param path the descriptor for the corresponding column
- * @param pageReader the underlying store to read from
- * @param converter a converter that materializes the values in this column in the current record
- * @param writerVersion writer version string from the Parquet file being read
- */
- public ColumnReaderImpl(ColumnDescriptor path, PageReader pageReader, PrimitiveConverter converter, ParsedVersion writerVersion) {
- this.path = checkNotNull(path, "path");
- this.pageReader = checkNotNull(pageReader, "pageReader");
- this.converter = checkNotNull(converter, "converter");
- this.writerVersion = writerVersion;
- DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
- if (dictionaryPage != null) {
- try {
- this.dictionary = dictionaryPage.getEncoding().initDictionary(path, dictionaryPage);
- if (converter.hasDictionarySupport()) {
- converter.setDictionary(dictionary);
- }
- } catch (IOException e) {
- throw new ParquetDecodingException("could not decode the dictionary for " + path, e);
- }
- } else {
- this.dictionary = null;
- }
- this.totalValueCount = pageReader.getTotalValueCount();
- if (totalValueCount <= 0) {
- throw new ParquetDecodingException("totalValueCount '" + totalValueCount + "' <= 0");
- }
+ *
+ * @param path
+ * the descriptor for the corresponding column
+ * @param pageReader
+ * the underlying store to read from
+ * @param converter
+ * a converter that materializes the values in this column in the current record
+ * @param writerVersion
+ * writer version string from the Parquet file being read
+ */
+ public ColumnReaderImpl(ColumnDescriptor path, PageReader pageReader, PrimitiveConverter converter,
+ ParsedVersion writerVersion) {
+ super(path, pageReader, converter, writerVersion);
consume();
}
- private boolean isFullyConsumed() {
- return readValues >= totalValueCount;
- }
-
- /**
- * {@inheritDoc}
- * @see org.apache.parquet.column.ColumnReader#writeCurrentValueToConverter()
- */
- @Override
- public void writeCurrentValueToConverter() {
- readValue();
- this.binding.writeValue();
- }
-
- @Override
- public int getCurrentValueDictionaryID() {
- readValue();
- return binding.getDictionaryId();
- }
-
- /**
- * {@inheritDoc}
- * @see org.apache.parquet.column.ColumnReader#getInteger()
- */
- @Override
- public int getInteger() {
- readValue();
- return this.binding.getInteger();
- }
-
- /**
- * {@inheritDoc}
- * @see org.apache.parquet.column.ColumnReader#getBoolean()
- */
- @Override
- public boolean getBoolean() {
- readValue();
- return this.binding.getBoolean();
- }
-
- /**
- * {@inheritDoc}
- * @see org.apache.parquet.column.ColumnReader#getLong()
- */
- @Override
- public long getLong() {
- readValue();
- return this.binding.getLong();
- }
-
- /**
- * {@inheritDoc}
- * @see org.apache.parquet.column.ColumnReader#getBinary()
- */
- @Override
- public Binary getBinary() {
- readValue();
- return this.binding.getBinary();
- }
-
- /**
- * {@inheritDoc}
- * @see org.apache.parquet.column.ColumnReader#getFloat()
- */
- @Override
- public float getFloat() {
- readValue();
- return this.binding.getFloat();
- }
-
- /**
- * {@inheritDoc}
- * @see org.apache.parquet.column.ColumnReader#getDouble()
- */
- @Override
- public double getDouble() {
- readValue();
- return this.binding.getDouble();
- }
-
- /**
- * {@inheritDoc}
- * @see org.apache.parquet.column.ColumnReader#getCurrentRepetitionLevel()
- */
- @Override
- public int getCurrentRepetitionLevel() {
- return repetitionLevel;
- }
-
- /**
- * {@inheritDoc}
- * @see org.apache.parquet.column.ColumnReader#getDescriptor()
- */
- @Override
- public ColumnDescriptor getDescriptor() {
- return path;
- }
-
- /**
- * Reads the value into the binding.
- */
- public void readValue() {
- try {
- if (!valueRead) {
- binding.read();
- valueRead = true;
- }
- } catch (RuntimeException e) {
- if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, currentEncoding) &&
- e instanceof ArrayIndexOutOfBoundsException) {
- // this is probably PARQUET-246, which may happen if reading data with
- // MR because this can't be detected without reading all footers
- throw new ParquetDecodingException("Read failure possibly due to " +
- "PARQUET-246: try setting parquet.split.files to false",
- new ParquetDecodingException(
- format("Can't read value in column %s at value %d out of %d, " +
- "%d out of %d in currentPage. repetition level: " +
- "%d, definition level: %d",
- path, readValues, totalValueCount,
- readValues - (endOfPageValueCount - pageValueCount),
- pageValueCount, repetitionLevel, definitionLevel),
- e));
- }
- throw new ParquetDecodingException(
- format("Can't read value in column %s at value %d out of %d, " +
- "%d out of %d in currentPage. repetition level: " +
- "%d, definition level: %d",
- path, readValues, totalValueCount,
- readValues - (endOfPageValueCount - pageValueCount),
- pageValueCount, repetitionLevel, definitionLevel),
- e);
- }
- }
-
- /**
- * {@inheritDoc}
- * @see org.apache.parquet.column.ColumnReader#skip()
- */
- @Override
- public void skip() {
- if (!valueRead) {
- binding.skip();
- valueRead = true;
- }
- }
-
- /**
- * {@inheritDoc}
- * @see org.apache.parquet.column.ColumnReader#getCurrentDefinitionLevel()
- */
- @Override
- public int getCurrentDefinitionLevel() {
- return definitionLevel;
- }
-
- // TODO: change the logic around read() to not tie together reading from the 3 columns
- private void readRepetitionAndDefinitionLevels() {
- repetitionLevel = repetitionLevelColumn.nextInt();
- definitionLevel = definitionLevelColumn.nextInt();
- ++readValues;
- }
-
- private void checkRead() {
- if (isPageFullyConsumed()) {
- if (isFullyConsumed()) {
- LOG.debug("end reached");
- repetitionLevel = 0; // the next repetition level
- return;
- }
- readPage();
- }
- readRepetitionAndDefinitionLevels();
- }
-
- private void readPage() {
- LOG.debug("loading page");
- DataPage page = pageReader.readPage();
- page.accept(new DataPage.Visitor<Void>() {
- @Override
- public Void visit(DataPageV1 dataPageV1) {
- readPageV1(dataPageV1);
- return null;
- }
- @Override
- public Void visit(DataPageV2 dataPageV2) {
- readPageV2(dataPageV2);
- return null;
- }
- });
- }
-
- private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
- ValuesReader previousReader = this.dataColumn;
-
- this.currentEncoding = dataEncoding;
- this.pageValueCount = valueCount;
- this.endOfPageValueCount = readValues + pageValueCount;
-
- if (dataEncoding.usesDictionary()) {
- if (dictionary == null) {
- throw new ParquetDecodingException(
- "could not read page in col " + path + " as the dictionary was missing for encoding " + dataEncoding);
- }
- this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(path, VALUES, dictionary);
- } else {
- this.dataColumn = dataEncoding.getValuesReader(path, VALUES);
- }
-
- if (dataEncoding.usesDictionary() && converter.hasDictionarySupport()) {
- bindToDictionary(dictionary);
- } else {
- bind(path.getType());
- }
-
- try {
- dataColumn.initFromPage(pageValueCount, in);
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read page in col " + path, e);
- }
-
- if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
- previousReader != null && previousReader instanceof RequiresPreviousReader) {
- // previous reader can only be set if reading sequentially
- ((RequiresPreviousReader) dataColumn).setPreviousReader(previousReader);
- }
- }
-
- private void readPageV1(DataPageV1 page) {
- ValuesReader rlReader = page.getRlEncoding().getValuesReader(path, REPETITION_LEVEL);
- ValuesReader dlReader = page.getDlEncoding().getValuesReader(path, DEFINITION_LEVEL);
- this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
- this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
- try {
- BytesInput bytes = page.getBytes();
- LOG.debug("page size {} bytes and {} records", bytes.size(), pageValueCount);
- LOG.debug("reading repetition levels at 0");
- ByteBufferInputStream in = bytes.toInputStream();
- rlReader.initFromPage(pageValueCount, in);
- LOG.debug("reading definition levels at {}", in.position());
- dlReader.initFromPage(pageValueCount, in);
- LOG.debug("reading data at {}", in.position());
- initDataReader(page.getValueEncoding(), in, page.getValueCount());
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
- }
- }
-
- private void readPageV2(DataPageV2 page) {
- this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels());
- this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
- LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount);
- try {
- initDataReader(page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount());
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
- }
- }
-
- private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
- try {
- if (maxLevel == 0) {
- return new NullIntIterator();
- }
- return new RLEIntIterator(
- new RunLengthBitPackingHybridDecoder(
- BytesUtils.getWidthFromMaxInt(maxLevel),
- bytes.toInputStream()));
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read levels in page for col " + path, e);
- }
- }
-
- private boolean isPageFullyConsumed() {
- return readValues >= endOfPageValueCount;
- }
-
- /**
- * {@inheritDoc}
- * @see org.apache.parquet.column.ColumnReader#consume()
- */
@Override
- public void consume() {
- checkRead();
- valueRead = false;
+ boolean skipLevels(int rl, int dl) {
+ return false;
}
- /**
- * {@inheritDoc}
- * @see org.apache.parquet.column.ColumnReader#getTotalValueCount()
- */
@Override
- public long getTotalValueCount() {
- return totalValueCount;
- }
-
- static abstract class IntIterator {
- abstract int nextInt();
- }
-
- static class ValuesReaderIntIterator extends IntIterator {
- ValuesReader delegate;
-
- public ValuesReaderIntIterator(ValuesReader delegate) {
- super();
- this.delegate = delegate;
- }
-
- @Override
- int nextInt() {
- return delegate.readInteger();
- }
- }
-
- static class RLEIntIterator extends IntIterator {
- RunLengthBitPackingHybridDecoder delegate;
-
- public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
- this.delegate = delegate;
- }
-
- @Override
- int nextInt() {
- try {
- return delegate.readInt();
- } catch (IOException e) {
- throw new ParquetDecodingException(e);
- }
- }
- }
-
- private static final class NullIntIterator extends IntIterator {
- @Override
- int nextInt() {
- return 0;
- }
+ void newPageInitialized(DataPage page) {
}
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/SynchronizingColumnReader.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/SynchronizingColumnReader.java
new file mode 100644
index 0000000..444ef96
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/SynchronizingColumnReader.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import java.util.PrimitiveIterator;
+
+import org.apache.parquet.VersionParser.ParsedVersion;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.PrimitiveConverter;
+
+/**
+ * A {@link ColumnReader} implementation for utilizing indexes. When filtering using column indexes, some of the rows
+ * may be loaded only partially, because rows are not synchronized across columns, thus pages containing other fields of
+ * a row may have been filtered out. In this case we can't assemble the row, but there is no need to do so either, since
+ * getting filtered out in another column means that it cannot match the filter condition.
+ * <p>
+ * A {@link RecordReader} assembles rows by reading from each {@link ColumnReader}. Without filtering, when
+ * {@link RecordReader} starts reading a row, {@link ColumnReader}s are always positioned at the same row with respect to
+ * each other. With filtering, however, due to the misalignment described above, some of the pages read by
+ * {@link ColumnReader}s may contain values that have no corresponding values in other columns. This
+ * {@link SynchronizingColumnReader} is a column reader implementation that skips such values so that the values
+ * returned to {@link RecordReader} for the different fields all correspond to a single row.
+ *
+ * @see PageReadStore#isInPageFilteringMode()
+ */
+class SynchronizingColumnReader extends ColumnReaderBase {
+
+ private final PrimitiveIterator.OfLong rowIndexes; // rows to be read (see PageReadStore#getRowIndexes()); the skip logic below assumes increasing order
+ private long currentRow; // index of the row the last processed value belongs to
+ private long targetRow; // next row index to be read; Long.MAX_VALUE once rowIndexes is exhausted and passed
+ private long lastRowInPage; // index of the last row of the current page
+ private int valuesReadFromPage; // values processed (read or skipped) from the current page
+
+ SynchronizingColumnReader(ColumnDescriptor path, PageReader pageReader, PrimitiveConverter converter,
+ ParsedVersion writerVersion, PrimitiveIterator.OfLong rowIndexes) {
+ super(path, pageReader, converter, writerVersion);
+ this.rowIndexes = rowIndexes;
+ targetRow = Long.MIN_VALUE; // below any row index, so the first row start fetches the first target row
+ consume(); // NOTE(review): overridable ColumnReaderBase method called from the constructor; presumably reads ahead to the first non-skipped value — confirm
+ }
+
+ @Override
+ boolean isPageFullyConsumed() {
+ return getPageValueCount() <= valuesReadFromPage || lastRowInPage < targetRow; // also done if the page ends before the next row to be read
+ }
+
+ @Override
+ boolean isFullyConsumed() {
+ return !rowIndexes.hasNext(); // true once the last requested row index has been fetched into targetRow
+ }
+
+ @Override
+ boolean skipLevels(int rl, int dl) {
+ ++valuesReadFromPage;
+ if (rl == 0) { // repetition level 0 starts a new row
+ ++currentRow;
+ if (currentRow > targetRow) { // passed the target row: move to the next one, or beyond every row if none is left
+ targetRow = rowIndexes.hasNext() ? rowIndexes.nextLong() : Long.MAX_VALUE;
+ }
+ }
+ return currentRow < targetRow; // skip the values of rows preceding the next target row
+ }
+
+ @Override
+ protected void newPageInitialized(DataPage page) {
+ long firstRowIndex = page.getFirstRowIndex();
+ currentRow = firstRowIndex - 1; // the first value of the page (expected to start a row, rl == 0) moves this to firstRowIndex
+ lastRowInPage = firstRowIndex + page.getRowCount() - 1;
+ valuesReadFromPage = 0;
+ }
+
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPage.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPage.java
index 4d8f381..0751d4c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPage.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPage.java
@@ -24,10 +24,18 @@ package org.apache.parquet.column.page;
abstract public class DataPage extends Page {
private final int valueCount;
+ private final long firstRowIndex; // negative (-1) if not available, i.e. page filtering mode is not active
+ private final int rowCount; // negative (-1) if not available
DataPage(int compressedSize, int uncompressedSize, int valueCount) {
+ this(compressedSize, uncompressedSize, valueCount, -1, -1); // first row index and row count are not available
+ }
+
+ DataPage(int compressedSize, int uncompressedSize, int valueCount, long firstRowIndex, int rowCount) {
super(compressedSize, uncompressedSize);
this.valueCount = valueCount;
+ this.firstRowIndex = firstRowIndex;
+ this.rowCount = rowCount;
}
/**
@@ -37,6 +45,33 @@ abstract public class DataPage extends Page {
return valueCount;
}
+ /**
+ * @return the index of the first row in this page
+ * @throws NotInPageFilteringModeException
+ * if page filtering mode is not active (the page was created without a first row index)
+ * @see PageReadStore#isInPageFilteringMode()
+ */
+ public long getFirstRowIndex() {
+ if (firstRowIndex < 0) { // a negative value is the "not available" sentinel
+ throw new NotInPageFilteringModeException("First row index is not available");
+ }
+ return firstRowIndex;
+ }
+
+ /**
+ * @return the number of rows in this page
+ * @throws NotInPageFilteringModeException
+ * if page filtering mode is not active; thrown only in case of {@link DataPageV1}
+ * @see PageReadStore#isInPageFilteringMode()
+ */
+ public int getRowCount() {
+ if (rowCount < 0) { // a negative value is the "not available" sentinel
+ throw new NotInPageFilteringModeException(
+ "Row count is not available");
+ }
+ return rowCount;
+ }
+
public abstract <T> T accept(Visitor<T> visitor);
public static interface Visitor<T> {
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java
index 56928c3..42fc635 100755
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java
@@ -50,6 +50,27 @@ public class DataPageV1 extends DataPage {
}
/**
+ * @param bytes the bytes for this page
+ * @param valueCount count of values in this page
+ * @param uncompressedSize the uncompressed size of the page
+ * @param firstRowIndex the index of the first row in this page (a negative value means not available)
+ * @param rowCount the number of rows in this page (a negative value means not available)
+ * @param statistics of the page's values (max, min, num_null)
+ * @param rlEncoding the repetition level encoding for this page
+ * @param dlEncoding the definition level encoding for this page
+ * @param valuesEncoding the values encoding for this page
+ */
+ public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, long firstRowIndex, int rowCount,
+ Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
+ super(Ints.checkedCast(bytes.size()), uncompressedSize, valueCount, firstRowIndex, rowCount); // compressed size is the size of the given bytes
+ this.bytes = bytes;
+ this.statistics = statistics;
+ this.rlEncoding = rlEncoding;
+ this.dlEncoding = dlEncoding;
+ this.valuesEncoding = valuesEncoding;
+ }
+
+ /**
* @return the bytes for the page
*/
public BytesInput getBytes() {
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java
index 62dac83..7ec902d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java
@@ -54,6 +54,32 @@ public class DataPageV2 extends DataPage {
* @param rowCount count of rows
* @param nullCount count of nulls
* @param valueCount count of values
+ * @param firstRowIndex the index of the first row in this page (a negative value means not available)
+ * @param repetitionLevels RLE encoded repetition levels
+ * @param definitionLevels RLE encoded definition levels
+ * @param dataEncoding encoding for the data
+ * @param data data encoded with dataEncoding
+ * @param statistics optional statistics for this page
+ * @return an uncompressed page
+ */
+ public static DataPageV2 uncompressed(
+ int rowCount, int nullCount, int valueCount, long firstRowIndex,
+ BytesInput repetitionLevels, BytesInput definitionLevels,
+ Encoding dataEncoding, BytesInput data,
+ Statistics<?> statistics) {
+ return new DataPageV2(
+ rowCount, nullCount, valueCount, firstRowIndex,
+ repetitionLevels, definitionLevels,
+ dataEncoding, data,
+ Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()), // uncompressed: the size is the sum of the parts
+ statistics,
+ false); // isCompressed
+ }
+
+ /**
+ * @param rowCount count of rows
+ * @param nullCount count of nulls
+ * @param valueCount count of values
* @param repetitionLevels RLE encoded repetition levels
* @param definitionLevels RLE encoded definition levels
* @param dataEncoding encoding for the data
@@ -77,7 +103,6 @@ public class DataPageV2 extends DataPage {
true);
}
- private final int rowCount;
private final int nullCount;
private final BytesInput repetitionLevels;
private final BytesInput definitionLevels;
@@ -93,8 +118,7 @@ public class DataPageV2 extends DataPage {
int uncompressedSize,
Statistics<?> statistics,
boolean isCompressed) {
- super(Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize, valueCount);
- this.rowCount = rowCount;
+ super(Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize, valueCount, -1, rowCount); // -1: first row index not available
this.nullCount = nullCount;
this.repetitionLevels = repetitionLevels;
this.definitionLevels = definitionLevels;
@@ -104,8 +128,22 @@ public class DataPageV2 extends DataPage {
this.isCompressed = isCompressed;
}
- public int getRowCount() {
- return rowCount;
+ private DataPageV2( // NOTE(review): same body as the constructor above, which could delegate here with firstRowIndex = -1
+ int rowCount, int nullCount, int valueCount, long firstRowIndex,
+ BytesInput repetitionLevels, BytesInput definitionLevels,
+ Encoding dataEncoding, BytesInput data,
+ int uncompressedSize,
+ Statistics<?> statistics,
+ boolean isCompressed) {
+ super(Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize,
+ valueCount, firstRowIndex, rowCount);
+ this.nullCount = nullCount;
+ this.repetitionLevels = repetitionLevels;
+ this.definitionLevels = definitionLevels;
+ this.dataEncoding = dataEncoding;
+ this.data = data;
+ this.statistics = statistics;
+ this.isCompressed = isCompressed;
}
public int getNullCount() {
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/NotInPageFilteringModeException.java
similarity index 65%
copy from parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
copy to parquet-column/src/main/java/org/apache/parquet/column/page/NotInPageFilteringModeException.java
index 24d5825..839c199 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/NotInPageFilteringModeException.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,26 +18,20 @@
*/
package org.apache.parquet.column.page;
-import org.apache.parquet.column.ColumnDescriptor;
-
/**
- * contains all the readers for all the columns of the corresponding row group
- *
- * TODO: rename to RowGroup?
+ * Exception thrown if an operation is invoked that has no meaning when page filtering mode is not active.
+ *
+ * @see PageReadStore#isInPageFilteringMode()
*/
-public interface PageReadStore {
+public class NotInPageFilteringModeException extends IllegalStateException {
/**
- *
- * @param descriptor the descriptor of the column
- * @return the page reader for that column
+ * Constructs an exception with the specified detail message.
+ *
+ * @param msg
+ * the detail message of the exception
*/
- PageReader getPageReader(ColumnDescriptor descriptor);
-
- /**
- *
- * @return the total number of rows in that row group
- */
- long getRowCount();
-
+ public NotInPageFilteringModeException(String msg) {
+ super(msg);
+ }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
index 24d5825..0fc26ed 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
@@ -18,6 +18,7 @@
*/
package org.apache.parquet.column.page;
+import java.util.PrimitiveIterator;
import org.apache.parquet.column.ColumnDescriptor;
/**
@@ -29,7 +30,8 @@ public interface PageReadStore {
/**
*
- * @param descriptor the descriptor of the column
+ * @param descriptor
+ * the descriptor of the column
* @return the page reader for that column
*/
PageReader getPageReader(ColumnDescriptor descriptor);
@@ -40,4 +42,25 @@ public interface PageReadStore {
*/
long getRowCount();
+ /**
+ * Returns the indexes of the rows to be read/built. All rows whose index is not returned shall be skipped.
+ *
+ * @return the iterator of the row indexes, in increasing order
+ * @throws NotInPageFilteringModeException
+ * if page filtering mode is not active, so the related information is not available
+ * @see #isInPageFilteringMode()
+ */
+ default PrimitiveIterator.OfLong getRowIndexes() {
+ throw new NotInPageFilteringModeException("Row indexes are not available");
+ }
+
+ /**
+ * If page filtering mode is active, some values might have to be skipped to keep the rows in sync across the
+ * pages of the different columns.
+ *
+ * @return {@code true} if page filtering mode is active; {@code false} otherwise
+ */
+ default boolean isInPageFilteringMode() {
+ return false;
+ }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
index 950b70f..490cc3e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
@@ -22,12 +22,13 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.parquet.filter2.predicate.Statistics;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveComparator;
import org.apache.parquet.schema.PrimitiveType;
class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
- private static class BinaryColumnIndex extends ColumnIndexBase {
+ private static class BinaryColumnIndex extends ColumnIndexBase<Binary> {
private Binary[] minValues;
private Binary[] maxValues;
@@ -54,6 +55,28 @@ class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
String getMaxValueAsString(int pageIndex) {
return stringifier.stringify(maxValues[pageIndex]);
}
+
+ @Override
+ @SuppressWarnings("unchecked")
+ <T extends Comparable<T>> Statistics<T> createStats(int arrayIndex) {
+ return (Statistics<T>) new Statistics<Binary>(minValues[arrayIndex], maxValues[arrayIndex], comparator); // NOTE(review): unchecked; assumes callers request T == Binary for this index
+ }
+
+ @Override
+ ValueComparator createValueComparator(Object value) {
+ final Binary v = (Binary) value; // cast once per predicate instead of once per comparison
+ return new ValueComparator() {
+ @Override
+ int compareValueToMin(int arrayIndex) {
+ return comparator.compare(v, minValues[arrayIndex]);
+ }
+
+ @Override
+ int compareValueToMax(int arrayIndex) {
+ return comparator.compare(v, maxValues[arrayIndex]);
+ }
+ };
+ }
}
private final List<Binary> minValues = new ArrayList<>();
@@ -76,8 +99,8 @@ class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
@Override
void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
- minValues.add(min == null ? null : convert(min));
- maxValues.add(max == null ? null : convert(max));
+ minValues.add(convert(min)); // NOTE(review): the null guard was removed; assumes null min/max (null pages) never reach here — confirm
+ maxValues.add(convert(max));
}
@Override
@@ -87,7 +110,7 @@ class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
}
@Override
- ColumnIndexBase createColumnIndex(PrimitiveType type) {
+ ColumnIndexBase<Binary> createColumnIndex(PrimitiveType type) {
BinaryColumnIndex columnIndex = new BinaryColumnIndex(type);
columnIndex.minValues = minValues.toArray(new Binary[minValues.size()]);
columnIndex.maxValues = maxValues.toArray(new Binary[maxValues.size()]);
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java
index 3053f78..233bd1b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java
@@ -19,7 +19,7 @@
package org.apache.parquet.internal.column.columnindex;
import java.nio.ByteBuffer;
-
+import org.apache.parquet.filter2.predicate.Statistics;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveComparator;
import org.apache.parquet.schema.PrimitiveType;
@@ -28,7 +28,7 @@ import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import it.unimi.dsi.fastutil.booleans.BooleanList;
class BooleanColumnIndexBuilder extends ColumnIndexBuilder {
- private static class BooleanColumnIndex extends ColumnIndexBase {
+ private static class BooleanColumnIndex extends ColumnIndexBase<Boolean> {
private boolean[] minValues;
private boolean[] maxValues;
@@ -55,6 +55,28 @@ class BooleanColumnIndexBuilder extends ColumnIndexBuilder {
String getMaxValueAsString(int pageIndex) {
return stringifier.stringify(maxValues[pageIndex]);
}
+
+ @Override
+ @SuppressWarnings("unchecked")
+ <T extends Comparable<T>> Statistics<T> createStats(int arrayIndex) {
+ return (Statistics<T>) new Statistics<Boolean>(minValues[arrayIndex], maxValues[arrayIndex], comparator); // NOTE(review): unchecked; assumes callers request T == Boolean for this index
+ }
+
+ @Override
+ ValueComparator createValueComparator(Object value) {
+ final boolean v = (boolean) value; // unbox once per predicate instead of once per comparison
+ return new ValueComparator() {
+ @Override
+ int compareValueToMin(int arrayIndex) {
+ return comparator.compare(v, minValues[arrayIndex]);
+ }
+
+ @Override
+ int compareValueToMax(int arrayIndex) {
+ return comparator.compare(v, maxValues[arrayIndex]);
+ }
+ };
+ }
}
private final BooleanList minValues = new BooleanArrayList();
@@ -70,18 +92,18 @@ class BooleanColumnIndexBuilder extends ColumnIndexBuilder {
@Override
void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
- minValues.add(min == null ? false : convert(min));
- maxValues.add(max == null ? false : convert(max));
+ minValues.add(convert(min)); // NOTE(review): the null default was removed; assumes null min/max (null pages) never reach here — confirm
+ maxValues.add(convert(max));
}
@Override
void addMinMax(Object min, Object max) {
- minValues.add(min == null ? false : (boolean) min);
- maxValues.add(max == null ? false : (boolean) max);
+ minValues.add((boolean) min); // NOTE(review): unboxing a null min/max now throws NullPointerException (previously defaulted to false); assumes the caller filters null pages — confirm
+ maxValues.add((boolean) max);
}
@Override
- ColumnIndexBase createColumnIndex(PrimitiveType type) {
+ ColumnIndexBase<Boolean> createColumnIndex(PrimitiveType type) {
BooleanColumnIndex columnIndex = new BooleanColumnIndex(type);
columnIndex.minValues = minValues.toBooleanArray();
columnIndex.maxValues = maxValues.toBooleanArray();
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BoundaryOrder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BoundaryOrder.java
index 5d82815..e47b5b3 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BoundaryOrder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BoundaryOrder.java
@@ -18,9 +18,383 @@
*/
package org.apache.parquet.internal.column.columnindex;
+import java.util.PrimitiveIterator;
+import java.util.PrimitiveIterator.OfInt;
+
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder.ColumnIndexBase;
+
/**
- * Enum for {@link org.apache.parquet.format.BoundaryOrder}.
+ * Enum for {@link org.apache.parquet.format.BoundaryOrder}. It also contains the implementations of searching for
+ * matching page indexes for column index based filtering.
*/
public enum BoundaryOrder {
- UNORDERED, ASCENDING, DESCENDING;
+  // No ordering between the pages: the min/max values of every page have to be checked one by one
+  UNORDERED {
+    @Override
+    PrimitiveIterator.OfInt eq(ColumnIndexBase<?>.ValueComparator comparator) {
+      return IndexIterator.filterTranslate(comparator.arrayLength(),
+          arrayIndex -> comparator.compareValueToMin(arrayIndex) >= 0 && comparator.compareValueToMax(arrayIndex) <= 0,
+          comparator::translate);
+    }
+
+    @Override
+    PrimitiveIterator.OfInt gt(ColumnIndexBase<?>.ValueComparator comparator) {
+      return IndexIterator.filterTranslate(comparator.arrayLength(),
+          arrayIndex -> comparator.compareValueToMax(arrayIndex) < 0,
+          comparator::translate);
+    }
+
+    @Override
+    PrimitiveIterator.OfInt gtEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      return IndexIterator.filterTranslate(comparator.arrayLength(),
+          arrayIndex -> comparator.compareValueToMax(arrayIndex) <= 0,
+          comparator::translate);
+    }
+
+    @Override
+    PrimitiveIterator.OfInt lt(ColumnIndexBase<?>.ValueComparator comparator) {
+      return IndexIterator.filterTranslate(comparator.arrayLength(),
+          arrayIndex -> comparator.compareValueToMin(arrayIndex) > 0,
+          comparator::translate);
+    }
+
+    @Override
+    PrimitiveIterator.OfInt ltEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      return IndexIterator.filterTranslate(comparator.arrayLength(),
+          arrayIndex -> comparator.compareValueToMin(arrayIndex) >= 0,
+          comparator::translate);
+    }
+
+    @Override
+    PrimitiveIterator.OfInt notEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      return IndexIterator.filterTranslate(comparator.arrayLength(),
+          arrayIndex -> comparator.compareValueToMin(arrayIndex) != 0 || comparator.compareValueToMax(arrayIndex) != 0,
+          comparator::translate);
+    }
+  },
+  // Min and max values are in ascending order across the pages, so binary searches can be used
+  ASCENDING {
+    @Override
+    OfInt eq(ColumnIndexBase<?>.ValueComparator comparator) {
+      Bounds bounds = findBounds(comparator);
+      if (bounds == null) {
+        return IndexIterator.EMPTY;
+      }
+      return IndexIterator.rangeTranslate(bounds.lower, bounds.upper, comparator::translate);
+    }
+
+    @Override
+    OfInt gt(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      if (length == 0) {
+        // No min/max values to search (presumably null pages only); the search below would index out of bounds
+        return IndexIterator.EMPTY;
+      }
+      int left = 0;
+      int right = length;
+      do {
+        int i = floorMid(left, right);
+        if (comparator.compareValueToMax(i) >= 0) {
+          left = i + 1;
+        } else {
+          right = i;
+        }
+      } while (left < right);
+      // right: the first array index whose max is greater than the value (length if there is none)
+      return IndexIterator.rangeTranslate(right, length - 1, comparator::translate);
+    }
+
+    @Override
+    OfInt gtEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      if (length == 0) {
+        // No min/max values to search (presumably null pages only); the search below would index out of bounds
+        return IndexIterator.EMPTY;
+      }
+      int left = 0;
+      int right = length;
+      do {
+        int i = floorMid(left, right);
+        if (comparator.compareValueToMax(i) > 0) {
+          left = i + 1;
+        } else {
+          right = i;
+        }
+      } while (left < right);
+      // right: the first array index whose max is greater than or equal to the value (length if there is none)
+      return IndexIterator.rangeTranslate(right, length - 1, comparator::translate);
+    }
+
+    @Override
+    OfInt lt(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      if (length == 0) {
+        // No min/max values to search (presumably null pages only); the search below would index out of bounds
+        return IndexIterator.EMPTY;
+      }
+      int left = -1;
+      int right = length - 1;
+      do {
+        int i = ceilingMid(left, right);
+        if (comparator.compareValueToMin(i) <= 0) {
+          right = i - 1;
+        } else {
+          left = i;
+        }
+      } while (left < right);
+      // left: the last array index whose min is less than the value (-1 if there is none)
+      return IndexIterator.rangeTranslate(0, left, comparator::translate);
+    }
+
+    @Override
+    OfInt ltEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      if (length == 0) {
+        // No min/max values to search (presumably null pages only); the search below would index out of bounds
+        return IndexIterator.EMPTY;
+      }
+      int left = -1;
+      int right = length - 1;
+      do {
+        int i = ceilingMid(left, right);
+        if (comparator.compareValueToMin(i) < 0) {
+          right = i - 1;
+        } else {
+          left = i;
+        }
+      } while (left < right);
+      // left: the last array index whose min is less than or equal to the value (-1 if there is none)
+      return IndexIterator.rangeTranslate(0, left, comparator::translate);
+    }
+
+    @Override
+    OfInt notEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      Bounds bounds = findBounds(comparator);
+      int length = comparator.arrayLength();
+      if (bounds == null) {
+        return IndexIterator.all(comparator);
+      }
+      return IndexIterator.filterTranslate(
+          length,
+          i -> i < bounds.lower || i > bounds.upper || comparator.compareValueToMin(i) != 0
+              || comparator.compareValueToMax(i) != 0,
+          comparator::translate);
+    }
+
+    // Returns the inclusive range of array indexes whose [min, max] contains the value, or null if there is none
+    private Bounds findBounds(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      int lowerLeft = 0;
+      int upperLeft = 0;
+      int lowerRight = length - 1;
+      int upperRight = length - 1;
+      do {
+        if (lowerLeft > lowerRight) {
+          return null;
+        }
+        int i = floorMid(lowerLeft, lowerRight);
+        if (comparator.compareValueToMin(i) < 0) {
+          lowerRight = upperRight = i - 1;
+        } else if (comparator.compareValueToMax(i) > 0) {
+          lowerLeft = upperLeft = i + 1;
+        } else {
+          lowerRight = upperLeft = i;
+        }
+      } while (lowerLeft != lowerRight);
+      do {
+        if (upperLeft > upperRight) {
+          return null;
+        }
+        int i = ceilingMid(upperLeft, upperRight);
+        if (comparator.compareValueToMin(i) < 0) {
+          upperRight = i - 1;
+        } else if (comparator.compareValueToMax(i) > 0) {
+          upperLeft = i + 1;
+        } else {
+          upperLeft = i;
+        }
+      } while (upperLeft != upperRight);
+      return new Bounds(lowerLeft, upperRight);
+    }
+  },
+  // Min and max values are in descending order across the pages, so binary searches can be used
+  DESCENDING {
+    @Override
+    OfInt eq(ColumnIndexBase<?>.ValueComparator comparator) {
+      Bounds bounds = findBounds(comparator);
+      if (bounds == null) {
+        return IndexIterator.EMPTY;
+      }
+      return IndexIterator.rangeTranslate(bounds.lower, bounds.upper, comparator::translate);
+    }
+
+    @Override
+    OfInt gt(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      if (length == 0) {
+        // No min/max values to search (presumably null pages only); the search below would index out of bounds
+        return IndexIterator.EMPTY;
+      }
+      int left = -1;
+      int right = length - 1;
+      do {
+        int i = ceilingMid(left, right);
+        if (comparator.compareValueToMax(i) >= 0) {
+          right = i - 1;
+        } else {
+          left = i;
+        }
+      } while (left < right);
+      // left: the last array index whose max is greater than the value (-1 if there is none)
+      return IndexIterator.rangeTranslate(0, left, comparator::translate);
+    }
+
+    @Override
+    OfInt gtEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      if (length == 0) {
+        // No min/max values to search (presumably null pages only); the search below would index out of bounds
+        return IndexIterator.EMPTY;
+      }
+      int left = -1;
+      int right = length - 1;
+      do {
+        int i = ceilingMid(left, right);
+        if (comparator.compareValueToMax(i) > 0) {
+          right = i - 1;
+        } else {
+          left = i;
+        }
+      } while (left < right);
+      // left: the last array index whose max is greater than or equal to the value (-1 if there is none)
+      return IndexIterator.rangeTranslate(0, left, comparator::translate);
+    }
+
+    @Override
+    OfInt lt(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      if (length == 0) {
+        // No min/max values to search (presumably null pages only); the search below would index out of bounds
+        return IndexIterator.EMPTY;
+      }
+      int left = 0;
+      int right = length;
+      do {
+        int i = floorMid(left, right);
+        if (comparator.compareValueToMin(i) <= 0) {
+          left = i + 1;
+        } else {
+          right = i;
+        }
+      } while (left < right);
+      // right: the first array index whose min is less than the value (length if there is none)
+      return IndexIterator.rangeTranslate(right, length - 1, comparator::translate);
+    }
+
+    @Override
+    OfInt ltEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      if (length == 0) {
+        // No min/max values to search (presumably null pages only); the search below would index out of bounds
+        return IndexIterator.EMPTY;
+      }
+      int left = 0;
+      int right = length;
+      do {
+        int i = floorMid(left, right);
+        if (comparator.compareValueToMin(i) < 0) {
+          left = i + 1;
+        } else {
+          right = i;
+        }
+      } while (left < right);
+      // right: the first array index whose min is less than or equal to the value (length if there is none)
+      return IndexIterator.rangeTranslate(right, length - 1, comparator::translate);
+    }
+
+    @Override
+    OfInt notEq(ColumnIndexBase<?>.ValueComparator comparator) {
+      Bounds bounds = findBounds(comparator);
+      int length = comparator.arrayLength();
+      if (bounds == null) {
+        return IndexIterator.all(comparator);
+      }
+      return IndexIterator.filterTranslate(
+          length,
+          i -> i < bounds.lower || i > bounds.upper || comparator.compareValueToMin(i) != 0
+              || comparator.compareValueToMax(i) != 0,
+          comparator::translate);
+    }
+
+    // Returns the inclusive range of array indexes whose [min, max] contains the value, or null if there is none
+    private Bounds findBounds(ColumnIndexBase<?>.ValueComparator comparator) {
+      int length = comparator.arrayLength();
+      int lowerLeft = 0;
+      int upperLeft = 0;
+      int lowerRight = length - 1;
+      int upperRight = length - 1;
+      do {
+        if (lowerLeft > lowerRight) {
+          return null;
+        }
+        int i = floorMid(lowerLeft, lowerRight);
+        if (comparator.compareValueToMax(i) > 0) {
+          lowerRight = upperRight = i - 1;
+        } else if (comparator.compareValueToMin(i) < 0) {
+          lowerLeft = upperLeft = i + 1;
+        } else {
+          lowerRight = upperLeft = i;
+        }
+      } while (lowerLeft != lowerRight);
+      do {
+        if (upperLeft > upperRight) {
+          return null;
+        }
+        int i = ceilingMid(upperLeft, upperRight);
+        if (comparator.compareValueToMax(i) > 0) {
+          upperRight = i - 1;
+        } else if (comparator.compareValueToMin(i) < 0) {
+          upperLeft = i + 1;
+        } else {
+          upperLeft = i;
+        }
+      } while (upperLeft != upperRight);
+      return new Bounds(lowerLeft, upperRight);
+    }
+  };
+
+  // Inclusive range [lower, upper] of min/max array indexes
+  private static class Bounds {
+    final int lower, upper;
+
+    Bounds(int lower, int upper) {
+      assert lower <= upper;
+      this.lower = lower;
+      this.upper = upper;
+    }
+  }
+
+  private static int floorMid(int left, int right) {
+    // Avoid the possible overflow might happen in case of (left + right) / 2
+    return left + ((right - left) / 2);
+  }
+
+  private static int ceilingMid(int left, int right) {
+    // Avoid the possible overflow might happen in case of (left + right + 1) / 2
+    return left + ((right - left + 1) / 2);
+  }
+
+  // Each method returns the indexes (array indexes translated by the comparator) of the pages that might contain
+  // values matching the related predicate for the value held by the comparator.
+  abstract PrimitiveIterator.OfInt eq(ColumnIndexBase<?>.ValueComparator comparator);
+
+  abstract PrimitiveIterator.OfInt gt(ColumnIndexBase<?>.ValueComparator comparator);
+
+  abstract PrimitiveIterator.OfInt gtEq(ColumnIndexBase<?>.ValueComparator comparator);
+
+  abstract PrimitiveIterator.OfInt lt(ColumnIndexBase<?>.ValueComparator comparator);
+
+  abstract PrimitiveIterator.OfInt ltEq(ColumnIndexBase<?>.ValueComparator comparator);
+
+  abstract PrimitiveIterator.OfInt notEq(ColumnIndexBase<?>.ValueComparator comparator);
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java
index f7bd16b..b91a5c0 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java
@@ -20,13 +20,18 @@ package org.apache.parquet.internal.column.columnindex;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.PrimitiveIterator;
+
+import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
+import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter;
/**
- * Column index containing min/max and null count values for the pages in a column chunk.
+ * Column index containing min/max and null count values for the pages in a column chunk. It also implements the methods
+ * of {@link Visitor}, returning the indexes of the pages that might match the visited predicate; used by {@link ColumnIndexFilter}.
*
* @see org.apache.parquet.format.ColumnIndex
*/
-public interface ColumnIndex {
+public interface ColumnIndex extends Visitor<PrimitiveIterator.OfInt> {
/**
* @return the boundary order of the min/max values; used for converting to the related thrift object
*/
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
index 6d05558..9633b61 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
@@ -26,8 +26,22 @@ import java.util.EnumMap;
import java.util.Formatter;
import java.util.List;
import java.util.Map;
+import java.util.PrimitiveIterator;
+import java.util.function.IntPredicate;
import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Eq;
+import org.apache.parquet.filter2.predicate.Operators.Gt;
+import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import org.apache.parquet.filter2.predicate.Operators.Lt;
+import org.apache.parquet.filter2.predicate.Operators.LtEq;
+import org.apache.parquet.filter2.predicate.Operators.Not;
+import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveComparator;
import org.apache.parquet.schema.PrimitiveStringifier;
@@ -37,29 +51,53 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import it.unimi.dsi.fastutil.booleans.BooleanList;
import it.unimi.dsi.fastutil.booleans.BooleanLists;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;
import it.unimi.dsi.fastutil.longs.LongLists;
/**
- * Builder implementation to create {@link ColumnIndex} objects during writing a parquet file.
+ * Builder implementation to create {@link ColumnIndex} objects.
*/
public abstract class ColumnIndexBuilder {
- static abstract class ColumnIndexBase implements ColumnIndex {
+ static abstract class ColumnIndexBase<C> implements ColumnIndex {
+ /*
+ * Holds the value to be compared to the min/max values. This way the unboxing is done only once per predicate
+ * execution instead of once for every comparison. arrayIndex addresses the min/max arrays (null-pages excluded).
+ */
+ abstract class ValueComparator {
+ abstract int compareValueToMin(int arrayIndex); // comparator result of (value, min[arrayIndex])
+
+ abstract int compareValueToMax(int arrayIndex); // comparator result of (value, max[arrayIndex])
+
+ int arrayLength() { // number of min/max entries, i.e. the number of non-null pages
+ return pageIndexes.length;
+ }
+
+ int translate(int arrayIndex) { // maps a min/max array index back to its page index
+ return pageIndexes[arrayIndex];
+ }
+ }
+
private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
private static final int MAX_VALUE_LENGTH_FOR_TOSTRING = 40;
private static final String TOSTRING_TRUNCATION_MARKER = "(...)";
- private static final int TOSTRING_TRUNCATION_START_POS =
- (MAX_VALUE_LENGTH_FOR_TOSTRING - TOSTRING_TRUNCATION_MARKER.length()) / 2;
- private static final int TOSTRING_TRUNCATION_END_POS =
- MAX_VALUE_LENGTH_FOR_TOSTRING - TOSTRING_TRUNCATION_MARKER.length() - TOSTRING_TRUNCATION_START_POS;
+ private static final int TOSTRING_TRUNCATION_START_POS = (MAX_VALUE_LENGTH_FOR_TOSTRING
+ - TOSTRING_TRUNCATION_MARKER.length()) / 2;
+ private static final int TOSTRING_TRUNCATION_END_POS = MAX_VALUE_LENGTH_FOR_TOSTRING
+ - TOSTRING_TRUNCATION_MARKER.length() - TOSTRING_TRUNCATION_START_POS;
private static final String TOSTRING_MISSING_VALUE_MARKER = "<none>";
final PrimitiveStringifier stringifier;
- final PrimitiveComparator<Binary> comparator;
+ final PrimitiveComparator<C> comparator; // typed by the index's value type C (e.g. Double for doubles)
private boolean[] nullPages;
private BoundaryOrder boundaryOrder;
+ // Page index of each min/max array entry (min/max values are not stored for null-pages)
+ private int[] pageIndexes;
// might be null
private long[] nullCounts;
@@ -97,11 +135,12 @@ public abstract class ColumnIndexBuilder {
@Override
public List<ByteBuffer> getMinValues() {
List<ByteBuffer> list = new ArrayList<>(getPageCount());
+ int arrayIndex = 0;
for (int i = 0, n = getPageCount(); i < n; ++i) {
if (isNullPage(i)) {
list.add(EMPTY_BYTE_BUFFER);
} else {
- list.add(getMinValueAsBytes(i));
+ list.add(getMinValueAsBytes(arrayIndex++));
}
}
return list;
@@ -110,11 +149,12 @@ public abstract class ColumnIndexBuilder {
@Override
public List<ByteBuffer> getMaxValues() {
List<ByteBuffer> list = new ArrayList<>(getPageCount());
+ int arrayIndex = 0;
for (int i = 0, n = getPageCount(); i < n; ++i) {
if (isNullPage(i)) {
list.add(EMPTY_BYTE_BUFFER);
} else {
- list.add(getMaxValueAsBytes(i));
+ list.add(getMaxValueAsBytes(arrayIndex++));
}
}
return list;
@@ -127,14 +167,15 @@ public abstract class ColumnIndexBuilder {
String minMaxPart = " %-" + MAX_VALUE_LENGTH_FOR_TOSTRING + "s %-" + MAX_VALUE_LENGTH_FOR_TOSTRING + "s\n";
formatter.format("%-10s %20s" + minMaxPart, "", "null count", "min", "max");
String format = "page-%-5d %20s" + minMaxPart;
+ int arrayIndex = 0;
for (int i = 0, n = nullPages.length; i < n; ++i) {
String nullCount = nullCounts == null ? TOSTRING_MISSING_VALUE_MARKER : Long.toString(nullCounts[i]);
String min, max;
if (nullPages[i]) {
min = max = TOSTRING_MISSING_VALUE_MARKER;
} else {
- min = truncate(getMinValueAsString(i));
- max = truncate(getMaxValueAsString(i));
+ min = truncate(getMinValueAsString(arrayIndex));
+ max = truncate(getMaxValueAsString(arrayIndex++));
}
formatter.format(format, i, nullCount, min, max);
}
@@ -150,13 +191,164 @@ public abstract class ColumnIndexBuilder {
return nullPages[pageIndex];
}
- abstract ByteBuffer getMinValueAsBytes(int pageIndex);
+ /*
+ * Returns the min value for arrayIndex as a ByteBuffer. (Min values are not stored for null-pages so arrayIndex
+ * might not be equal to pageIndex.)
+ */
+ abstract ByteBuffer getMinValueAsBytes(int arrayIndex);
+
+ /*
+ * Returns the max value for arrayIndex as a ByteBuffer. (Max values are not stored for null-pages so arrayIndex
+ * might not be equal to pageIndex.)
+ */
+ abstract ByteBuffer getMaxValueAsBytes(int arrayIndex);
+
+ /*
+ * Returns the min value for arrayIndex as a String. (Min values are not stored for null-pages so arrayIndex
+ * might not be equal to pageIndex.)
+ */
+ abstract String getMinValueAsString(int arrayIndex);
+
+ /*
+ * Returns the max value for arrayIndex as a String. (Max values are not stored for null-pages so arrayIndex
+ * might not be equal to pageIndex.)
+ */
+ abstract String getMaxValueAsString(int arrayIndex);
+
+ /* Creates a filter Statistics object from the min/max values at arrayIndex; used for user defined predicates. */
+ abstract <T extends Comparable<T>> org.apache.parquet.filter2.predicate.Statistics<T> createStats(int arrayIndex);
+
+ /* Creates a ValueComparator holding the specified value to be compared to the min/max values. */
+ abstract ValueComparator createValueComparator(Object value);
+
+ @Override
+ public PrimitiveIterator.OfInt visit(And and) { // logical operators are not evaluated on a single column index
+ throw new UnsupportedOperationException("AND shall not be used on column index directly");
+ }
+
+ @Override
+ public PrimitiveIterator.OfInt visit(Not not) {
+ throw new UnsupportedOperationException("NOT shall not be used on column index directly");
+ }
+
+ @Override
+ public PrimitiveIterator.OfInt visit(Or or) {
+ throw new UnsupportedOperationException("OR shall not be used on column index directly");
+ }
+
+ @Override
+ public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(Eq<T> eq) {
+ T value = eq.getValue();
+ if (value != null) {
+ return getBoundaryOrder().eq(createValueComparator(value));
+ }
+ if (nullCounts == null) {
+ // Searching for nulls so if we don't have null related statistics we have to return all pages
+ return IndexIterator.all(getPageCount());
+ }
+ // Searching for nulls: only pages reporting a positive null count can match
+ return IndexIterator.filter(getPageCount(), pageIndex -> nullCounts[pageIndex] > 0);
+ }
+
+ @Override
+ public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(Gt<T> gt) {
+ return getBoundaryOrder().gt(createValueComparator(gt.getValue())); // comparator spans non-null pages only
+ }
+
+ @Override
+ public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(GtEq<T> gtEq) {
+ return getBoundaryOrder().gtEq(createValueComparator(gtEq.getValue()));
+ }
+
+ @Override
+ public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(Lt<T> lt) {
+ return getBoundaryOrder().lt(createValueComparator(lt.getValue()));
+ }
+
+ @Override
+ public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(LtEq<T> ltEq) {
+ return getBoundaryOrder().ltEq(createValueComparator(ltEq.getValue()));
+ }
+
+ @Override
+ public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(NotEq<T> notEq) {
+ T value = notEq.getValue();
+ if (value == null) {
+ // "!= null" keeps every page that is not a null-page
+ return IndexIterator.filter(getPageCount(), pageIndex -> !nullPages[pageIndex]);
+ }
+ if (nullCounts == null) {
+ // Nulls match so if we don't have null related statistics we have to return all pages
+ return IndexIterator.all(getPageCount());
+ }
+ // Union of the pages containing nulls and the pages the boundary order reports for the value
+ IntSet valueMatches = new IntOpenHashSet();
+ PrimitiveIterator.OfInt candidates = getBoundaryOrder().notEq(createValueComparator(value));
+ candidates.forEachRemaining((int i) -> valueMatches.add(i));
+ return IndexIterator.filter(getPageCount(), pageIndex -> {
+ return nullCounts[pageIndex] > 0 || valueMatches.contains(pageIndex);
+ });
+ }
+
+ @Override
+ public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> PrimitiveIterator.OfInt visit(
+ UserDefined<T, U> udp) {
+ final UserDefinedPredicate<T> predicate = udp.getUserDefinedPredicate();
+ final boolean acceptNulls = predicate.keep(null); // whether the predicate itself keeps null values
+
+ if (acceptNulls && nullCounts == null) {
+ // Nulls match so if we don't have null related statistics we have to return all pages
+ return IndexIterator.all(getPageCount());
+ }
+
+ return IndexIterator.filter(getPageCount(), new IntPredicate() {
+ private int arrayIndex = -1; // min/max array index; assumes test() is called once per page, in order
- abstract ByteBuffer getMaxValueAsBytes(int pageIndex);
+ @Override
+ public boolean test(int pageIndex) {
+ if (isNullPage(pageIndex)) {
+ return acceptNulls;
+ } else {
+ ++arrayIndex; // only non-null pages have min/max entries
+ if (acceptNulls && nullCounts[pageIndex] > 0) {
+ return true;
+ }
+ org.apache.parquet.filter2.predicate.Statistics<T> stats = createStats(arrayIndex);
+ return !predicate.canDrop(stats);
+ }
+ }
+ });
+ }
- abstract String getMinValueAsString(int pageIndex);
+ @Override
+ public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> PrimitiveIterator.OfInt visit(
+ LogicalNotUserDefined<T, U> udp) {
+ final UserDefinedPredicate<T> inversePredicate = udp.getUserDefined().getUserDefinedPredicate();
+ final boolean acceptNulls = !inversePredicate.keep(null); // NOT(udp) keeps null iff udp does not
+
+ if (acceptNulls && nullCounts == null) {
+ // Nulls match so if we don't have null related statistics we have to return all pages
+ return IndexIterator.all(getPageCount());
+ }
+
+ return IndexIterator.filter(getPageCount(), new IntPredicate() {
+ private int arrayIndex = -1; // min/max array index; assumes test() is called once per page, in order
- abstract String getMaxValueAsString(int pageIndex);
+ @Override
+ public boolean test(int pageIndex) {
+ if (isNullPage(pageIndex)) {
+ return acceptNulls;
+ } else {
+ ++arrayIndex; // only non-null pages have min/max entries
+ if (acceptNulls && nullCounts[pageIndex] > 0) {
+ return true;
+ }
+ org.apache.parquet.filter2.predicate.Statistics<T> stats = createStats(arrayIndex);
+ return !inversePredicate.inverseCanDrop(stats);
+ }
+ }
+ });
+ }
}
private static final ColumnIndexBuilder NO_OP_BUILDER = new ColumnIndexBuilder() {
@@ -174,7 +366,7 @@ public abstract class ColumnIndexBuilder {
}
@Override
- ColumnIndexBase createColumnIndex(PrimitiveType type) {
+ ColumnIndexBase<?> createColumnIndex(PrimitiveType type) { // the no-op builder never creates an index
return null;
}
@@ -208,6 +400,8 @@ public abstract class ColumnIndexBuilder {
private final BooleanList nullPages = new BooleanArrayList();
private final LongList nullCounts = new LongArrayList();
private long minMaxSize;
+ private final IntList pageIndexes = new IntArrayList(); // page index of each stored min/max pair
+ private int nextPageIndex; // page index assigned to the next added page
/**
* @return a no-op builder that does not collect statistics objects and therefore returns {@code null} at
@@ -283,7 +477,7 @@ public abstract class ColumnIndexBuilder {
}
builder.fill(nullPages, nullCounts, minValues, maxValues);
- ColumnIndexBase columnIndex = builder.build(type);
+ ColumnIndexBase<?> columnIndex = builder.build(type);
columnIndex.boundaryOrder = requireNonNull(boundaryOrder);
return columnIndex;
}
@@ -304,13 +498,14 @@ public abstract class ColumnIndexBuilder {
Object min = stats.genericGetMin();
Object max = stats.genericGetMax();
addMinMax(min, max);
+ pageIndexes.add(nextPageIndex);
minMaxSize += sizeOf(min);
minMaxSize += sizeOf(max);
} else {
nullPages.add(true);
- addMinMax(null, null);
}
nullCounts.add(stats.getNumNulls());
+ ++nextPageIndex;
}
abstract void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max);
@@ -320,9 +515,9 @@ public abstract class ColumnIndexBuilder {
private void fill(List<Boolean> nullPages, List<Long> nullCounts, List<ByteBuffer> minValues,
List<ByteBuffer> maxValues) {
clear();
- int requiredSize = nullPages.size();
- if ((nullCounts != null && nullCounts.size() != requiredSize) || minValues.size() != requiredSize
- || maxValues.size() != requiredSize) {
+ int pageCount = nullPages.size();
+ if ((nullCounts != null && nullCounts.size() != pageCount) || minValues.size() != pageCount
+ || maxValues.size() != pageCount) {
throw new IllegalArgumentException(
String.format("Not all sizes are equal (nullPages:%d, nullCounts:%s, minValues:%d, maxValues:%d",
nullPages.size(), nullCounts == null ? "null" : nullCounts.size(), minValues.size(), maxValues.size()));
@@ -333,13 +528,12 @@ public abstract class ColumnIndexBuilder {
this.nullCounts.addAll(nullCounts);
}
- for (int i = 0; i < requiredSize; ++i) {
- if (nullPages.get(i)) {
- addMinMaxFromBytes(null, null);
- } else {
+ for (int i = 0; i < pageCount; ++i) {
+ if (!nullPages.get(i)) {
ByteBuffer min = minValues.get(i);
ByteBuffer max = maxValues.get(i);
addMinMaxFromBytes(min, max);
+ pageIndexes.add(i);
minMaxSize += min.remaining();
minMaxSize += max.remaining();
}
@@ -350,7 +544,7 @@ public abstract class ColumnIndexBuilder {
* @return the newly created column index or {@code null} if the {@link ColumnIndex} would be empty
*/
public ColumnIndex build() {
- ColumnIndexBase columnIndex = build(type);
+ ColumnIndexBase<?> columnIndex = build(type);
if (columnIndex == null) {
return null;
}
@@ -358,16 +552,17 @@ public abstract class ColumnIndexBuilder {
return columnIndex;
}
- private ColumnIndexBase build(PrimitiveType type) {
+ private ColumnIndexBase<?> build(PrimitiveType type) {
if (nullPages.isEmpty()) {
return null;
}
- ColumnIndexBase columnIndex = createColumnIndex(type);
+ ColumnIndexBase<?> columnIndex = createColumnIndex(type);
columnIndex.nullPages = nullPages.toBooleanArray();
// Null counts is optional so keep it null if the builder has no values
if (!nullCounts.isEmpty()) {
columnIndex.nullCounts = nullCounts.toLongArray();
}
+ columnIndex.pageIndexes = pageIndexes.toIntArray(); // page index of each min/max entry (none for null-pages)
return columnIndex;
}
@@ -384,51 +579,24 @@ public abstract class ColumnIndexBuilder {
// min[i] <= min[i+1] && max[i] <= max[i+1]
private boolean isAscending(PrimitiveComparator<Binary> comparator) {
- int prevPage = nextNonNullPage(0);
- // All pages are null-page
- if (prevPage < 0) {
- return false;
- }
- int nextPage = nextNonNullPage(prevPage + 1);
- while (nextPage > 0) {
- if (compareMinValues(comparator, prevPage, nextPage) > 0
- || compareMaxValues(comparator, prevPage, nextPage) > 0) {
+ for (int i = 1, n = pageIndexes.size(); i < n; ++i) { // non-null pages only; true if fewer than two
+ if (compareMinValues(comparator, i - 1, i) > 0 || compareMaxValues(comparator, i - 1, i) > 0) {
return false;
}
- prevPage = nextPage;
- nextPage = nextNonNullPage(nextPage + 1);
}
return true;
}
// min[i] >= min[i+1] && max[i] >= max[i+1]
private boolean isDescending(PrimitiveComparator<Binary> comparator) {
- int prevPage = nextNonNullPage(0);
- // All pages are null-page
- if (prevPage < 0) {
- return false;
- }
- int nextPage = nextNonNullPage(prevPage + 1);
- while (nextPage > 0) {
- if (compareMinValues(comparator, prevPage, nextPage) < 0
- || compareMaxValues(comparator, prevPage, nextPage) < 0) {
+ for (int i = 1, n = pageIndexes.size(); i < n; ++i) { // non-null pages only; true if fewer than two
+ if (compareMinValues(comparator, i - 1, i) < 0 || compareMaxValues(comparator, i - 1, i) < 0) {
return false;
}
- prevPage = nextPage;
- nextPage = nextNonNullPage(nextPage + 1);
}
return true;
}
- private int nextNonNullPage(int startIndex) {
- for (int i = startIndex, n = nullPages.size(); i < n; ++i) {
- if (!nullPages.get(i)) {
- return i;
- }
- }
- return -1;
- }
-
abstract int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2);
abstract int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2);
@@ -438,11 +606,13 @@ public abstract class ColumnIndexBuilder {
nullCounts.clear();
clearMinMax();
minMaxSize = 0;
+ nextPageIndex = 0;
+ pageIndexes.clear();
}
abstract void clearMinMax();
- abstract ColumnIndexBase createColumnIndex(PrimitiveType type);
+ abstract ColumnIndexBase<?> createColumnIndex(PrimitiveType type); // may return null (no-op builder)
abstract int sizeOf(Object value);
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java
index f877dfc..24be02c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java
@@ -22,6 +22,7 @@ import static java.nio.ByteOrder.LITTLE_ENDIAN;
import java.nio.ByteBuffer;
+import org.apache.parquet.filter2.predicate.Statistics;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveComparator;
import org.apache.parquet.schema.PrimitiveType;
@@ -30,7 +31,7 @@ import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import it.unimi.dsi.fastutil.doubles.DoubleList;
class DoubleColumnIndexBuilder extends ColumnIndexBuilder {
- private static class DoubleColumnIndex extends ColumnIndexBase {
+ private static class DoubleColumnIndex extends ColumnIndexBase<Double> {
private double[] minValues;
private double[] maxValues;
@@ -57,6 +58,28 @@ class DoubleColumnIndexBuilder extends ColumnIndexBuilder {
String getMaxValueAsString(int pageIndex) {
return stringifier.stringify(maxValues[pageIndex]);
}
+
+ @Override
+ @SuppressWarnings("unchecked")
+ <T extends Comparable<T>> Statistics<T> createStats(int arrayIndex) {
+ Statistics<Double> stats = new Statistics<>(minValues[arrayIndex], maxValues[arrayIndex], comparator);
+ return (Statistics<T>) stats; // unchecked: T is expected to be Double for a double column
+ }
+
+ @Override
+ ValueComparator createValueComparator(Object value) {
+ final double needle = (Double) value; // unboxed once; reused by every comparison below
+ return new ValueComparator() {
+ @Override
+ int compareValueToMin(int arrayIndex) {
+ return comparator.compare(needle, minValues[arrayIndex]);
+ }
+ @Override
+ int compareValueToMax(int arrayIndex) {
+ return comparator.compare(needle, maxValues[arrayIndex]);
+ }
+ };
+ }
}
private final DoubleList minValues = new DoubleArrayList();
@@ -72,18 +95,18 @@ class DoubleColumnIndexBuilder extends ColumnIndexBuilder {
@Override
void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
- minValues.add(min == null ? 0 : convert(min));
- maxValues.add(max == null ? 0 : convert(max));
+ minValues.add(convert(min)); // callers skip null-pages, so min/max are expected to be non-null
+ maxValues.add(convert(max));
}
@Override
void addMinMax(Object min, Object max) {
- minValues.add(min == null ? 0 : (double) min);
- maxValues.add(max == null ? 0 : (double) max);
+ minValues.add((double) min); // callers skip null-pages, so min/max are expected to be non-null
+ maxValues.add((double) max);
}
@Override
- ColumnIndexBase createColumnIndex(PrimitiveType type) {
+ ColumnIndexBase<Double> createColumnIndex(PrimitiveType type) {
DoubleColumnIndex columnIndex = new DoubleColumnIndex(type);
columnIndex.minValues = minValues.toDoubleArray();
columnIndex.maxValues = maxValues.toDoubleArray();
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java
index f170662..4452746 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java
@@ -22,6 +22,7 @@ import static java.nio.ByteOrder.LITTLE_ENDIAN;
import java.nio.ByteBuffer;
+import org.apache.parquet.filter2.predicate.Statistics;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveComparator;
import org.apache.parquet.schema.PrimitiveType;
@@ -30,7 +31,7 @@ import it.unimi.dsi.fastutil.floats.FloatArrayList;
import it.unimi.dsi.fastutil.floats.FloatList;
class FloatColumnIndexBuilder extends ColumnIndexBuilder {
- private static class FloatColumnIndex extends ColumnIndexBase {
+ private static class FloatColumnIndex extends ColumnIndexBase<Float> {
private float[] minValues;
private float[] maxValues;
@@ -57,6 +58,28 @@ class FloatColumnIndexBuilder extends ColumnIndexBuilder {
String getMaxValueAsString(int pageIndex) {
return stringifier.stringify(maxValues[pageIndex]);
}
+
+ @Override
+ @SuppressWarnings("unchecked")
+ <T extends Comparable<T>> Statistics<T> createStats(int arrayIndex) {
+ Statistics<Float> stats = new Statistics<>(minValues[arrayIndex], maxValues[arrayIndex], comparator);
+ return (Statistics<T>) stats; // unchecked: T is expected to be Float for a float column
+ }
+
+ @Override
+ ValueComparator createValueComparator(Object value) {
+ final float needle = (Float) value; // unboxed once; reused by every comparison below
+ return new ValueComparator() {
+ @Override
+ int compareValueToMin(int arrayIndex) {
+ return comparator.compare(needle, minValues[arrayIndex]);
+ }
+ @Override
+ int compareValueToMax(int arrayIndex) {
+ return comparator.compare(needle, maxValues[arrayIndex]);
+ }
+ };
+ }
}
private final FloatList minValues = new FloatArrayList();
@@ -72,18 +95,18 @@ class FloatColumnIndexBuilder extends ColumnIndexBuilder {
@Override
void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
- minValues.add(min == null ? 0 : convert(min));
- maxValues.add(max == null ? 0 : convert(max));
+ minValues.add(convert(min)); // callers skip null-pages, so min/max are expected to be non-null
+ maxValues.add(convert(max));
}
@Override
void addMinMax(Object min, Object max) {
- minValues.add(min == null ? 0 : (float) min);
- maxValues.add(max == null ? 0 : (float) max);
+ minValues.add((float) min); // callers skip null-pages, so min/max are expected to be non-null
+ maxValues.add((float) max);
}
@Override
- ColumnIndexBase createColumnIndex(PrimitiveType type) {
+ ColumnIndexBase<Float> createColumnIndex(PrimitiveType type) {
FloatColumnIndex columnIndex = new FloatColumnIndex(type);
columnIndex.minValues = minValues.toFloatArray();
columnIndex.maxValues = maxValues.toFloatArray();
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java
new file mode 100644
index 0000000..9eab65e
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import java.util.NoSuchElementException;
+import java.util.PrimitiveIterator;
+import java.util.function.IntPredicate;
+import java.util.function.IntUnaryOperator;
+
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder.ColumnIndexBase;
+
+/**
+ * Lazy page index iterator: walks [start, end), keeps the indexes accepted by the filter and returns them translated.
+ */
+class IndexIterator implements PrimitiveIterator.OfInt {
+ public static final PrimitiveIterator.OfInt EMPTY = new OfInt() { // iterator without any index
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public int nextInt() {
+ throw new NoSuchElementException();
+ }
+ };
+ private int index; // next accepted (untranslated) index, or -1 when exhausted
+ private final int endIndex; // exclusive
+ private final IntPredicate filter;
+ private final IntUnaryOperator translator;
+
+ static PrimitiveIterator.OfInt all(int pageCount) { // every page index in [0, pageCount)
+ return new IndexIterator(0, pageCount, i -> true, i -> i);
+ }
+
+ static PrimitiveIterator.OfInt all(ColumnIndexBase<?>.ValueComparator comparator) { // every non-null page
+ return new IndexIterator(0, comparator.arrayLength(), i -> true, comparator::translate);
+ }
+
+ static PrimitiveIterator.OfInt filter(int pageCount, IntPredicate filter) {
+ return new IndexIterator(0, pageCount, filter, i -> i);
+ }
+
+ static PrimitiveIterator.OfInt filterTranslate(int arrayLength, IntPredicate filter, IntUnaryOperator translator) {
+ return new IndexIterator(0, arrayLength, filter, translator);
+ }
+
+ static PrimitiveIterator.OfInt rangeTranslate(int from, int to, IntUnaryOperator translator) { // 'to' inclusive
+ return new IndexIterator(from, to + 1, i -> true, translator);
+ }
+
+ private IndexIterator(int startIndex, int endIndex, IntPredicate filter, IntUnaryOperator translator) {
+ this.endIndex = endIndex;
+ this.filter = filter;
+ this.translator = translator;
+ index = nextPageIndex(startIndex); // eagerly finds the first match so hasNext() is a plain check
+ }
+
+ private int nextPageIndex(int startIndex) { // first index >= startIndex accepted by the filter, or -1
+ for (int i = startIndex; i < endIndex; ++i) { // each index is tested once, in ascending order
+ if (filter.test(i)) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index >= 0;
+ }
+
+ @Override
+ public int nextInt() {
+ if (hasNext()) {
+ int ret = index;
+ index = nextPageIndex(index + 1); // look ahead before returning the current one
+ return translator.applyAsInt(ret);
+ }
+ throw new NoSuchElementException();
+ }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java
index f6bd94b..2d19d27 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java
@@ -22,6 +22,7 @@ import static java.nio.ByteOrder.LITTLE_ENDIAN;
import java.nio.ByteBuffer;
+import org.apache.parquet.filter2.predicate.Statistics;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveComparator;
import org.apache.parquet.schema.PrimitiveType;
@@ -30,7 +31,7 @@ import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
class IntColumnIndexBuilder extends ColumnIndexBuilder {
- private static class IntColumnIndex extends ColumnIndexBase {
+ private static class IntColumnIndex extends ColumnIndexBase<Integer> {
private int[] minValues;
private int[] maxValues;
@@ -57,6 +58,28 @@ class IntColumnIndexBuilder extends ColumnIndexBuilder {
String getMaxValueAsString(int pageIndex) {
return stringifier.stringify(maxValues[pageIndex]);
}
+
+ @Override
+ @SuppressWarnings("unchecked")
+ <T extends Comparable<T>> Statistics<T> createStats(int arrayIndex) {
+ Statistics<Integer> stats = new Statistics<>(minValues[arrayIndex], maxValues[arrayIndex], comparator);
+ return (Statistics<T>) stats; // unchecked: T is expected to be Integer for an int column
+ }
+
+ @Override
+ ValueComparator createValueComparator(Object value) {
+ final int needle = (Integer) value; // unboxed once; reused by every comparison below
+ return new ValueComparator() {
+ @Override
+ int compareValueToMin(int arrayIndex) {
+ return comparator.compare(needle, minValues[arrayIndex]);
+ }
+ @Override
+ int compareValueToMax(int arrayIndex) {
+ return comparator.compare(needle, maxValues[arrayIndex]);
+ }
+ };
+ }
}
private final IntList minValues = new IntArrayList();
@@ -72,18 +95,18 @@ class IntColumnIndexBuilder extends ColumnIndexBuilder {
@Override
void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
- minValues.add(min == null ? 0 : convert(min));
- maxValues.add(max == null ? 0 : convert(max));
+ minValues.add(convert(min)); // callers skip null-pages, so min/max are expected to be non-null
+ maxValues.add(convert(max));
}
@Override
void addMinMax(Object min, Object max) {
- minValues.add(min == null ? 0 : (int) min);
- maxValues.add(max == null ? 0 : (int) max);
+ minValues.add((int) min); // callers skip null-pages, so min/max are expected to be non-null
+ maxValues.add((int) max);
}
@Override
- ColumnIndexBase createColumnIndex(PrimitiveType type) {
+ ColumnIndexBase<Integer> createColumnIndex(PrimitiveType type) {
IntColumnIndex columnIndex = new IntColumnIndex(type);
columnIndex.minValues = minValues.toIntArray();
columnIndex.maxValues = maxValues.toIntArray();
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java
index 696602d..b0189b7 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java
@@ -22,6 +22,7 @@ import static java.nio.ByteOrder.LITTLE_ENDIAN;
import java.nio.ByteBuffer;
+import org.apache.parquet.filter2.predicate.Statistics;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveComparator;
import org.apache.parquet.schema.PrimitiveType;
@@ -30,7 +31,7 @@ import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;
class LongColumnIndexBuilder extends ColumnIndexBuilder {
- private static class LongColumnIndex extends ColumnIndexBase {
+ private static class LongColumnIndex extends ColumnIndexBase<Long> {
private long[] minValues;
private long[] maxValues;
@@ -57,6 +58,28 @@ class LongColumnIndexBuilder extends ColumnIndexBuilder {
String getMaxValueAsString(int pageIndex) {
return stringifier.stringify(maxValues[pageIndex]);
}
+
+ @Override
+ @SuppressWarnings("unchecked")
+ <T extends Comparable<T>> Statistics<T> createStats(int arrayIndex) {
+ Statistics<Long> stats = new Statistics<>(minValues[arrayIndex], maxValues[arrayIndex], comparator);
+ return (Statistics<T>) stats; // unchecked: T is expected to be Long for a long column
+ }
+
+ @Override
+ ValueComparator createValueComparator(Object value) {
+ final long needle = (Long) value; // unboxed once; reused by every comparison below
+ return new ValueComparator() {
+ @Override
+ int compareValueToMin(int arrayIndex) {
+ return comparator.compare(needle, minValues[arrayIndex]);
+ }
+ @Override
+ int compareValueToMax(int arrayIndex) {
+ return comparator.compare(needle, maxValues[arrayIndex]);
+ }
+ };
+ }
}
private final LongList minValues = new LongArrayList();
@@ -72,18 +95,18 @@ class LongColumnIndexBuilder extends ColumnIndexBuilder {
@Override
void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
- minValues.add(min == null ? 0 : convert(min));
- maxValues.add(max == null ? 0 : convert(max));
+ minValues.add(convert(min)); // callers skip null-pages, so min/max are expected to be non-null
+ maxValues.add(convert(max));
}
@Override
void addMinMax(Object min, Object max) {
- minValues.add(min == null ? 0 : (long) min);
- maxValues.add(max == null ? 0 : (long) max);
+ minValues.add((long) min); // callers skip null-pages, so min/max are expected to be non-null
+ maxValues.add((long) max);
}
@Override
- ColumnIndexBase createColumnIndex(PrimitiveType type) {
+ ColumnIndexBase<Long> createColumnIndex(PrimitiveType type) {
LongColumnIndex columnIndex = new LongColumnIndex(type);
columnIndex.minValues = minValues.toLongArray();
columnIndex.maxValues = maxValues.toLongArray();
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
index fd99ef3..ba984eb 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
@@ -49,4 +49,16 @@ public interface OffsetIndex {
* @return the index of the first row in the page
*/
public long getFirstRowIndex(int pageIndex);
+
+  /**
+   * Calculates the index of the last row in the given page. For every page but the last one this is the row right
+   * before the first row of the following page; for the last page it is the last row of the row-group.
+   *
+   * @param pageIndex
+   *          the index of the page
+   * @param rowGroupRowCount
+   *          the total number of rows in the row-group
+   * @return the calculated index of the last row of the given page
+   */
+  public default long getLastRowIndex(int pageIndex, long rowGroupRowCount) {
+    boolean isLastPage = pageIndex + 1 >= getPageCount();
+    if (isLastPage) {
+      return rowGroupRowCount - 1;
+    }
+    return getFirstRowIndex(pageIndex + 1) - 1;
+  }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
new file mode 100644
index 0000000..007d753
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.filter2.columnindex;
+
+import java.util.PrimitiveIterator;
+import java.util.Set;
+import java.util.function.Function;
+
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
+import org.apache.parquet.filter2.compat.FilterCompat.UnboundRecordFilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
+import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.Eq;
+import org.apache.parquet.filter2.predicate.Operators.Gt;
+import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import org.apache.parquet.filter2.predicate.Operators.Lt;
+import org.apache.parquet.filter2.predicate.Operators.LtEq;
+import org.apache.parquet.filter2.predicate.Operators.Not;
+import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore.MissingOffsetIndexException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Filter implementation based on column indexes.
+ * No filtering will be applied for columns where no column index is available.
+ * Offset index is required for all the columns in the projection, therefore a {@link MissingOffsetIndexException} will
+ * be thrown from any of the {@code visit} methods if any of the required offset indexes is missing.
+ */
+public class ColumnIndexFilter implements Visitor<RowRanges> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ColumnIndexFilter.class);
+
+  private final ColumnIndexStore columnIndexStore;
+  // Paths of the columns in the projection; any other column is handled as containing null values only
+  private final Set<ColumnPath> columns;
+  private final long rowCount;
+  // Ranges covering every row of the row-group; created on first use by allRows()
+  private RowRanges allRows;
+
+  /**
+   * Calculates the row ranges containing the indexes of the rows that might match the specified filter.
+   *
+   * @param filter
+   *          to be used for filtering the rows
+   * @param columnIndexStore
+   *          the store for providing column/offset indexes
+   * @param paths
+   *          the paths of the columns used in the actual projection; a column not being part of the projection will be
+   *          handled as containing {@code null} values only even if the column has values written in the file
+   * @param rowCount
+   *          the total number of rows in the row-group
+   * @return the ranges of the possibly matching row indexes; the returned ranges contain all the rows if any of the
+   *         required offset indexes is missing or if the filter is not a filter predicate
+   */
+  public static RowRanges calculateRowRanges(FilterCompat.Filter filter, ColumnIndexStore columnIndexStore,
+      Set<ColumnPath> paths, long rowCount) {
+    return filter.accept(new FilterCompat.Visitor<RowRanges>() {
+      @Override
+      public RowRanges visit(FilterPredicateCompat filterPredicateCompat) {
+        try {
+          ColumnIndexFilter columnIndexFilter = new ColumnIndexFilter(columnIndexStore, paths, rowCount);
+          return filterPredicateCompat.getFilterPredicate().accept(columnIndexFilter);
+        } catch (MissingOffsetIndexException e) {
+          LOGGER.warn("Unable to do filtering", e);
+          return RowRanges.single(rowCount);
+        }
+      }
+
+      // Unbound record filters are not evaluated here: every row is kept
+      @Override
+      public RowRanges visit(UnboundRecordFilterCompat unboundRecordFilterCompat) {
+        return RowRanges.single(rowCount);
+      }
+
+      // Nothing to filter: every row is kept
+      @Override
+      public RowRanges visit(NoOpFilter noOpFilter) {
+        return RowRanges.single(rowCount);
+      }
+    });
+  }
+
+  private ColumnIndexFilter(ColumnIndexStore columnIndexStore, Set<ColumnPath> paths, long rowCount) {
+    this.columnIndexStore = columnIndexStore;
+    this.columns = paths;
+    this.rowCount = rowCount;
+  }
+
+  // Returns the (cached) ranges containing every row of the row-group
+  private RowRanges allRows() {
+    if (allRows != null) {
+      return allRows;
+    }
+    allRows = RowRanges.single(rowCount);
+    return allRows;
+  }
+
+  @Override
+  public <T extends Comparable<T>> RowRanges visit(Eq<T> eq) {
+    Column<T> column = eq.getColumn();
+    // A column outside the projection contains nulls only: it matches "eq null" on every row and nothing else
+    RowRanges missingColumnRanges = eq.getValue() == null ? allRows() : RowRanges.EMPTY;
+    return applyPredicate(column, columnIndex -> columnIndex.visit(eq), missingColumnRanges);
+  }
+
+  @Override
+  public <T extends Comparable<T>> RowRanges visit(NotEq<T> notEq) {
+    Column<T> column = notEq.getColumn();
+    // A column outside the projection contains nulls only: "notEq null" matches nothing, anything else matches all
+    RowRanges missingColumnRanges = notEq.getValue() == null ? RowRanges.EMPTY : allRows();
+    return applyPredicate(column, columnIndex -> columnIndex.visit(notEq), missingColumnRanges);
+  }
+
+  @Override
+  public <T extends Comparable<T>> RowRanges visit(Lt<T> lt) {
+    Column<T> column = lt.getColumn();
+    // A column outside the projection (nulls only) matches no rows for comparisons
+    return applyPredicate(column, columnIndex -> columnIndex.visit(lt), RowRanges.EMPTY);
+  }
+
+  @Override
+  public <T extends Comparable<T>> RowRanges visit(LtEq<T> ltEq) {
+    Column<T> column = ltEq.getColumn();
+    // A column outside the projection (nulls only) matches no rows for comparisons
+    return applyPredicate(column, columnIndex -> columnIndex.visit(ltEq), RowRanges.EMPTY);
+  }
+
+  @Override
+  public <T extends Comparable<T>> RowRanges visit(Gt<T> gt) {
+    Column<T> column = gt.getColumn();
+    // A column outside the projection (nulls only) matches no rows for comparisons
+    return applyPredicate(column, columnIndex -> columnIndex.visit(gt), RowRanges.EMPTY);
+  }
+
+  @Override
+  public <T extends Comparable<T>> RowRanges visit(GtEq<T> gtEq) {
+    Column<T> column = gtEq.getColumn();
+    // A column outside the projection (nulls only) matches no rows for comparisons
+    return applyPredicate(column, columnIndex -> columnIndex.visit(gtEq), RowRanges.EMPTY);
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> RowRanges visit(UserDefined<T, U> udp) {
+    Column<T> column = udp.getColumn();
+    // For a column outside the projection (nulls only) the outcome depends on whether the predicate keeps null
+    boolean keepsNulls = udp.getUserDefinedPredicate().keep(null);
+    return applyPredicate(column, columnIndex -> columnIndex.visit(udp), keepsNulls ? allRows() : RowRanges.EMPTY);
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> RowRanges visit(
+      LogicalNotUserDefined<T, U> udp) {
+    Column<T> column = udp.getUserDefined().getColumn();
+    // Negation of the user defined predicate: the null handling is inverted as well
+    boolean keepsNulls = udp.getUserDefined().getUserDefinedPredicate().keep(null);
+    return applyPredicate(column, columnIndex -> columnIndex.visit(udp), keepsNulls ? RowRanges.EMPTY : allRows());
+  }
+
+  /*
+   * Evaluates a column predicate. Columns outside the projection get the specified ranges, columns without a column
+   * index get all the rows, any other column is filtered page-wise via its column and offset indexes.
+   */
+  private RowRanges applyPredicate(Column<?> column, Function<ColumnIndex, PrimitiveIterator.OfInt> func,
+      RowRanges rangesForMissingColumns) {
+    ColumnPath path = column.getColumnPath();
+    if (columns.contains(path)) {
+      // Looked up first so that a missing offset index is reported even if the column index is absent too
+      OffsetIndex offsetIndex = columnIndexStore.getOffsetIndex(path);
+      ColumnIndex columnIndex = columnIndexStore.getColumnIndex(path);
+      if (columnIndex != null) {
+        return RowRanges.build(rowCount, func.apply(columnIndex), offsetIndex);
+      }
+      LOGGER.warn("No column index for column {} is available; Unable to filter on this column", path);
+      return allRows();
+    }
+    return rangesForMissingColumns;
+  }
+
+  @Override
+  public RowRanges visit(And and) {
+    RowRanges left = and.getLeft().accept(this);
+    RowRanges right = and.getRight().accept(this);
+    return RowRanges.intersection(left, right);
+  }
+
+  @Override
+  public RowRanges visit(Or or) {
+    RowRanges left = or.getLeft().accept(this);
+    RowRanges right = or.getRight().accept(this);
+    return RowRanges.union(left, right);
+  }
+
+  @Override
+  public RowRanges visit(Not not) {
+    throw new IllegalArgumentException(
+        "Predicates containing a NOT must be run through LogicalInverseRewriter. " + not);
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexStore.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexStore.java
new file mode 100644
index 0000000..c82861a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexStore.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.filter2.columnindex;
+
+import org.apache.parquet.ParquetRuntimeException;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+
+/**
+ * Provides the {@link ColumnIndex} and {@link OffsetIndex} objects of the column chunks in a row-group.
+ */
+public interface ColumnIndexStore {
+
+  /**
+   * Exception thrown when the offset index of a column is missing.
+   */
+  public static class MissingOffsetIndexException extends ParquetRuntimeException {
+    public MissingOffsetIndexException(ColumnPath path) {
+      super(messageFor(path));
+    }
+
+    // Builds the exception message naming the column in dot notation
+    private static String messageFor(ColumnPath path) {
+      return "No offset index for column " + path.toDotString() + " is available; Unable to do filtering";
+    }
+  }
+
+  /**
+   * Returns the column index of a column.
+   *
+   * @param column
+   *          the path of the column
+   * @return the column index of the column chunk in the row-group, or {@code null} if there is no column index
+   */
+  ColumnIndex getColumnIndex(ColumnPath column);
+
+  /**
+   * Returns the offset index of a column.
+   *
+   * @param column
+   *          the path of the column
+   * @return the offset index of the column chunk in the row-group
+   * @throws MissingOffsetIndexException
+   *           if there is no offset index for the column
+   */
+  OffsetIndex getOffsetIndex(ColumnPath column) throws MissingOffsetIndexException;
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
new file mode 100644
index 0000000..a04e513
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.filter2.columnindex;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PrimitiveIterator;
+
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+
+/**
+ * Class representing row ranges in a row-group. These row ranges are calculated as a result of the column index based
+ * filtering. The ranges are kept disjoint and in ascending order; overlapping and adjacent ranges are merged.
+ *
+ * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, java.util.Set, long)
+ */
+public class RowRanges {
+  private static class Range {
+
+    // Returns the union of the two ranges or null if they are neither overlapping nor adjacent
+    // (e.g. [1, 3] and [4, 5] are adjacent, so their union is [1, 5]).
+    private static Range union(Range left, Range right) {
+      if (left.from <= right.from) {
+        if (left.to + 1 >= right.from) {
+          return new Range(left.from, Math.max(left.to, right.to));
+        }
+      } else if (right.to + 1 >= left.from) {
+        return new Range(right.from, Math.max(left.to, right.to));
+      }
+      return null;
+    }
+
+    // Returns the intersection of the two ranges or null if they are not overlapping.
+    private static Range intersection(Range left, Range right) {
+      if (left.from <= right.from) {
+        if (left.to >= right.from) {
+          return new Range(right.from, Math.min(left.to, right.to));
+        }
+      } else if (right.to >= left.from) {
+        return new Range(left.from, Math.min(left.to, right.to));
+      }
+      return null;
+    }
+
+    final long from;
+    final long to;
+
+    // Creates a range of [from, to] (from and to are inclusive; empty ranges are not valid)
+    Range(long from, long to) {
+      assert from <= to;
+      this.from = from;
+      this.to = to;
+    }
+
+    long count() {
+      return to - from + 1;
+    }
+
+    boolean isBefore(Range other) {
+      return to < other.from;
+    }
+
+    boolean isAfter(Range other) {
+      return from > other.to;
+    }
+
+    @Override
+    public String toString() {
+      return "[" + from + ", " + to + ']';
+    }
+  }
+
+  // Shared empty instance; it is never modified as add() is private and only invoked on freshly created instances.
+  static final RowRanges EMPTY = new RowRanges();
+
+  /*
+   * Creates the ranges containing every row [0, rowCount - 1] of a row-group. A row-group without rows gets empty
+   * ranges: the range [0, -1] would be invalid (it fails the assertion in Range or, with assertions disabled, makes
+   * allRows() report a non-existent row 0).
+   */
+  static RowRanges single(long rowCount) {
+    RowRanges ranges = new RowRanges();
+    if (rowCount > 0) {
+      ranges.add(new Range(0, rowCount - 1));
+    }
+    return ranges;
+  }
+
+  /*
+   * Creates the ranges covering the rows of the specified pages. The page indexes shall be in ascending order as add()
+   * requires the ranges to arrive in order.
+   */
+  static RowRanges build(long rowCount, PrimitiveIterator.OfInt pageIndexes, OffsetIndex offsetIndex) {
+    RowRanges ranges = new RowRanges();
+    while (pageIndexes.hasNext()) {
+      int pageIndex = pageIndexes.nextInt();
+      ranges.add(new Range(offsetIndex.getFirstRowIndex(pageIndex), offsetIndex.getLastRowIndex(pageIndex, rowCount)));
+    }
+    return ranges;
+  }
+
+  /*
+   * Calculates the union of the two ranges by merging their ascending range lists. it2/range2 always refer to the
+   * list holding the pending range; the iterators are swapped whenever the pending range has to be emitted first.
+   */
+  static RowRanges union(RowRanges left, RowRanges right) {
+    RowRanges result = new RowRanges();
+    Iterator<Range> it1 = left.ranges.iterator();
+    Iterator<Range> it2 = right.ranges.iterator();
+    if (it2.hasNext()) {
+      Range range2 = it2.next();
+      while (it1.hasNext()) {
+        Range range1 = it1.next();
+        if (range1.isAfter(range2)) {
+          result.add(range2);
+          range2 = range1;
+          Iterator<Range> tmp = it1;
+          it1 = it2;
+          it2 = tmp;
+        } else {
+          result.add(range1);
+        }
+      }
+      result.add(range2);
+    } else {
+      it2 = it1;
+    }
+    // Copy the remaining ranges of the list that is not exhausted yet
+    while (it2.hasNext()) {
+      result.add(it2.next());
+    }
+
+    return result;
+  }
+
+  /*
+   * Calculates the intersection of the two ranges. Right ranges that end before the current left range are skipped
+   * permanently (via rightIndex) as they cannot intersect any of the following left ranges either.
+   */
+  static RowRanges intersection(RowRanges left, RowRanges right) {
+    RowRanges result = new RowRanges();
+
+    int rightIndex = 0;
+    for (Range l : left.ranges) {
+      for (int i = rightIndex, n = right.ranges.size(); i < n; ++i) {
+        Range r = right.ranges.get(i);
+        if (l.isBefore(r)) {
+          break;
+        } else if (l.isAfter(r)) {
+          rightIndex = i + 1;
+          continue;
+        }
+        // Neither before nor after: the ranges overlap, so the intersection is never null here
+        result.add(Range.intersection(l, r));
+      }
+    }
+
+    return result;
+  }
+
+  private final List<Range> ranges = new ArrayList<>();
+
+  private RowRanges() {
+  }
+
+  /*
+   * Adds the range to the end of the list of ranges. The disjoint ascending order is maintained by merging the
+   * specified range with the last ranges as long as they are overlapping or adjacent. The last range must not be after
+   * the specified one; the specified range might overlap with some of the last ranges.
+   */
+  private void add(Range range) {
+    Range rangeToAdd = range;
+    for (int i = ranges.size() - 1; i >= 0; --i) {
+      Range last = ranges.get(i);
+      assert !last.isAfter(range);
+      Range u = Range.union(last, rangeToAdd);
+      if (u == null) {
+        break;
+      }
+      rangeToAdd = u;
+      ranges.remove(i);
+    }
+    ranges.add(rangeToAdd);
+  }
+
+  /**
+   * @return the number of rows in the ranges
+   */
+  public long rowCount() {
+    long cnt = 0;
+    for (Range range : ranges) {
+      cnt += range.count();
+    }
+    return cnt;
+  }
+
+  /**
+   * @return the ascending iterator of the row indexes contained in the ranges
+   */
+  public PrimitiveIterator.OfLong allRows() {
+    return new PrimitiveIterator.OfLong() {
+      private int currentRangeIndex = -1;
+      private Range currentRange;
+      // Initialized after the fields above (declaration order); -1 marks the end as row indexes are non-negative
+      private long next = findNext();
+
+      private long findNext() {
+        if (currentRange == null || next + 1 > currentRange.to) {
+          if (currentRangeIndex + 1 < ranges.size()) {
+            currentRange = ranges.get(++currentRangeIndex);
+            next = currentRange.from;
+          } else {
+            return -1;
+          }
+        } else {
+          ++next;
+        }
+        return next;
+      }
+
+      @Override
+      public boolean hasNext() {
+        return next >= 0;
+      }
+
+      @Override
+      public long nextLong() {
+        long ret = next;
+        if (ret < 0) {
+          throw new NoSuchElementException();
+        }
+        next = findNext();
+        return ret;
+      }
+    };
+  }
+
+  /**
+   * @param from
+   *          the first row of the range to be checked for connection
+   * @param to
+   *          the last row of the range to be checked for connection
+   * @return {@code true} if the specified range is overlapping (have common elements) with one of the ranges
+   */
+  public boolean isOverlapping(long from, long to) {
+    // The ranges are disjoint and ascending, so "overlapping" can act as the "equal" of the binary search
+    return Collections.binarySearch(ranges, new Range(from, to),
+        (r1, r2) -> r1.isBefore(r2) ? -1 : r1.isAfter(r2) ? 1 : 0) >= 0;
+  }
+
+  @Override
+  public String toString() {
+    return ranges.toString();
+  }
+}
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBoundaryOrder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBoundaryOrder.java
new file mode 100644
index 0000000..3d2a924
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBoundaryOrder.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.PrimitiveIterator;
+import java.util.Random;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder.ColumnIndexBase;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+
+/**
+ * Tests the operator implementations in {@link BoundaryOrder}.
+ */
+public class TestBoundaryOrder {
+  /**
+   * Minimal {@link ColumnIndexBase} subclass used only as the enclosing instance for creating
+   * {@link SpyValueComparator} objects. None of the column index functionality is supported.
+   */
+  private static class SpyValueComparatorBuilder extends ColumnIndexBase<Integer> {
+    /**
+     * Comparator delegating to another one while counting the min/max comparisons made.
+     */
+    class SpyValueComparator extends ValueComparator {
+      private final ColumnIndexBase<?>.ValueComparator wrapped;
+      private int comparisons;
+
+      SpyValueComparator(ColumnIndexBase<?>.ValueComparator delegate) {
+        wrapped = delegate;
+      }
+
+      // Number of compareValueToMin/compareValueToMax invocations so far
+      int getCompareCount() {
+        return comparisons;
+      }
+
+      @Override
+      int arrayLength() {
+        return wrapped.arrayLength();
+      }
+
+      @Override
+      int translate(int arrayIndex) {
+        return wrapped.translate(arrayIndex);
+      }
+
+      @Override
+      int compareValueToMin(int arrayIndex) {
+        comparisons++;
+        return wrapped.compareValueToMin(arrayIndex);
+      }
+
+      @Override
+      int compareValueToMax(int arrayIndex) {
+        comparisons++;
+        return wrapped.compareValueToMax(arrayIndex);
+      }
+    }
+
+    private SpyValueComparatorBuilder() {
+      super(TYPE);
+    }
+
+    // Wraps the specified comparator into a new counting one
+    SpyValueComparator build(ColumnIndexBase<?>.ValueComparator comparator) {
+      return new SpyValueComparator(comparator);
+    }
+
+    // The remaining methods are not used by the tests
+
+    @Override
+    ByteBuffer getMinValueAsBytes(int arrayIndex) {
+      throw new Error("Shall never be invoked");
+    }
+
+    @Override
+    ByteBuffer getMaxValueAsBytes(int arrayIndex) {
+      throw new Error("Shall never be invoked");
+    }
+
+    @Override
+    String getMinValueAsString(int arrayIndex) {
+      throw new Error("Shall never be invoked");
+    }
+
+    @Override
+    String getMaxValueAsString(int arrayIndex) {
+      throw new Error("Shall never be invoked");
+    }
+
+    @Override
+    <T extends Comparable<T>> org.apache.parquet.filter2.predicate.Statistics<T> createStats(int arrayIndex) {
+      throw new Error("Shall never be invoked");
+    }
+
+    @Override
+    ColumnIndexBase<Integer>.ValueComparator createValueComparator(Object value) {
+      throw new Error("Shall never be invoked");
+    }
+  }
+
+  /**
+   * Execution statistics of operator validations: the time spent and the number of min/max comparisons made by the
+   * linear (validator) and the binary search based (actual) implementations. Statistics of several validations can be
+   * aggregated via {@link #add(ExecStats)}.
+   */
+  private static class ExecStats {
+    private long linearTime;
+    private long binaryTime;
+    private int linearCompareCount;
+    private int binaryCompareCount;
+    // Number of aggregated executions; stays 0 for an instance that measured one execution directly
+    private int execCount;
+
+    // Runs the validator operator, accumulating its time and number of comparisons
+    IntList measureLinear(Function<ColumnIndexBase<?>.ValueComparator, PrimitiveIterator.OfInt> op,
+        ColumnIndexBase<?>.ValueComparator comparator) {
+      IntList list = new IntArrayList(comparator.arrayLength());
+      SpyValueComparatorBuilder.SpyValueComparator spyComparator = SPY_COMPARATOR_BUILDER.build(comparator);
+      long start = System.nanoTime();
+      op.apply(spyComparator).forEachRemaining((int value) -> list.add(value));
+      linearTime += System.nanoTime() - start;
+      linearCompareCount += spyComparator.getCompareCount();
+      return list;
+    }
+
+    // Runs the actual operator, accumulating its time and number of comparisons
+    IntList measureBinary(Function<ColumnIndexBase<?>.ValueComparator, PrimitiveIterator.OfInt> op,
+        ColumnIndexBase<?>.ValueComparator comparator) {
+      IntList list = new IntArrayList(comparator.arrayLength());
+      SpyValueComparatorBuilder.SpyValueComparator spyComparator = SPY_COMPARATOR_BUILDER.build(comparator);
+      long start = System.nanoTime();
+      op.apply(spyComparator).forEachRemaining((int value) -> list.add(value));
+      binaryTime += System.nanoTime() - start;
+      binaryCompareCount += spyComparator.getCompareCount();
+      return list;
+    }
+
+    // Aggregates the statistics of one execution into this instance
+    void add(ExecStats stats) {
+      linearTime += stats.linearTime;
+      linearCompareCount += stats.linearCompareCount;
+      binaryTime += stats.binaryTime;
+      binaryCompareCount += stats.binaryCompareCount;
+      ++execCount;
+    }
+
+    @Override
+    public String toString() {
+      // A non-aggregated instance represents a single execution; this also avoids an integer division by zero
+      int count = Math.max(execCount, 1);
+      double linearMs = linearTime / 1_000_000.0;
+      double binaryMs = binaryTime / 1_000_000.0;
+      return String.format(
+          "Linear search: %.2fms (avg: %.6fms); number of compares: %d (avg: %d) [100.00%%]%n"
+              + "Binary search: %.2fms (avg: %.6fms); number of compares: %d (avg: %d) [%.2f%%]",
+          linearMs, linearMs / count, linearCompareCount, linearCompareCount / count,
+          binaryMs, binaryMs / count, binaryCompareCount, binaryCompareCount / count,
+          100.0 * binaryCompareCount / linearCompareCount);
+    }
+  }
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestBoundaryOrder.class);
+ private static final PrimitiveType TYPE = Types.required(PrimitiveTypeName.INT32).named("test_int32");
+ // Value range of the hand-crafted ASCENDING/DESCENDING indexes
+ private static final int FROM = -15;
+ private static final int TO = 15;
+ private static final ColumnIndexBase<?> ASCENDING;
+ private static final ColumnIndexBase<?> DESCENDING;
+ // Min and max of the only page of the SINGLE index
+ private static final int SINGLE_FROM = -1;
+ private static final int SINGLE_TO = 1;
+ private static final ColumnIndexBase<?> SINGLE;
+ // Fixed seed so the random indexes are the same in every run
+ private static final Random RANDOM = new Random(42);
+ // Random values are drawn from [RAND_FROM, RAND_TO] (inclusive)
+ private static final int RAND_FROM = -2000;
+ private static final int RAND_TO = 2000;
+ // Number of pages of the random indexes; each page is built from two random values
+ private static final int RAND_COUNT = 2000;
+ private static final ColumnIndexBase<?> RAND_ASCENDING;
+ private static final ColumnIndexBase<?> RAND_DESCENDING;
+ // Must stay after TYPE: static initializers run in textual order and the constructor passes TYPE to super()
+ private static final SpyValueComparatorBuilder SPY_COMPARATOR_BUILDER = new SpyValueComparatorBuilder();
+ static {
+ // ASCENDING: 12 pages whose min and max values are both non-decreasing (including overlapping and repeated pages)
+ ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(TYPE, Integer.MAX_VALUE);
+ builder.add(stats(FROM, -12));
+ builder.add(stats(-10, -8));
+ builder.add(stats(-8, -4));
+ builder.add(stats(-6, -4));
+ builder.add(stats(-6, -3));
+ builder.add(stats(-6, -3));
+ builder.add(stats(-6, -3));
+ builder.add(stats(0, 3));
+ builder.add(stats(3, 5));
+ builder.add(stats(3, 5));
+ builder.add(stats(5, 8));
+ builder.add(stats(10, TO));
+ ASCENDING = (ColumnIndexBase<?>) builder.build();
+
+ // DESCENDING: the same pages as ASCENDING in reverse order
+ builder = ColumnIndexBuilder.getBuilder(TYPE, Integer.MAX_VALUE);
+ builder.add(stats(10, TO));
+ builder.add(stats(5, 8));
+ builder.add(stats(3, 5));
+ builder.add(stats(3, 5));
+ builder.add(stats(0, 3));
+ builder.add(stats(-6, -3));
+ builder.add(stats(-6, -3));
+ builder.add(stats(-6, -3));
+ builder.add(stats(-6, -4));
+ builder.add(stats(-8, -4));
+ builder.add(stats(-10, -8));
+ builder.add(stats(FROM, -12));
+ DESCENDING = (ColumnIndexBase<?>) builder.build();
+
+ // SINGLE: an index with one page only
+ builder = ColumnIndexBuilder.getBuilder(TYPE, Integer.MAX_VALUE);
+ builder.add(stats(SINGLE_FROM, SINGLE_TO));
+ SINGLE = (ColumnIndexBase<?>) builder.build();
+
+ // RAND_ASCENDING: 2 * RAND_COUNT random values sorted ascending and consumed in pairs (one page per pair).
+ // Built before RAND_DESCENDING; both draw from the shared RANDOM, so this order determines the generated data.
+ builder = ColumnIndexBuilder.getBuilder(TYPE, Integer.MAX_VALUE);
+ for (PrimitiveIterator.OfInt it = IntStream.generate(() -> RANDOM.nextInt(RAND_TO - RAND_FROM + 1) + RAND_FROM)
+ .limit(RAND_COUNT * 2).sorted().iterator(); it.hasNext();) {
+ builder.add(stats(it.nextInt(), it.nextInt()));
+ }
+ RAND_ASCENDING = (ColumnIndexBase<?>) builder.build();
+
+ // RAND_DESCENDING: the same construction with the values sorted in reverse order. The larger value of a pair is
+ // passed as "min", but stats() feeds both values to updateStats, which presumably derives the real min/max (confirm).
+ builder = ColumnIndexBuilder.getBuilder(TYPE, Integer.MAX_VALUE);
+ for (Iterator<Integer> it = IntStream.generate(() -> RANDOM.nextInt(RAND_TO - RAND_FROM + 1) + RAND_FROM)
+ .limit(RAND_COUNT * 2).mapToObj(Integer::valueOf).sorted(Collections.reverseOrder()).iterator(); it
+ .hasNext();) {
+ builder.add(stats(it.next(), it.next()));
+ }
+ RAND_DESCENDING = (ColumnIndexBase<?>) builder.build();
+ }
+
+  /**
+   * Creates statistics of {@link #TYPE} by passing both specified values to {@code updateStats}.
+   */
+  private static Statistics<?> stats(int min, int max) {
+    Statistics<?> result = Statistics.createStats(TYPE);
+    for (int value : new int[] { min, max }) {
+      result.updateStats(value);
+    }
+    return result;
+  }
+
+  /**
+   * Runs the validator (linear) and the actual (binary search based) operators with the same comparator and asserts
+   * that they return the same page indexes.
+   *
+   * @return the execution statistics of the two runs
+   */
+  private static ExecStats validateOperator(String msg,
+      Function<ColumnIndexBase<?>.ValueComparator, PrimitiveIterator.OfInt> validatorOp,
+      Function<ColumnIndexBase<?>.ValueComparator, PrimitiveIterator.OfInt> actualOp,
+      ColumnIndexBase<?>.ValueComparator comparator) {
+    ExecStats execStats = new ExecStats();
+    IntList linearResult = execStats.measureLinear(validatorOp, comparator);
+    IntList binaryResult = execStats.measureBinary(actualOp, comparator);
+    Assert.assertEquals(msg, linearResult, binaryResult);
+    return execStats;
+  }
+
+ @Test
+ public void testEq() {
+ for (int i = FROM - 1; i <= TO + 1; ++i) {
+ validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::eq,
+ BoundaryOrder.ASCENDING::eq,
+ ASCENDING.createValueComparator(i));
+ validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::eq,
+ BoundaryOrder.DESCENDING::eq,
+ DESCENDING.createValueComparator(i));
+ }
+ for (int i = SINGLE_FROM - 1; i <= SINGLE_TO + 1; ++i) {
+ ColumnIndexBase<?>.ValueComparator singleComparator = SINGLE.createValueComparator(i);
+ validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::eq,
+ BoundaryOrder.ASCENDING::eq,
+ singleComparator);
+ validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::eq,
+ BoundaryOrder.DESCENDING::eq,
+ singleComparator);
+ }
+ ExecStats stats = new ExecStats();
+ for (int i = RAND_FROM - 1; i <= RAND_TO + 1; ++i) {
+ stats.add(validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::eq,
+ BoundaryOrder.ASCENDING::eq,
+ RAND_ASCENDING.createValueComparator(i)));
+ stats.add(validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::eq,
+ BoundaryOrder.DESCENDING::eq,
+ RAND_DESCENDING.createValueComparator(i)));
+ }
+ LOGGER.info("Executed eq on random data (page count: {}, values searched: {}):\n{}", RAND_COUNT,
+ RAND_TO - RAND_FROM + 2, stats);
+ }
+
+ @Test
+ public void testGt() {
+ for (int i = FROM - 1; i <= TO + 1; ++i) {
+ validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::gt,
+ BoundaryOrder.ASCENDING::gt,
+ ASCENDING.createValueComparator(i));
+ validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::gt,
+ BoundaryOrder.DESCENDING::gt,
+ DESCENDING.createValueComparator(i));
+ }
+ for (int i = SINGLE_FROM - 1; i <= SINGLE_TO + 1; ++i) {
+ ColumnIndexBase<?>.ValueComparator singleComparator = SINGLE.createValueComparator(i);
+ validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::gt,
+ BoundaryOrder.ASCENDING::gt,
+ singleComparator);
+ validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::gt,
+ BoundaryOrder.DESCENDING::gt,
+ singleComparator);
+ }
+ ExecStats stats = new ExecStats();
+ for (int i = RAND_FROM - 1; i <= RAND_TO + 1; ++i) {
+ stats.add(validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::gt,
+ BoundaryOrder.ASCENDING::gt,
+ RAND_ASCENDING.createValueComparator(i)));
+ stats.add(validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::gt,
+ BoundaryOrder.DESCENDING::gt,
+ RAND_DESCENDING.createValueComparator(i)));
+ }
+ LOGGER.info("Executed gt on random data (page count: {}, values searched: {}):\n{}", RAND_COUNT,
+ RAND_TO - RAND_FROM + 2, stats);
+ }
+
+ @Test
+ public void testGtEq() {
+ for (int i = FROM - 1; i <= TO + 1; ++i) {
+ validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::gtEq,
+ BoundaryOrder.ASCENDING::gtEq,
+ ASCENDING.createValueComparator(i));
+ validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::gtEq,
+ BoundaryOrder.DESCENDING::gtEq,
+ DESCENDING.createValueComparator(i));
+ }
+ for (int i = SINGLE_FROM - 1; i <= SINGLE_TO + 1; ++i) {
+ ColumnIndexBase<?>.ValueComparator singleComparator = SINGLE.createValueComparator(i);
+ validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::gtEq,
+ BoundaryOrder.ASCENDING::gtEq,
+ singleComparator);
+ validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::gtEq,
+ BoundaryOrder.DESCENDING::gtEq,
+ singleComparator);
+ }
+ ExecStats stats = new ExecStats();
+ for (int i = RAND_FROM - 1; i <= RAND_TO + 1; ++i) {
+ stats.add(validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::gtEq,
+ BoundaryOrder.ASCENDING::gtEq,
+ RAND_ASCENDING.createValueComparator(i)));
+ stats.add(validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::gtEq,
+ BoundaryOrder.DESCENDING::gtEq,
+ RAND_DESCENDING.createValueComparator(i)));
+ }
+ LOGGER.info("Executed gtEq on random data (page count: {}, values searched: {}):\n{}", RAND_COUNT,
+ RAND_TO - RAND_FROM + 2, stats);
+ }
+
+ @Test
+ public void testLt() {
+ for (int i = FROM - 1; i <= TO + 1; ++i) {
+ validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::lt,
+ BoundaryOrder.ASCENDING::lt,
+ ASCENDING.createValueComparator(i));
+ validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::lt,
+ BoundaryOrder.DESCENDING::lt,
+ DESCENDING.createValueComparator(i));
+ }
+ for (int i = SINGLE_FROM - 1; i <= SINGLE_TO + 1; ++i) {
+ ColumnIndexBase<?>.ValueComparator singleComparator = SINGLE.createValueComparator(i);
+ validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::lt,
+ BoundaryOrder.ASCENDING::lt,
+ singleComparator);
+ validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::lt,
+ BoundaryOrder.DESCENDING::lt,
+ singleComparator);
+ }
+ ExecStats stats = new ExecStats();
+ for (int i = RAND_FROM - 1; i <= RAND_TO + 1; ++i) {
+ stats.add(validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::lt,
+ BoundaryOrder.ASCENDING::lt,
+ RAND_ASCENDING.createValueComparator(i)));
+ stats.add(validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::lt,
+ BoundaryOrder.DESCENDING::lt,
+ RAND_DESCENDING.createValueComparator(i)));
+ }
+ LOGGER.info("Executed lt on random data (page count: {}, values searched: {}):\n{}", RAND_COUNT,
+ RAND_TO - RAND_FROM + 2, stats);
+ }
+
+ @Test
+ public void testLtEq() {
+ for (int i = FROM - 1; i <= TO + 1; ++i) {
+ validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::ltEq,
+ BoundaryOrder.ASCENDING::ltEq,
+ ASCENDING.createValueComparator(i));
+ validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::ltEq,
+ BoundaryOrder.DESCENDING::ltEq,
+ DESCENDING.createValueComparator(i));
+ }
+ for (int i = SINGLE_FROM - 1; i <= SINGLE_TO + 1; ++i) {
+ ColumnIndexBase<?>.ValueComparator singleComparator = SINGLE.createValueComparator(i);
+ validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::ltEq,
+ BoundaryOrder.ASCENDING::ltEq,
+ singleComparator);
+ validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::ltEq,
+ BoundaryOrder.DESCENDING::ltEq,
+ singleComparator);
+ }
+ ExecStats stats = new ExecStats();
+ for (int i = RAND_FROM - 1; i <= RAND_TO + 1; ++i) {
+ stats.add(validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::ltEq,
+ BoundaryOrder.ASCENDING::ltEq,
+ RAND_ASCENDING.createValueComparator(i)));
+ stats.add(validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::ltEq,
+ BoundaryOrder.DESCENDING::ltEq,
+ RAND_DESCENDING.createValueComparator(i)));
+ }
+ LOGGER.info("Executed ltEq on random data (page count: {}, values searched: {}):\n{}", RAND_COUNT,
+ RAND_TO - RAND_FROM + 2, stats);
+ }
+
+ @Test
+ public void testNotEq() {
+ for (int i = -16; i <= 16; ++i) {
+ validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::notEq,
+ BoundaryOrder.ASCENDING::notEq,
+ ASCENDING.createValueComparator(i));
+ validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::notEq,
+ BoundaryOrder.DESCENDING::notEq,
+ DESCENDING.createValueComparator(i));
+ }
+ for (int i = FROM - 1; i <= TO + 1; ++i) {
+ ColumnIndexBase<?>.ValueComparator singleComparator = SINGLE.createValueComparator(i);
+ validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::notEq,
+ BoundaryOrder.ASCENDING::notEq,
+ singleComparator);
+ validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::notEq,
+ BoundaryOrder.DESCENDING::notEq,
+ singleComparator);
+ }
+ ExecStats stats = new ExecStats();
+ for (int i = RAND_FROM - 1; i <= RAND_TO + 1; ++i) {
+ stats.add(validateOperator("Mismatching page indexes for value " + i + " with ASCENDING order",
+ BoundaryOrder.UNORDERED::notEq,
+ BoundaryOrder.ASCENDING::notEq,
+ RAND_ASCENDING.createValueComparator(i)));
+ stats.add(validateOperator("Mismatching page indexes for value " + i + " with DESCENDING order",
+ BoundaryOrder.UNORDERED::notEq,
+ BoundaryOrder.DESCENDING::notEq,
+ RAND_DESCENDING.createValueComparator(i)));
+ }
+ LOGGER.info("Executed notEq on random data (page count: {}, values searched: {}):\n{}", RAND_COUNT,
+ RAND_TO - RAND_FROM + 2, stats);
+ }
+
+}
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
index 7a5745e..e4655b2 100644
--- a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
@@ -19,6 +19,20 @@
package org.apache.parquet.internal.column.columnindex;
import static java.util.Arrays.asList;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.booleanColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.floatColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
import static org.apache.parquet.schema.OriginalType.DECIMAL;
import static org.apache.parquet.schema.OriginalType.UINT_8;
import static org.apache.parquet.schema.OriginalType.UTF8;
@@ -39,19 +53,19 @@ import static org.junit.Assert.fail;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.internal.column.columnindex.BinaryColumnIndexBuilder;
-import org.apache.parquet.internal.column.columnindex.BooleanColumnIndexBuilder;
-import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
-import org.apache.parquet.internal.column.columnindex.ColumnIndex;
-import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
-import org.apache.parquet.internal.column.columnindex.DoubleColumnIndexBuilder;
-import org.apache.parquet.internal.column.columnindex.FloatColumnIndexBuilder;
-import org.apache.parquet.internal.column.columnindex.IntColumnIndexBuilder;
-import org.apache.parquet.internal.column.columnindex.LongColumnIndexBuilder;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
+import org.apache.parquet.filter2.predicate.Operators.BooleanColumn;
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
@@ -62,12 +76,167 @@ import org.junit.Test;
*/
public class TestColumnIndexBuilder {
+ /** Predicate keeping null values and binary decimals equal to {@code decimalBinary("0.0")}. */
+ public static class BinaryDecimalIsNullOrZeroUdp extends UserDefinedPredicate<Binary> {
+ private static final Binary ZERO = decimalBinary("0.0");
+
+ @Override
+ public boolean keep(Binary value) {
+ if (value == null) {
+ return true;
+ }
+ return value.equals(ZERO);
+ }
+
+ @Override
+ public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics<Binary> statistics) {
+ // Zero cannot occur when it lies outside [min, max].
+ Comparator<Binary> comparator = statistics.getComparator();
+ if (comparator.compare(statistics.getMin(), ZERO) > 0) {
+ return true;
+ }
+ return comparator.compare(statistics.getMax(), ZERO) < 0;
+ }
+
+ @Override
+ public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<Binary> statistics) {
+ // The inverted predicate can only skip pages where both bounds equal zero.
+ Comparator<Binary> comparator = statistics.getComparator();
+ if (comparator.compare(statistics.getMin(), ZERO) != 0) {
+ return false;
+ }
+ return comparator.compare(statistics.getMax(), ZERO) == 0;
+ }
+ }
+
+ /** Predicate keeping non-empty binary values whose first byte is {@code 'B'}. */
+ public static class BinaryUtf8StartsWithB extends UserDefinedPredicate<Binary> {
+ private static final Binary B = stringBinary("B");
+ private static final Binary C = stringBinary("C");
+
+ @Override
+ public boolean keep(Binary value) {
+ if (value == null || value.length() <= 0) {
+ return false;
+ }
+ return value.getBytesUnsafe()[0] == 'B';
+ }
+
+ @Override
+ public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics<Binary> statistics) {
+ // Assuming lexicographic ordering, matches live in [B, C); skip pages entirely outside that range.
+ Comparator<Binary> comparator = statistics.getComparator();
+ if (comparator.compare(statistics.getMin(), C) >= 0) {
+ return true;
+ }
+ return comparator.compare(statistics.getMax(), B) < 0;
+ }
+
+ @Override
+ public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<Binary> statistics) {
+ // The inverted predicate can skip pages whose whole range lies inside [B, C).
+ Comparator<Binary> comparator = statistics.getComparator();
+ if (comparator.compare(statistics.getMin(), B) < 0) {
+ return false;
+ }
+ return comparator.compare(statistics.getMax(), C) < 0;
+ }
+ }
+
+ /** Predicate keeping null values and {@code true}. */
+ public static class BooleanIsTrueOrNull extends UserDefinedPredicate<Boolean> {
+ @Override
+ public boolean keep(Boolean value) {
+ if (value == null) {
+ return true;
+ }
+ return value;
+ }
+
+ @Override
+ public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics<Boolean> statistics) {
+ // A maximum other than true means no value in the statistics is true (assuming false < true).
+ Comparator<Boolean> comparator = statistics.getComparator();
+ return comparator.compare(statistics.getMax(), Boolean.TRUE) != 0;
+ }
+
+ @Override
+ public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<Boolean> statistics) {
+ // A minimum of true means every value in the statistics is true.
+ Comparator<Boolean> comparator = statistics.getComparator();
+ return comparator.compare(statistics.getMin(), Boolean.TRUE) == 0;
+ }
+ }
+
+ /** Predicate keeping non-null doubles that have no fractional part. */
+ public static class DoubleIsInteger extends UserDefinedPredicate<Double> {
+ @Override
+ public boolean keep(Double value) {
+ if (value == null) {
+ return false;
+ }
+ double v = value;
+ return Math.floor(v) == v;
+ }
+
+ @Override
+ public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics<Double> statistics) {
+ double lower = statistics.getMin();
+ double upper = statistics.getMax();
+ Comparator<Double> cmp = statistics.getComparator();
+ double lowerFloor = Math.floor(lower);
+ double upperFloor = Math.floor(upper);
+ // Droppable only when both bounds sit strictly inside the same unit interval (no integer between).
+ if (cmp.compare(lowerFloor, upperFloor) != 0) {
+ return false;
+ }
+ if (cmp.compare(lowerFloor, lower) == 0) {
+ return false;
+ }
+ return cmp.compare(upperFloor, upper) != 0;
+ }
+
+ @Override
+ public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<Double> statistics) {
+ double lower = statistics.getMin();
+ double upper = statistics.getMax();
+ Comparator<Double> cmp = statistics.getComparator();
+ // Only a single-valued range holding an integer is guaranteed to match everywhere.
+ if (cmp.compare(lower, upper) != 0) {
+ return false;
+ }
+ return cmp.compare(Math.floor(lower), lower) == 0;
+ }
+ }
+
+ /** Predicate keeping non-null floats that have no fractional part. */
+ public static class FloatIsInteger extends UserDefinedPredicate<Float> {
+ /** Rounds {@code value} down and narrows the result back to {@code float}. */
+ private static float floor(float value) {
+ return (float) Math.floor(value);
+ }
+
+ @Override
+ public boolean keep(Float value) {
+ if (value == null) {
+ return false;
+ }
+ float v = value;
+ return Math.floor(v) == v;
+ }
+
+ @Override
+ public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics<Float> statistics) {
+ float lower = statistics.getMin();
+ float upper = statistics.getMax();
+ Comparator<Float> cmp = statistics.getComparator();
+ float lowerFloor = floor(lower);
+ float upperFloor = floor(upper);
+ // Droppable only when both bounds sit strictly inside the same unit interval (no integer between).
+ if (cmp.compare(lowerFloor, upperFloor) != 0) {
+ return false;
+ }
+ if (cmp.compare(lowerFloor, lower) == 0) {
+ return false;
+ }
+ return cmp.compare(upperFloor, upper) != 0;
+ }
+
+ @Override
+ public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<Float> statistics) {
+ float lower = statistics.getMin();
+ float upper = statistics.getMax();
+ Comparator<Float> cmp = statistics.getComparator();
+ // Only a single-valued range holding an integer is guaranteed to match everywhere.
+ if (cmp.compare(lower, upper) != 0) {
+ return false;
+ }
+ return cmp.compare(floor(lower), lower) == 0;
+ }
+ }
+
+ /** Predicate keeping non-null ints that are multiples of 3. */
+ public static class IntegerIsDivisableWith3 extends UserDefinedPredicate<Integer> {
+ @Override
+ public boolean keep(Integer value) {
+ return value != null && value % 3 == 0;
+ }
+
+ @Override
+ public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics<Integer> statistics) {
+ int min = statistics.getMin();
+ int max = statistics.getMax();
+ // [min, max] holds no multiple of 3 iff both bounds share a floor-division bucket and min itself is
+ // not a multiple. floorDiv/floorMod (rather than % and max - min) keep this correct for ranges
+ // that straddle a multiple (e.g. [2, 4], [-1, 1]) and for ranges too wide for max - min to fit in an int.
+ return Math.floorDiv(min, 3) == Math.floorDiv(max, 3) && Math.floorMod(min, 3) != 0;
+ }
+
+ @Override
+ public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<Integer> statistics) {
+ int min = statistics.getMin();
+ int max = statistics.getMax();
+ // Conservative: only a single-valued range that is a multiple of 3 surely matches everywhere.
+ return min == max && min % 3 == 0;
+ }
+ }
+
+ /** Predicate keeping non-null longs that are multiples of 3. */
+ public static class LongIsDivisableWith3 extends UserDefinedPredicate<Long> {
+ @Override
+ public boolean keep(Long value) {
+ return value != null && value % 3 == 0;
+ }
+
+ @Override
+ public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics<Long> statistics) {
+ long min = statistics.getMin();
+ long max = statistics.getMax();
+ // [min, max] holds no multiple of 3 iff both bounds share a floor-division bucket and min itself is
+ // not a multiple. floorDiv/floorMod (rather than % and max - min) keep this correct for ranges
+ // that straddle a multiple (e.g. [2, 4], [-1, 1]) and for ranges too wide for max - min to fit in a long.
+ return Math.floorDiv(min, 3L) == Math.floorDiv(max, 3L) && Math.floorMod(min, 3L) != 0;
+ }
+
+ @Override
+ public boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics<Long> statistics) {
+ long min = statistics.getMin();
+ long max = statistics.getMax();
+ // Conservative: only a single-valued range that is a multiple of 3 surely matches everywhere.
+ return min == max && min % 3 == 0;
+ }
+ }
+
@Test
public void testBuildBinaryDecimal() {
PrimitiveType type = Types.required(BINARY).as(DECIMAL).precision(12).scale(2).named("test_binary_decimal");
ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
assertNull(builder.build());
+ BinaryColumn col = binaryColumn("test_col");
StatsBuilder sb = new StatsBuilder();
builder.add(sb.stats(type, null, null));
@@ -102,6 +271,16 @@ public class TestColumnIndexBuilder {
null,
null,
decimalBinary("87656273"));
+ assertCorrectFiltering(columnIndex, eq(col, decimalBinary("0.0")), 1, 4);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 5, 6);
+ assertCorrectFiltering(columnIndex, notEq(col, decimalBinary("87656273")), 0, 1, 2, 3, 4, 5, 6);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 4, 7);
+ assertCorrectFiltering(columnIndex, gt(col, decimalBinary("2348978.45")), 1);
+ assertCorrectFiltering(columnIndex, gtEq(col, decimalBinary("2348978.45")), 1, 4);
+ assertCorrectFiltering(columnIndex, lt(col, decimalBinary("-234.23")), 4);
+ assertCorrectFiltering(columnIndex, ltEq(col, decimalBinary("-234.23")), 2, 4);
+ assertCorrectFiltering(columnIndex, userDefined(col, BinaryDecimalIsNullOrZeroUdp.class), 0, 1, 2, 3, 4, 5, 6);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, BinaryDecimalIsNullOrZeroUdp.class)), 1, 2, 4, 7);
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
@@ -137,6 +316,16 @@ public class TestColumnIndexBuilder {
null,
decimalBinary("1234567890.12"),
null);
+ assertCorrectFiltering(columnIndex, eq(col, decimalBinary("87656273")), 2, 4);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 3, 5, 6, 7);
+ assertCorrectFiltering(columnIndex, notEq(col, decimalBinary("87656273")), 0, 1, 2, 3, 5, 6, 7);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 4, 6);
+ assertCorrectFiltering(columnIndex, gt(col, decimalBinary("87656273")), 6);
+ assertCorrectFiltering(columnIndex, gtEq(col, decimalBinary("87656273")), 2, 4, 6);
+ assertCorrectFiltering(columnIndex, lt(col, decimalBinary("-0.17")), 1);
+ assertCorrectFiltering(columnIndex, ltEq(col, decimalBinary("-0.17")), 1, 2);
+ assertCorrectFiltering(columnIndex, userDefined(col, BinaryDecimalIsNullOrZeroUdp.class), 0, 2, 3, 5, 6, 7);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, BinaryDecimalIsNullOrZeroUdp.class)), 1, 2, 4, 6);
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
@@ -172,6 +361,16 @@ public class TestColumnIndexBuilder {
decimalBinary("-0.17"),
null,
decimalBinary("-9999293.23"));
+ assertCorrectFiltering(columnIndex, eq(col, decimalBinary("1234567890.12")), 2, 4);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 6);
+ assertCorrectFiltering(columnIndex, notEq(col, decimalBinary("0.0")), 0, 1, 2, 3, 4, 5, 6, 7);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 2, 4, 5, 7);
+ assertCorrectFiltering(columnIndex, gt(col, decimalBinary("1234567890.12")));
+ assertCorrectFiltering(columnIndex, gtEq(col, decimalBinary("1234567890.12")), 2, 4);
+ assertCorrectFiltering(columnIndex, lt(col, decimalBinary("-0.17")), 7);
+ assertCorrectFiltering(columnIndex, ltEq(col, decimalBinary("-0.17")), 5, 7);
+ assertCorrectFiltering(columnIndex, userDefined(col, BinaryDecimalIsNullOrZeroUdp.class), 0, 1, 2, 3, 5, 6);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, BinaryDecimalIsNullOrZeroUdp.class)), 2, 4, 5, 7);
}
@Test
@@ -180,6 +379,7 @@ public class TestColumnIndexBuilder {
ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
assertNull(builder.build());
+ BinaryColumn col = binaryColumn("test_col");
StatsBuilder sb = new StatsBuilder();
builder.add(sb.stats(type, null, null));
@@ -214,6 +414,16 @@ public class TestColumnIndexBuilder {
stringBinary("Dent"),
stringBinary("Beeblebrox"),
null);
+ assertCorrectFiltering(columnIndex, eq(col, stringBinary("Marvin")), 1, 4, 5);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 5, 7);
+ assertCorrectFiltering(columnIndex, notEq(col, stringBinary("Beeblebrox")), 0, 1, 2, 3, 4, 5, 7);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 1, 4, 5, 6);
+ assertCorrectFiltering(columnIndex, gt(col, stringBinary("Prefect")), 1, 5);
+ assertCorrectFiltering(columnIndex, gtEq(col, stringBinary("Prefect")), 1, 4, 5);
+ assertCorrectFiltering(columnIndex, lt(col, stringBinary("Dent")), 4, 6);
+ assertCorrectFiltering(columnIndex, ltEq(col, stringBinary("Dent")), 4, 5, 6);
+ assertCorrectFiltering(columnIndex, userDefined(col, BinaryUtf8StartsWithB.class), 4, 6);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, BinaryUtf8StartsWithB.class)), 0, 1, 2, 3, 4, 5, 7);
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
@@ -249,6 +459,16 @@ public class TestColumnIndexBuilder {
null,
stringBinary("Slartibartfast"),
null);
+ assertCorrectFiltering(columnIndex, eq(col, stringBinary("Jeltz")), 3, 4);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 4, 5, 7);
+ assertCorrectFiltering(columnIndex, notEq(col, stringBinary("Slartibartfast")), 0, 1, 2, 3, 4, 5, 7);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 0, 3, 4, 6);
+ assertCorrectFiltering(columnIndex, gt(col, stringBinary("Marvin")), 4, 6);
+ assertCorrectFiltering(columnIndex, gtEq(col, stringBinary("Marvin")), 4, 6);
+ assertCorrectFiltering(columnIndex, lt(col, stringBinary("Dent")), 0);
+ assertCorrectFiltering(columnIndex, ltEq(col, stringBinary("Dent")), 0, 3, 4);
+ assertCorrectFiltering(columnIndex, userDefined(col, BinaryUtf8StartsWithB.class), 0);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, BinaryUtf8StartsWithB.class)), 0, 1, 2, 3, 4, 5, 6, 7);
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
@@ -284,6 +504,16 @@ public class TestColumnIndexBuilder {
null,
null,
stringBinary("Beeblebrox"));
+ assertCorrectFiltering(columnIndex, eq(col, stringBinary("Marvin")), 3);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 5, 6, 7);
+ assertCorrectFiltering(columnIndex, notEq(col, stringBinary("Dent")), 0, 1, 2, 3, 5, 6, 7);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 4, 7);
+ assertCorrectFiltering(columnIndex, gt(col, stringBinary("Prefect")), 1);
+ assertCorrectFiltering(columnIndex, gtEq(col, stringBinary("Prefect")), 1, 3);
+ assertCorrectFiltering(columnIndex, lt(col, stringBinary("Marvin")), 3, 4, 7);
+ assertCorrectFiltering(columnIndex, ltEq(col, stringBinary("Marvin")), 3, 4, 7);
+ assertCorrectFiltering(columnIndex, userDefined(col, BinaryUtf8StartsWithB.class), 7);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, BinaryUtf8StartsWithB.class)), 0, 1, 2, 3, 4, 5, 6, 7);
}
@Test
@@ -335,11 +565,68 @@ public class TestColumnIndexBuilder {
}
@Test
+ public void testFilterWithoutNullCounts() {
+ // Builds an ASCENDING UTF8 index with null passed in place of the null counts, then checks the
+ // stored metadata and which pages the predicates select.
+ BinaryColumn column = binaryColumn("test_col");
+ ColumnIndex columnIndex = ColumnIndexBuilder.build(
+ Types.required(BINARY).as(UTF8).named("test_binary_utf8"),
+ BoundaryOrder.ASCENDING,
+ asList(true, true, false, false, true, false, true, false),
+ null,
+ toBBList(
+ null,
+ null,
+ stringBinary("Beeblebrox"),
+ stringBinary("Dent"),
+ null,
+ stringBinary("Jeltz"),
+ null,
+ stringBinary("Slartibartfast")),
+ toBBList(
+ null,
+ null,
+ stringBinary("Dent"),
+ stringBinary("Dent"),
+ null,
+ stringBinary("Prefect"),
+ null,
+ stringBinary("Slartibartfast")));
+
+ assertNull(columnIndex.getNullCounts());
+ assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+ assertCorrectNullPages(columnIndex, true, true, false, false, true, false, true, false);
+ assertCorrectValues(columnIndex.getMinValues(),
+ null,
+ null,
+ stringBinary("Beeblebrox"),
+ stringBinary("Dent"),
+ null,
+ stringBinary("Jeltz"),
+ null,
+ stringBinary("Slartibartfast"));
+ assertCorrectValues(columnIndex.getMaxValues(),
+ null,
+ null,
+ stringBinary("Dent"),
+ stringBinary("Dent"),
+ null,
+ stringBinary("Prefect"),
+ null,
+ stringBinary("Slartibartfast"));
+
+ // Without null counts, every predicate that may match nulls has to keep all pages.
+ assertCorrectFiltering(columnIndex, eq(column, stringBinary("Dent")), 2, 3);
+ assertCorrectFiltering(columnIndex, eq(column, null), 0, 1, 2, 3, 4, 5, 6, 7);
+ assertCorrectFiltering(columnIndex, notEq(column, stringBinary("Dent")), 0, 1, 2, 3, 4, 5, 6, 7);
+ assertCorrectFiltering(columnIndex, notEq(column, null), 2, 3, 5, 7);
+ assertCorrectFiltering(columnIndex, userDefined(column, BinaryDecimalIsNullOrZeroUdp.class), 0, 1, 2, 3, 4, 5, 6, 7);
+ assertCorrectFiltering(columnIndex, invert(userDefined(column, BinaryDecimalIsNullOrZeroUdp.class)), 2, 3, 5, 7);
+ }
+
+ @Test
public void testBuildBoolean() {
PrimitiveType type = Types.required(BOOLEAN).named("test_boolean");
ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
assertThat(builder, instanceOf(BooleanColumnIndexBuilder.class));
assertNull(builder.build());
+ BooleanColumn col = booleanColumn("test_col");
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
StatsBuilder sb = new StatsBuilder();
@@ -356,6 +643,12 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, false, false, false, true, false);
assertCorrectValues(columnIndex.getMaxValues(), true, true, true, null, false);
assertCorrectValues(columnIndex.getMinValues(), false, false, true, null, false);
+ assertCorrectFiltering(columnIndex, eq(col, true), 0, 1, 2);
+ assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+ assertCorrectFiltering(columnIndex, notEq(col, true), 0, 1, 2, 3, 4);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4);
+ assertCorrectFiltering(columnIndex, userDefined(col, BooleanIsTrueOrNull.class), 0, 1, 2, 3);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, BooleanIsTrueOrNull.class)), 0, 1, 4);
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
@@ -374,6 +667,12 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, true, false, true, true, false, false, true);
assertCorrectValues(columnIndex.getMaxValues(), null, false, null, null, true, true, null);
assertCorrectValues(columnIndex.getMinValues(), null, false, null, null, false, false, null);
+ assertCorrectFiltering(columnIndex, eq(col, true), 4, 5);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 5, 6);
+ assertCorrectFiltering(columnIndex, notEq(col, true), 0, 1, 2, 3, 4, 5, 6);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 1, 4, 5);
+ assertCorrectFiltering(columnIndex, userDefined(col, BooleanIsTrueOrNull.class), 0, 2, 3, 4, 5, 6);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, BooleanIsTrueOrNull.class)), 1, 4, 5);
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
@@ -392,6 +691,12 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, true, false, true, true, false, false, true);
assertCorrectValues(columnIndex.getMaxValues(), null, true, null, null, true, false, null);
assertCorrectValues(columnIndex.getMinValues(), null, true, null, null, false, false, null);
+ assertCorrectFiltering(columnIndex, eq(col, true), 1, 4);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 5, 6);
+ assertCorrectFiltering(columnIndex, notEq(col, true), 0, 2, 3, 4, 5, 6);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 1, 4, 5);
+ assertCorrectFiltering(columnIndex, userDefined(col, BooleanIsTrueOrNull.class), 0, 1, 2, 3, 4, 5, 6);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, BooleanIsTrueOrNull.class)), 4, 5);
}
@Test
@@ -416,6 +721,7 @@ public class TestColumnIndexBuilder {
ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
assertThat(builder, instanceOf(DoubleColumnIndexBuilder.class));
assertNull(builder.build());
+ DoubleColumn col = doubleColumn("test_col");
StatsBuilder sb = new StatsBuilder();
builder.add(sb.stats(type, -4.2, -4.1));
@@ -432,6 +738,16 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, false, false, false, true, false, false);
assertCorrectValues(columnIndex.getMaxValues(), -4.1, 7.0, 2.2, null, 2.32, 8.1);
assertCorrectValues(columnIndex.getMinValues(), -4.2, -11.7, 2.2, null, 1.9, -21.0);
+ assertCorrectFiltering(columnIndex, eq(col, 0.0), 1, 5);
+ assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+ assertCorrectFiltering(columnIndex, notEq(col, 2.2), 0, 1, 2, 3, 4, 5);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5);
+ assertCorrectFiltering(columnIndex, gt(col, 2.2), 1, 4, 5);
+ assertCorrectFiltering(columnIndex, gtEq(col, 2.2), 1, 2, 4, 5);
+ assertCorrectFiltering(columnIndex, lt(col, -4.2), 1, 5);
+ assertCorrectFiltering(columnIndex, ltEq(col, -4.2), 0, 1, 5);
+ assertCorrectFiltering(columnIndex, userDefined(col, DoubleIsInteger.class), 1, 4, 5);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, DoubleIsInteger.class)), 0, 1, 2, 3, 4, 5);
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
@@ -452,6 +768,16 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, true, false, false, true, true, false, true, false, true);
assertCorrectValues(columnIndex.getMaxValues(), null, -345.2, -234.6, null, null, 2.99999, null, 42.83, null);
assertCorrectValues(columnIndex.getMinValues(), null, -532.3, -234.7, null, null, -234.6, null, 3.0, null);
+ assertCorrectFiltering(columnIndex, eq(col, 0.0), 5);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8);
+ assertCorrectFiltering(columnIndex, notEq(col, 0.0), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7);
+ assertCorrectFiltering(columnIndex, gt(col, 2.99999), 7);
+ assertCorrectFiltering(columnIndex, gtEq(col, 2.99999), 5, 7);
+ assertCorrectFiltering(columnIndex, lt(col, -234.6), 1, 2);
+ assertCorrectFiltering(columnIndex, ltEq(col, -234.6), 1, 2, 5);
+ assertCorrectFiltering(columnIndex, userDefined(col, DoubleIsInteger.class), 1, 5, 7);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, DoubleIsInteger.class)), 0, 1, 2, 3, 4, 5, 6, 7, 8);
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
@@ -472,6 +798,16 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, true, false, true, false, true, false, true, true, false);
assertCorrectValues(columnIndex.getMaxValues(), null, 532.3, null, 234.7, null, 234.69, null, null, -3.0);
assertCorrectValues(columnIndex.getMinValues(), null, 345.2, null, 234.6, null, -2.99999, null, null, -42.83);
+ assertCorrectFiltering(columnIndex, eq(col, 234.6), 3, 5);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7);
+ assertCorrectFiltering(columnIndex, notEq(col, 2.2), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8);
+ assertCorrectFiltering(columnIndex, gt(col, 2.2), 1, 3, 5);
+ assertCorrectFiltering(columnIndex, gtEq(col, 234.69), 1, 3, 5);
+ assertCorrectFiltering(columnIndex, lt(col, -2.99999), 8);
+ assertCorrectFiltering(columnIndex, ltEq(col, -2.99999), 5, 8);
+ assertCorrectFiltering(columnIndex, userDefined(col, DoubleIsInteger.class), 1, 5, 8);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, DoubleIsInteger.class)), 0, 1, 2, 3, 4, 5, 6, 7, 8);
}
@Test
@@ -496,6 +832,7 @@ public class TestColumnIndexBuilder {
ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
assertThat(builder, instanceOf(FloatColumnIndexBuilder.class));
assertNull(builder.build());
+ FloatColumn col = floatColumn("test_col");
StatsBuilder sb = new StatsBuilder();
builder.add(sb.stats(type, -4.2f, -4.1f));
@@ -512,6 +849,16 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, false, false, false, true, false, false);
assertCorrectValues(columnIndex.getMaxValues(), -4.1f, 7.0f, 2.2f, null, 2.32f, 8.1f);
assertCorrectValues(columnIndex.getMinValues(), -4.2f, -11.7f, 2.2f, null, 1.9f, -21.0f);
+ assertCorrectFiltering(columnIndex, eq(col, 0.0f), 1, 5);
+ assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+ assertCorrectFiltering(columnIndex, notEq(col, 2.2f), 0, 1, 2, 3, 4, 5);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5);
+ assertCorrectFiltering(columnIndex, gt(col, 2.2f), 1, 4, 5);
+ assertCorrectFiltering(columnIndex, gtEq(col, 2.2f), 1, 2, 4, 5);
+ assertCorrectFiltering(columnIndex, lt(col, 0.0f), 0, 1, 5);
+ assertCorrectFiltering(columnIndex, ltEq(col, 1.9f), 0, 1, 4, 5);
+ assertCorrectFiltering(columnIndex, userDefined(col, FloatIsInteger.class), 1, 4, 5);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, FloatIsInteger.class)), 0, 1, 2, 3, 4, 5);
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
@@ -532,6 +879,16 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, true, false, false, true, true, false, true, false, true);
assertCorrectValues(columnIndex.getMaxValues(), null, -345.2f, -234.7f, null, null, 2.99999f, null, 42.83f, null);
assertCorrectValues(columnIndex.getMinValues(), null, -532.3f, -300.6f, null, null, -234.6f, null, 3.0f, null);
+ assertCorrectFiltering(columnIndex, eq(col, 0.0f), 5);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8);
+ assertCorrectFiltering(columnIndex, notEq(col, 2.2f), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7);
+ assertCorrectFiltering(columnIndex, gt(col, 2.2f), 5, 7);
+ assertCorrectFiltering(columnIndex, gtEq(col, -234.7f), 2, 5, 7);
+ assertCorrectFiltering(columnIndex, lt(col, -234.6f), 1, 2);
+ assertCorrectFiltering(columnIndex, ltEq(col, -234.6f), 1, 2, 5);
+ assertCorrectFiltering(columnIndex, userDefined(col, FloatIsInteger.class), 1, 2, 5, 7);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, FloatIsInteger.class)), 0, 1, 2, 3, 4, 5, 6, 7, 8);
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
@@ -552,6 +909,16 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, true, false, true, false, true, false, true, true, false);
assertCorrectValues(columnIndex.getMaxValues(), null, 532.3f, null, 234.7f, null, 234.6f, null, null, -3.0f);
assertCorrectValues(columnIndex.getMinValues(), null, 345.2f, null, 234.6f, null, -2.99999f, null, null, -42.83f);
+ assertCorrectFiltering(columnIndex, eq(col, 234.65f), 3);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7);
+ assertCorrectFiltering(columnIndex, notEq(col, 2.2f), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8);
+ assertCorrectFiltering(columnIndex, gt(col, 2.2f), 1, 3, 5);
+ assertCorrectFiltering(columnIndex, gtEq(col, 2.2f), 1, 3, 5);
+ assertCorrectFiltering(columnIndex, lt(col, 0.0f), 5, 8);
+ assertCorrectFiltering(columnIndex, ltEq(col, 0.0f), 5, 8);
+ assertCorrectFiltering(columnIndex, userDefined(col, FloatIsInteger.class), 1, 5, 8);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, FloatIsInteger.class)), 0, 1, 2, 3, 4, 5, 6, 7, 8);
}
@Test
@@ -576,6 +943,7 @@ public class TestColumnIndexBuilder {
ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
assertThat(builder, instanceOf(IntColumnIndexBuilder.class));
assertNull(builder.build());
+ IntColumn col = intColumn("test_col");
StatsBuilder sb = new StatsBuilder();
builder.add(sb.stats(type, -4, 10));
@@ -592,6 +960,16 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, false, false, false, true, false, false);
assertCorrectValues(columnIndex.getMaxValues(), 10, 7, 2, null, 2, 8);
assertCorrectValues(columnIndex.getMinValues(), -4, -11, 2, null, 1, -21);
+ assertCorrectFiltering(columnIndex, eq(col, 2), 0, 1, 2, 4, 5);
+ assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+ assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5);
+ assertCorrectFiltering(columnIndex, gt(col, 2), 0, 1, 5);
+ assertCorrectFiltering(columnIndex, gtEq(col, 2), 0, 1, 2, 4, 5);
+ assertCorrectFiltering(columnIndex, lt(col, 2), 0, 1, 4, 5);
+ assertCorrectFiltering(columnIndex, ltEq(col, 2), 0, 1, 2, 4, 5);
+ assertCorrectFiltering(columnIndex, userDefined(col, IntegerIsDivisableWith3.class), 0, 1, 5);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, IntegerIsDivisableWith3.class)), 0, 1, 2, 3, 4, 5);
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
@@ -612,6 +990,17 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, true, false, false, true, true, false, true, false, true);
assertCorrectValues(columnIndex.getMaxValues(), null, -345, -42, null, null, 2, null, 42, null);
assertCorrectValues(columnIndex.getMinValues(), null, -532, -500, null, null, -42, null, 3, null);
+ assertCorrectFiltering(columnIndex, eq(col, 2), 5);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8);
+ assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7);
+ assertCorrectFiltering(columnIndex, gt(col, 2), 7);
+ assertCorrectFiltering(columnIndex, gtEq(col, 2), 5, 7);
+ assertCorrectFiltering(columnIndex, lt(col, 2), 1, 2, 5);
+ assertCorrectFiltering(columnIndex, ltEq(col, 2), 1, 2, 5);
+ assertCorrectFiltering(columnIndex, userDefined(col, IntegerIsDivisableWith3.class), 1, 2, 5, 7);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, IntegerIsDivisableWith3.class)), 0, 1, 2, 3, 4, 5, 6, 7,
+ 8);
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
@@ -632,6 +1021,17 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, true, false, true, false, true, false, true, true, false);
assertCorrectValues(columnIndex.getMaxValues(), null, 532, null, 234, null, 42, null, null, -3);
assertCorrectValues(columnIndex.getMinValues(), null, 345, null, 42, null, -2, null, null, -42);
+ assertCorrectFiltering(columnIndex, eq(col, 2), 5);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7);
+ assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8);
+ assertCorrectFiltering(columnIndex, gt(col, 2), 1, 3, 5);
+ assertCorrectFiltering(columnIndex, gtEq(col, 2), 1, 3, 5);
+ assertCorrectFiltering(columnIndex, lt(col, 2), 5, 8);
+ assertCorrectFiltering(columnIndex, ltEq(col, 2), 5, 8);
+ assertCorrectFiltering(columnIndex, userDefined(col, IntegerIsDivisableWith3.class), 1, 3, 5, 8);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, IntegerIsDivisableWith3.class)), 0, 1, 2, 3, 4, 5, 6, 7,
+ 8);
}
@Test
@@ -656,6 +1056,7 @@ public class TestColumnIndexBuilder {
ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
assertThat(builder, instanceOf(IntColumnIndexBuilder.class));
assertNull(builder.build());
+ IntColumn col = intColumn("test_col");
StatsBuilder sb = new StatsBuilder();
builder.add(sb.stats(type, 4, 10));
@@ -672,6 +1073,16 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, false, false, false, true, false, false);
assertCorrectValues(columnIndex.getMaxValues(), 10, 17, 2, null, 0xFF, 0xFA);
assertCorrectValues(columnIndex.getMinValues(), 4, 11, 2, null, 1, 0xEF);
+ assertCorrectFiltering(columnIndex, eq(col, 2), 2, 4);
+ assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+ assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5);
+ assertCorrectFiltering(columnIndex, gt(col, 2), 0, 1, 4, 5);
+ assertCorrectFiltering(columnIndex, gtEq(col, 2), 0, 1, 2, 4, 5);
+ assertCorrectFiltering(columnIndex, lt(col, 0xEF), 0, 1, 2, 4);
+ assertCorrectFiltering(columnIndex, ltEq(col, 0xEF), 0, 1, 2, 4, 5);
+ assertCorrectFiltering(columnIndex, userDefined(col, IntegerIsDivisableWith3.class), 0, 1, 4, 5);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, IntegerIsDivisableWith3.class)), 0, 1, 2, 3, 4, 5);
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
@@ -692,6 +1103,17 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, true, false, false, true, true, false, true, false, true);
assertCorrectValues(columnIndex.getMaxValues(), null, 0, 42, null, null, 0xEE, null, 0xFF, null);
assertCorrectValues(columnIndex.getMinValues(), null, 0, 0, null, null, 42, null, 0xEF, null);
+ assertCorrectFiltering(columnIndex, eq(col, 2), 2);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8);
+ assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7);
+ assertCorrectFiltering(columnIndex, gt(col, 0xEE), 7);
+ assertCorrectFiltering(columnIndex, gtEq(col, 0xEE), 5, 7);
+ assertCorrectFiltering(columnIndex, lt(col, 42), 1, 2);
+ assertCorrectFiltering(columnIndex, ltEq(col, 42), 1, 2, 5);
+ assertCorrectFiltering(columnIndex, userDefined(col, IntegerIsDivisableWith3.class), 1, 2, 5, 7);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, IntegerIsDivisableWith3.class)), 0, 1, 2, 3, 4, 5, 6, 7,
+ 8);
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
@@ -712,6 +1134,17 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, true, false, true, false, true, false, true, true, false);
assertCorrectValues(columnIndex.getMaxValues(), null, 0xFF, null, 0xEF, null, 0xEE, null, null, 41);
assertCorrectValues(columnIndex.getMinValues(), null, 0xFF, null, 0xEA, null, 42, null, null, 0);
+ assertCorrectFiltering(columnIndex, eq(col, 0xAB), 5);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7);
+ assertCorrectFiltering(columnIndex, notEq(col, 0xFF), 0, 2, 3, 4, 5, 6, 7, 8);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8);
+ assertCorrectFiltering(columnIndex, gt(col, 0xFF));
+ assertCorrectFiltering(columnIndex, gtEq(col, 0xFF), 1);
+ assertCorrectFiltering(columnIndex, lt(col, 42), 8);
+ assertCorrectFiltering(columnIndex, ltEq(col, 42), 5, 8);
+ assertCorrectFiltering(columnIndex, userDefined(col, IntegerIsDivisableWith3.class), 1, 3, 5, 8);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, IntegerIsDivisableWith3.class)), 0, 2, 3, 4, 5, 6, 7,
+ 8);
}
@Test
@@ -720,6 +1153,7 @@ public class TestColumnIndexBuilder {
ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
assertThat(builder, instanceOf(LongColumnIndexBuilder.class));
assertNull(builder.build());
+ LongColumn col = longColumn("test_col");
StatsBuilder sb = new StatsBuilder();
builder.add(sb.stats(type, -4l, 10l));
@@ -736,6 +1170,16 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, false, false, false, true, false, false);
assertCorrectValues(columnIndex.getMaxValues(), 10l, 7l, 2l, null, 2l, 8l);
assertCorrectValues(columnIndex.getMinValues(), -4l, -11l, 2l, null, 1l, -21l);
+ assertCorrectFiltering(columnIndex, eq(col, 0l), 0, 1, 5);
+ assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3);
+ assertCorrectFiltering(columnIndex, notEq(col, 0l), 0, 1, 2, 3, 4, 5);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5);
+ assertCorrectFiltering(columnIndex, gt(col, 2l), 0, 1, 5);
+ assertCorrectFiltering(columnIndex, gtEq(col, 2l), 0, 1, 2, 4, 5);
+ assertCorrectFiltering(columnIndex, lt(col, -21l));
+ assertCorrectFiltering(columnIndex, ltEq(col, -21l), 5);
+ assertCorrectFiltering(columnIndex, userDefined(col, LongIsDivisableWith3.class), 0, 1, 5);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, LongIsDivisableWith3.class)), 0, 1, 2, 3, 4, 5);
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
@@ -756,6 +1200,17 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, true, false, false, true, true, false, true, false, true);
assertCorrectValues(columnIndex.getMaxValues(), null, -345l, -42l, null, null, 2l, null, 42l, null);
assertCorrectValues(columnIndex.getMinValues(), null, -532l, -234l, null, null, -42l, null, -3l, null);
+ assertCorrectFiltering(columnIndex, eq(col, -42l), 2, 5);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8);
+ assertCorrectFiltering(columnIndex, notEq(col, -42l), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7);
+ assertCorrectFiltering(columnIndex, gt(col, 2l), 7);
+ assertCorrectFiltering(columnIndex, gtEq(col, 2l), 5, 7);
+ assertCorrectFiltering(columnIndex, lt(col, -42l), 1, 2);
+ assertCorrectFiltering(columnIndex, ltEq(col, -42l), 1, 2, 5);
+ assertCorrectFiltering(columnIndex, userDefined(col, LongIsDivisableWith3.class), 1, 2, 5, 7);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, LongIsDivisableWith3.class)), 0, 1, 2, 3, 4, 5, 6, 7,
+ 8);
builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
@@ -776,6 +1231,17 @@ public class TestColumnIndexBuilder {
assertCorrectNullPages(columnIndex, true, false, true, false, true, false, true, true, false);
assertCorrectValues(columnIndex.getMaxValues(), null, 532l, null, 234l, null, 42l, null, null, -3l);
assertCorrectValues(columnIndex.getMinValues(), null, 345l, null, 42l, null, -2l, null, null, -42l);
+ assertCorrectFiltering(columnIndex, eq(col, 0l), 5);
+ assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7);
+ assertCorrectFiltering(columnIndex, notEq(col, 0l), 0, 1, 2, 3, 4, 5, 6, 7, 8);
+ assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8);
+ assertCorrectFiltering(columnIndex, gt(col, 2l), 1, 3, 5);
+ assertCorrectFiltering(columnIndex, gtEq(col, 2l), 1, 3, 5);
+ assertCorrectFiltering(columnIndex, lt(col, -42l));
+ assertCorrectFiltering(columnIndex, ltEq(col, -42l), 8);
+ assertCorrectFiltering(columnIndex, userDefined(col, LongIsDivisableWith3.class), 1, 3, 5, 8);
+ assertCorrectFiltering(columnIndex, invert(userDefined(col, LongIsDivisableWith3.class)), 0, 1, 2, 3, 4, 5, 6, 7,
+ 8);
}
@Test
@@ -1034,4 +1500,8 @@ public class TestColumnIndexBuilder {
return minMaxSize;
}
}
+
+  /**
+   * Visits {@code columnIndex} with {@code predicate} and asserts that the resulting iterator yields exactly
+   * {@code expectedPageIndexes}, in order (presumably the indexes of the pages that the column index cannot rule
+   * out for the predicate).
+   */
+  private static void assertCorrectFiltering(ColumnIndex columnIndex, FilterPredicate predicate,
+      int... expectedPageIndexes) {
+    TestIndexIterator.assertEquals(predicate.accept(columnIndex), expectedPageIndexes);
+  }
}
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestIndexIterator.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestIndexIterator.java
new file mode 100644
index 0000000..d9047f2
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestIndexIterator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import java.util.Arrays;
+import java.util.PrimitiveIterator;
+
+import org.junit.Test;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+
+/**
+ * Unit tests for {@link IndexIterator}: every factory method is checked against the exact sequence of indexes it
+ * has to produce.
+ */
+public class TestIndexIterator {
+  @Test
+  public void testAll() {
+    assertEquals(IndexIterator.all(10), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+  }
+
+  @Test
+  public void testFilter() {
+    // only the multiples of 3 in [0, 30) are kept
+    assertEquals(IndexIterator.filter(30, index -> index % 3 == 0), 0, 3, 6, 9, 12, 15, 18, 21, 24, 27);
+  }
+
+  @Test
+  public void testFilterTranslate() {
+    // the indexes below 5 are kept, then each one is negated
+    assertEquals(IndexIterator.filterTranslate(20, index -> index < 5, Math::negateExact), 0, -1, -2, -3, -4);
+  }
+
+  @Test
+  public void testRangeTranslate() {
+    assertEquals(IndexIterator.rangeTranslate(11, 18, index -> index - 10), 1, 2, 3, 4, 5, 6, 7, 8);
+  }
+
+  /**
+   * Drains {@code actualIt} completely and asserts that it produced exactly {@code expectedValues}, in order.
+   * Both sequences are included in the failure message.
+   */
+  static void assertEquals(PrimitiveIterator.OfInt actualIt, int... expectedValues) {
+    IntList collected = new IntArrayList();
+    while (actualIt.hasNext()) {
+      collected.add(actualIt.nextInt());
+    }
+    int[] actualValues = collected.toIntArray();
+    String message =
+        "ExpectedValues: " + Arrays.toString(expectedValues) + " ActualValues: " + Arrays.toString(actualValues);
+    assertArrayEquals(message, expectedValues, actualValues);
+  }
+}
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestOffsetIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestOffsetIndexBuilder.java
index 6207084..1e1275c 100644
--- a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestOffsetIndexBuilder.java
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestOffsetIndexBuilder.java
@@ -106,6 +106,8 @@ public class TestOffsetIndexBuilder {
offsetIndex.getCompressedPageSize(i));
assertEquals("Invalid firstRowIndex at page " + i, offset_size_rowIndex_triplets[3 * i + 2],
offsetIndex.getFirstRowIndex(i));
+ long expectedLastPageIndex = (i < pageCount - 1) ? (offset_size_rowIndex_triplets[3 * i + 5] - 1) : 999;
+ assertEquals("Invalid lastRowIndex at page " + i, expectedLastPageIndex, offsetIndex.getLastRowIndex(i, 1000));
}
}
}
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java
new file mode 100644
index 0000000..ec85c27
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.filter2.columnindex;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.booleanColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
+import static org.apache.parquet.internal.column.columnindex.BoundaryOrder.ASCENDING;
+import static org.apache.parquet.internal.column.columnindex.BoundaryOrder.DESCENDING;
+import static org.apache.parquet.internal.column.columnindex.BoundaryOrder.UNORDERED;
+import static org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter.calculateRowRanges;
+import static org.apache.parquet.io.api.Binary.fromString;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.Types.optional;
+import static org.junit.Assert.assertArrayEquals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.LongStream;
+
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.Statistics;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.TestColumnIndexBuilder.BinaryUtf8StartsWithB;
+import org.apache.parquet.internal.column.columnindex.TestColumnIndexBuilder.DoubleIsInteger;
+import org.apache.parquet.internal.column.columnindex.TestColumnIndexBuilder.IntegerIsDivisableWith3;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Test;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
+
+/**
+ * Unit tests of {@link ColumnIndexFilter}
+ */
+public class TestColumnIndexFilter {
+  /**
+   * Test helper that assembles a {@link ColumnIndex} page by page from explicit null-page flags, null counts and
+   * serialized min/max values, then hands them to {@code ColumnIndexBuilder.build(...)}.
+   */
+  private static class CIBuilder {
+    private static final ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]);
+    private final PrimitiveType type;
+    private final BoundaryOrder order;
+    private final List<Boolean> nullPages = new ArrayList<>();
+    private final List<Long> nullCounts = new ArrayList<>();
+    private final List<ByteBuffer> minValues = new ArrayList<>();
+    private final List<ByteBuffer> maxValues = new ArrayList<>();
+
+    CIBuilder(PrimitiveType type, BoundaryOrder order) {
+      this.type = type;
+      this.order = order;
+    }
+
+    /** Adds a page flagged as null-only; its min/max slots are filled with empty buffers. */
+    CIBuilder addNullPage(long nullCount) {
+      nullPages.add(true);
+      nullCounts.add(nullCount);
+      minValues.add(EMPTY);
+      maxValues.add(EMPTY);
+      return this;
+    }
+
+    /** Adds an int page; min/max are serialized with {@link BytesUtils#intToBytes(int)}. */
+    CIBuilder addPage(long nullCount, int min, int max) {
+      nullPages.add(false);
+      nullCounts.add(nullCount);
+      minValues.add(ByteBuffer.wrap(BytesUtils.intToBytes(min)));
+      maxValues.add(ByteBuffer.wrap(BytesUtils.intToBytes(max)));
+      return this;
+    }
+
+    /** Adds a string page; min/max are stored as their UTF-8 bytes. */
+    CIBuilder addPage(long nullCount, String min, String max) {
+      nullPages.add(false);
+      nullCounts.add(nullCount);
+      minValues.add(ByteBuffer.wrap(min.getBytes(UTF_8)));
+      maxValues.add(ByteBuffer.wrap(max.getBytes(UTF_8)));
+      return this;
+    }
+
+    /** Adds a double page; min/max are stored as the bytes of {@link Double#doubleToLongBits(double)}. */
+    CIBuilder addPage(long nullCount, double min, double max) {
+      nullPages.add(false);
+      nullCounts.add(nullCount);
+      minValues.add(ByteBuffer.wrap(BytesUtils.longToBytes(Double.doubleToLongBits(min))));
+      maxValues.add(ByteBuffer.wrap(BytesUtils.longToBytes(Double.doubleToLongBits(max))));
+      return this;
+    }
+
+    ColumnIndex build() {
+      return ColumnIndexBuilder.build(type, order, nullPages, nullCounts, minValues, maxValues);
+    }
+  }
+
+  /**
+   * Test helper that builds an {@link OffsetIndex} from per-page row counts. The first argument passed to the
+   * underlying builder (presumably the compressed page size) is a fixed dummy value.
+   */
+  private static class OIBuilder {
+    private final OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
+
+    OIBuilder addPage(long rowCount) {
+      builder.add(1234, rowCount);
+      return this;
+    }
+
+    OffsetIndex build() {
+      return builder.build();
+    }
+  }
+
+  /**
+   * User-defined predicate that keeps every value: {@code canDrop} never drops a page, while
+   * {@code inverseCanDrop} always does, so its inverse matches nothing.
+   */
+  public static class AnyInt extends UserDefinedPredicate<Integer> {
+
+    @Override
+    public boolean keep(Integer value) {
+      return true;
+    }
+
+    @Override
+    public boolean canDrop(Statistics<Integer> statistics) {
+      return false;
+    }
+
+    @Override
+    public boolean inverseCanDrop(Statistics<Integer> statistics) {
+      return true;
+    }
+
+  }
+
+  /**
+   * Fixture data; the numbered dashed lines mark the page boundaries of each column.
+   * <pre>
+   * row   column1        column2        column3        column4 (no column index)
+   *       ------0------  ------0------  ------0------  ------0------
+   * 0.    1              Zulu           2.03
+   *       ------1------  ------1------  ------1------  ------1------
+   * 1.    2              Yankee         4.67
+   * 2.    3              Xray           3.42
+   * 3.    4              Whiskey        8.71
+   *                      ------2------                 ------2------
+   * 4.    5              Victor         0.56
+   * 5.    6              Uniform        4.30
+   *                                     ------2------  ------3------
+   * 6.    null           null           null
+   *       ------2------                                ------4------
+   * 7.    7              Tango          3.50
+   *                      ------3------
+   * 8.    7              null           3.14
+   *       ------3------
+   * 9.    7              null           null
+   *                                     ------3------
+   * 10.   null           null           9.99
+   *                      ------4------
+   * 11.   8              Sierra         8.78
+   *                                                    ------5------
+   * 12.   9              Romeo          9.56
+   * 13.   10             Quebec         2.71
+   *       ------4------
+   * 14.   11             Papa           5.71
+   * 15.   12             Oscar          4.09
+   *                      ------5------  ------4------  ------6------
+   * 16.   13             November       null
+   * 17.   14             Mike           null
+   * 18.   15             Lima           0.36
+   * 19.   16             Kilo           2.94
+   * 20.   17             Juliett        4.23
+   *       ------5------  ------6------                 ------7------
+   * 21.   18             India          null
+   * 22.   19             Hotel          5.32
+   *                                     ------5------
+   * 23.   20             Golf           4.17
+   * 24.   21             Foxtrot        7.92
+   * 25.   22             Echo           7.95
+   *                                     ------6------
+   * 26.   23             Delta          null
+   *       ------6------
+   * 27.   24             Charlie        null
+   *                                                    ------8------
+   * 28.   25             Bravo          null
+   *                      ------7------
+   * 29.   26             Alfa           null
+   * </pre>
+   */
+  private static final long TOTAL_ROW_COUNT = 30;
+  private static final ColumnIndex COLUMN1_CI = new CIBuilder(optional(INT32).named("column1"), ASCENDING)
+      .addPage(0, 1, 1)
+      .addPage(1, 2, 6)
+      .addPage(0, 7, 7)
+      .addPage(1, 7, 10)
+      .addPage(0, 11, 17)
+      .addPage(0, 18, 23)
+      .addPage(0, 24, 26)
+      .build();
+  private static final OffsetIndex COLUMN1_OI = new OIBuilder()
+      .addPage(1)
+      .addPage(6)
+      .addPage(2)
+      .addPage(5)
+      .addPage(7)
+      .addPage(6)
+      .addPage(3)
+      .build();
+  private static final ColumnIndex COLUMN2_CI = new CIBuilder(optional(BINARY).as(UTF8).named("column2"), DESCENDING)
+      .addPage(0, "Zulu", "Zulu")
+      .addPage(0, "Whiskey", "Yankee")
+      .addPage(1, "Tango", "Victor")
+      .addNullPage(3)
+      .addPage(0, "Oscar", "Sierra")
+      .addPage(0, "Juliett", "November")
+      .addPage(0, "Bravo", "India")
+      .addPage(0, "Alfa", "Alfa")
+      .build();
+  private static final OffsetIndex COLUMN2_OI = new OIBuilder()
+      .addPage(1)
+      .addPage(3)
+      .addPage(4)
+      .addPage(3)
+      .addPage(5)
+      .addPage(5)
+      .addPage(8)
+      .addPage(1)
+      .build();
+  private static final ColumnIndex COLUMN3_CI = new CIBuilder(optional(DOUBLE).named("column3"), UNORDERED)
+      .addPage(0, 2.03, 2.03)
+      .addPage(0, 0.56, 8.71)
+      .addPage(2, 3.14, 3.50)
+      .addPage(0, 2.71, 9.99)
+      .addPage(3, 0.36, 5.32)
+      .addPage(0, 4.17, 7.95)
+      .addNullPage(4)
+      .build();
+  private static final OffsetIndex COLUMN3_OI = new OIBuilder()
+      .addPage(1)
+      .addPage(5)
+      .addPage(4)
+      .addPage(6)
+      .addPage(7)
+      .addPage(3)
+      .addPage(4)
+      .build();
+  private static final ColumnIndex COLUMN4_CI = null;
+  private static final OffsetIndex COLUMN4_OI = new OIBuilder()
+      .addPage(1)
+      .addPage(3)
+      .addPage(2)
+      .addPage(1)
+      .addPage(5)
+      .addPage(4)
+      .addPage(5)
+      .addPage(7)
+      .addPage(2)
+      .build();
+  /**
+   * In-memory store serving the fixtures above. Unknown columns have no column index ({@code null}); asking for
+   * the offset index of an unknown column throws {@link MissingOffsetIndexException}.
+   */
+  private static final ColumnIndexStore STORE = new ColumnIndexStore() {
+    @Override
+    public ColumnIndex getColumnIndex(ColumnPath column) {
+      switch (column.toDotString()) {
+        case "column1":
+          return COLUMN1_CI;
+        case "column2":
+          return COLUMN2_CI;
+        case "column3":
+          return COLUMN3_CI;
+        case "column4":
+          return COLUMN4_CI;
+        default:
+          return null;
+      }
+    }
+
+    @Override
+    public OffsetIndex getOffsetIndex(ColumnPath column) {
+      switch (column.toDotString()) {
+        case "column1":
+          return COLUMN1_OI;
+        case "column2":
+          return COLUMN2_OI;
+        case "column3":
+          return COLUMN3_OI;
+        case "column4":
+          return COLUMN4_OI;
+        default:
+          throw new MissingOffsetIndexException(column);
+      }
+    }
+  };
+
+  /** Converts dot-separated column names into a set of {@link ColumnPath}s. */
+  private static Set<ColumnPath> paths(String... columns) {
+    Set<ColumnPath> paths = new HashSet<>();
+    for (String column : columns) {
+      paths.add(ColumnPath.fromDotString(column));
+    }
+    return paths;
+  }
+
+  /** Asserts that {@code ranges} iterates every row index from 0 to {@code rowCount - 1}, in order. */
+  private static void assertAllRows(RowRanges ranges, long rowCount) {
+    LongList actualList = new LongArrayList();
+    ranges.allRows().forEachRemaining((long value) -> actualList.add(value));
+    LongList expectedList = new LongArrayList();
+    LongStream.range(0, rowCount).forEach(expectedList::add);
+    assertArrayEquals(expectedList + " != " + actualList, expectedList.toLongArray(), actualList.toLongArray());
+  }
+
+  /** Asserts that {@code ranges} iterates exactly {@code expectedRows}, in order. */
+  private static void assertRows(RowRanges ranges, long... expectedRows) {
+    LongList actualList = new LongArrayList();
+    ranges.allRows().forEachRemaining((long value) -> actualList.add(value));
+    assertArrayEquals(Arrays.toString(expectedRows) + " != " + actualList, expectedRows, actualList.toLongArray());
+  }
+
+  @Test
+  public void testFiltering() {
+    Set<ColumnPath> paths = paths("column1", "column2", "column3", "column4");
+
+    // AnyInt never drops a page, so every row is kept
+    assertAllRows(
+        calculateRowRanges(FilterCompat.get(
+            userDefined(intColumn("column1"), AnyInt.class)), STORE, paths, TOTAL_ROW_COUNT),
+        TOTAL_ROW_COUNT);
+    // rows 6 and 9 are the only rows lying in pages that may contain nulls for column1, column2 and column3
+    assertRows(calculateRowRanges(FilterCompat.get(
+        and(
+            and(
+                eq(intColumn("column1"), null),
+                eq(binaryColumn("column2"), null)),
+            and(
+                eq(doubleColumn("column3"), null),
+                eq(booleanColumn("column4"), null)))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        6, 9);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        and(
+            and(
+                notEq(intColumn("column1"), null),
+                notEq(binaryColumn("column2"), null)),
+            and(
+                notEq(doubleColumn("column3"), null),
+                notEq(booleanColumn("column4"), null)))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        0, 1, 2, 3, 4, 5, 6, 7, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        or(
+            and(
+                lt(intColumn("column1"), 20),
+                gtEq(binaryColumn("column2"), fromString("Quebec"))),
+            and(
+                gt(doubleColumn("column3"), 5.32),
+                ltEq(binaryColumn("column4"), fromString("XYZ"))))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        0, 1, 2, 3, 4, 5, 6, 7, 10, 11, 12, 13, 14, 15, 23, 24, 25);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        and(
+            and(
+                gtEq(intColumn("column1"), 7),
+                gt(binaryColumn("column2"), fromString("India"))),
+            and(
+                eq(doubleColumn("column3"), null),
+                notEq(binaryColumn("column4"), null)))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        7, 16, 17, 18, 19, 20);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        and(
+            or(
+                invert(userDefined(intColumn("column1"), AnyInt.class)),
+                eq(binaryColumn("column2"), fromString("Echo"))),
+            eq(doubleColumn("column3"), 6.0))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        23, 24, 25);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        and(
+            userDefined(intColumn("column1"), IntegerIsDivisableWith3.class),
+            and(
+                userDefined(binaryColumn("column2"), BinaryUtf8StartsWithB.class),
+                userDefined(doubleColumn("column3"), DoubleIsInteger.class)))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        21, 22, 23, 24, 25);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        and(
+            and(
+                gtEq(intColumn("column1"), 7),
+                lt(intColumn("column1"), 11)),
+            and(
+                gt(binaryColumn("column2"), fromString("Romeo")),
+                ltEq(binaryColumn("column2"), fromString("Tango"))))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        7, 11, 12, 13);
+  }
+
+  @Test
+  public void testFilteringOnMissingColumns() {
+    Set<ColumnPath> paths = paths("column1", "column2", "column3", "column4");
+
+    // Missing column filter is always true
+    assertAllRows(calculateRowRanges(FilterCompat.get(
+        notEq(intColumn("missing_column"), 0)),
+        STORE, paths, TOTAL_ROW_COUNT),
+        TOTAL_ROW_COUNT);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        and(
+            and(
+                gtEq(intColumn("column1"), 7),
+                lt(intColumn("column1"), 11)),
+            eq(binaryColumn("missing_column"), null))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        7, 8, 9, 10, 11, 12, 13);
+
+    // Missing column filter is always false
+    assertRows(calculateRowRanges(FilterCompat.get(
+        or(
+            and(
+                gtEq(intColumn("column1"), 7),
+                lt(intColumn("column1"), 11)),
+            notEq(binaryColumn("missing_column"), null))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        7, 8, 9, 10, 11, 12, 13);
+    assertRows(calculateRowRanges(FilterCompat.get(
+        gt(intColumn("missing_column"), 0)),
+        STORE, paths, TOTAL_ROW_COUNT));
+  }
+
+  @Test
+  public void testFilteringWithMissingOffsetIndex() {
+    // "column_wo_oi" is among the known paths, but STORE throws MissingOffsetIndexException for it;
+    // the expected outcome is that no rows are filtered out at all
+    Set<ColumnPath> paths = paths("column1", "column2", "column3", "column4", "column_wo_oi");
+
+    assertAllRows(calculateRowRanges(FilterCompat.get(
+        and(
+            and(
+                gtEq(intColumn("column1"), 7),
+                lt(intColumn("column1"), 11)),
+            and(
+                gt(binaryColumn("column2"), fromString("Romeo")),
+                ltEq(binaryColumn("column_wo_oi"), fromString("Tango"))))),
+        STORE, paths, TOTAL_ROW_COUNT),
+        TOTAL_ROW_COUNT);
+  }
+
+}
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java
new file mode 100644
index 0000000..071785f
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.filter2.columnindex;
+
+import static org.apache.parquet.internal.filter2.columnindex.RowRanges.intersection;
+import static org.apache.parquet.internal.filter2.columnindex.RowRanges.union;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.PrimitiveIterator;
+
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
+import org.junit.Test;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
+
+/**
+ * Unit test for {@link RowRanges}
+ */
+public class TestRowRanges {
+  /*
+   * Builds a RowRanges from (from, to) pairs of inclusive row indexes.
+   *
+   * For every pair two fake pages are added to an offset index: one starting at "from" and one starting at "to + 1",
+   * so the first of them spans exactly [from, to]. Only those first pages (even page indexes) are selected.
+   * The last "to" value is passed to RowRanges.build as the row count; presumably it only bounds the trailing,
+   * unselected page.
+   */
+  private static RowRanges buildRanges(long... rowIndexes) {
+    if (rowIndexes.length == 0) {
+      return RowRanges.EMPTY;
+    }
+    if (rowIndexes.length % 2 != 0) {
+      // Fail with a clear message instead of an ArrayIndexOutOfBoundsException further down
+      throw new IllegalArgumentException(
+          "rowIndexes must contain (from, to) pairs but got " + rowIndexes.length + " values");
+    }
+    OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
+    for (int i = 0, n = rowIndexes.length; i < n; i += 2) {
+      long from = rowIndexes[i];
+      long to = rowIndexes[i + 1];
+      builder.add(0, 0, from);
+      builder.add(0, 0, to + 1);
+    }
+    PrimitiveIterator.OfInt pageIndexes = new PrimitiveIterator.OfInt() {
+      private int index = 0;
+
+      @Override
+      public boolean hasNext() {
+        return index < rowIndexes.length;
+      }
+
+      @Override
+      public int nextInt() {
+        // Honor the Iterator contract instead of handing out page indexes past the end
+        if (!hasNext()) {
+          throw new java.util.NoSuchElementException();
+        }
+        int ret = index;
+        index += 2;
+        return ret;
+      }
+    };
+    return RowRanges.build(rowIndexes[rowIndexes.length - 1], pageIndexes, builder.build());
+  }
+
+  /*
+   * Drains actualIt and asserts that it produced exactly expectedValues, in order.
+   */
+  private static void assertAllRowsEqual(PrimitiveIterator.OfLong actualIt, long... expectedValues) {
+    LongList actualList = new LongArrayList();
+    actualIt.forEachRemaining((long value) -> actualList.add(value));
+    assertArrayEquals(Arrays.toString(expectedValues) + " != " + actualList, expectedValues, actualList.toLongArray());
+  }
+
+  @Test
+  public void testCreate() {
+    RowRanges ranges = buildRanges(
+        1, 2,
+        3, 4,
+        6, 7,
+        7, 10,
+        15, 17);
+    assertAllRowsEqual(ranges.allRows(), 1, 2, 3, 4, 6, 7, 8, 9, 10, 15, 16, 17);
+    assertEquals(12, ranges.rowCount());
+    assertTrue(ranges.isOverlapping(4, 5));
+    assertFalse(ranges.isOverlapping(5, 5));
+    assertTrue(ranges.isOverlapping(10, 14));
+    assertFalse(ranges.isOverlapping(11, 14));
+    assertFalse(ranges.isOverlapping(18, Long.MAX_VALUE));
+
+    ranges = RowRanges.single(5);
+    assertAllRowsEqual(ranges.allRows(), 0, 1, 2, 3, 4);
+    assertEquals(5, ranges.rowCount());
+    assertTrue(ranges.isOverlapping(0, 100));
+    assertFalse(ranges.isOverlapping(5, Long.MAX_VALUE));
+
+    ranges = RowRanges.EMPTY;
+    assertAllRowsEqual(ranges.allRows());
+    assertEquals(0, ranges.rowCount());
+    assertFalse(ranges.isOverlapping(0, Long.MAX_VALUE));
+  }
+
+  @Test
+  public void testUnion() {
+    RowRanges ranges1 = buildRanges(
+        2, 5,
+        7, 9,
+        14, 14,
+        20, 24);
+    RowRanges ranges2 = buildRanges(
+        1, 2,
+        4, 5,
+        11, 12,
+        14, 15,
+        21, 22);
+    RowRanges empty = buildRanges();
+    assertAllRowsEqual(union(ranges1, ranges2).allRows(), 1, 2, 3, 4, 5, 7, 8, 9, 11, 12, 14, 15, 20, 21, 22, 23, 24);
+    assertAllRowsEqual(union(ranges2, ranges1).allRows(), 1, 2, 3, 4, 5, 7, 8, 9, 11, 12, 14, 15, 20, 21, 22, 23, 24);
+    assertAllRowsEqual(union(ranges1, ranges1).allRows(), 2, 3, 4, 5, 7, 8, 9, 14, 20, 21, 22, 23, 24);
+    assertAllRowsEqual(union(ranges1, empty).allRows(), 2, 3, 4, 5, 7, 8, 9, 14, 20, 21, 22, 23, 24);
+    assertAllRowsEqual(union(empty, ranges1).allRows(), 2, 3, 4, 5, 7, 8, 9, 14, 20, 21, 22, 23, 24);
+    assertAllRowsEqual(union(ranges2, ranges2).allRows(), 1, 2, 4, 5, 11, 12, 14, 15, 21, 22);
+    assertAllRowsEqual(union(ranges2, empty).allRows(), 1, 2, 4, 5, 11, 12, 14, 15, 21, 22);
+    assertAllRowsEqual(union(empty, ranges2).allRows(), 1, 2, 4, 5, 11, 12, 14, 15, 21, 22);
+    assertAllRowsEqual(union(empty, empty).allRows());
+  }
+
+  @Test
+  public void testIntersection() {
+    RowRanges ranges1 = buildRanges(
+        2, 5,
+        7, 9,
+        14, 14,
+        20, 24);
+    RowRanges ranges2 = buildRanges(
+        1, 2,
+        6, 7,
+        9, 9,
+        11, 12,
+        14, 15,
+        21, 22);
+    RowRanges empty = buildRanges();
+    assertAllRowsEqual(intersection(ranges1, ranges2).allRows(), 2, 7, 9, 14, 21, 22);
+    assertAllRowsEqual(intersection(ranges2, ranges1).allRows(), 2, 7, 9, 14, 21, 22);
+    assertAllRowsEqual(intersection(ranges1, ranges1).allRows(), 2, 3, 4, 5, 7, 8, 9, 14, 20, 21, 22, 23, 24);
+    assertAllRowsEqual(intersection(ranges1, empty).allRows());
+    assertAllRowsEqual(intersection(empty, ranges1).allRows());
+    assertAllRowsEqual(intersection(ranges2, ranges2).allRows(), 1, 2, 6, 7, 9, 11, 12, 14, 15, 21, 22);
+    assertAllRowsEqual(intersection(ranges2, empty).allRows());
+    assertAllRowsEqual(intersection(empty, ranges2).allRows());
+    assertAllRowsEqual(intersection(empty, empty).allRows());
+  }
+
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
index 8d3e48d..5f75460 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@ -28,6 +28,7 @@ import org.apache.parquet.hadoop.util.HadoopCodecs;
import java.util.Map;
+import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
@@ -43,6 +44,7 @@ public class HadoopReadOptions extends ParquetReadOptions {
boolean useStatsFilter,
boolean useDictionaryFilter,
boolean useRecordFilter,
+ boolean useColumnIndexFilter,
FilterCompat.Filter recordFilter,
MetadataFilter metadataFilter,
CompressionCodecFactory codecFactory,
@@ -51,8 +53,8 @@ public class HadoopReadOptions extends ParquetReadOptions {
Map<String, String> properties,
Configuration conf) {
super(
- useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, recordFilter,
- metadataFilter, codecFactory, allocator, maxAllocationSize, properties
+ useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
+ recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties
);
this.conf = conf;
}
@@ -83,6 +85,7 @@ public class HadoopReadOptions extends ParquetReadOptions {
useDictionaryFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
useStatsFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true));
useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true));
+ useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true));
withCodecFactory(HadoopCodecs.newFactory(conf, 0));
withRecordFilter(getFilter(conf));
withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
@@ -95,7 +98,7 @@ public class HadoopReadOptions extends ParquetReadOptions {
@Override
public ParquetReadOptions build() {
return new HadoopReadOptions(
- useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
+ useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties,
conf);
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index 4ef2460..846d3bd 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -38,12 +38,14 @@ public class ParquetReadOptions {
private static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true;
private static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;
private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true;
+ private static final boolean COLUMN_INDEX_FILTERING_ENABLED_DEFAULT = true;
private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
private final boolean useSignedStringMinMax;
private final boolean useStatsFilter;
private final boolean useDictionaryFilter;
private final boolean useRecordFilter;
+ private final boolean useColumnIndexFilter;
private final FilterCompat.Filter recordFilter;
private final ParquetMetadataConverter.MetadataFilter metadataFilter;
private final CompressionCodecFactory codecFactory;
@@ -55,6 +57,7 @@ public class ParquetReadOptions {
boolean useStatsFilter,
boolean useDictionaryFilter,
boolean useRecordFilter,
+ boolean useColumnIndexFilter,
FilterCompat.Filter recordFilter,
ParquetMetadataConverter.MetadataFilter metadataFilter,
CompressionCodecFactory codecFactory,
@@ -65,6 +68,7 @@ public class ParquetReadOptions {
this.useStatsFilter = useStatsFilter;
this.useDictionaryFilter = useDictionaryFilter;
this.useRecordFilter = useRecordFilter;
+ this.useColumnIndexFilter = useColumnIndexFilter;
this.recordFilter = recordFilter;
this.metadataFilter = metadataFilter;
this.codecFactory = codecFactory;
@@ -89,6 +93,10 @@ public class ParquetReadOptions {
return useRecordFilter;
}
+  /**
+   * Returns whether column index based filtering is enabled for reading.
+   *
+   * @return {@code true} if column index filtering is enabled
+   */
+  public boolean useColumnIndexFilter() {
+    return this.useColumnIndexFilter;
+  }
+
public FilterCompat.Filter getRecordFilter() {
return recordFilter;
}
@@ -134,6 +142,7 @@ public class ParquetReadOptions {
protected boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT;
protected boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT;
protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT;
+ protected boolean useColumnIndexFilter = COLUMN_INDEX_FILTERING_ENABLED_DEFAULT;
protected FilterCompat.Filter recordFilter = null;
protected ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER;
// the page size parameter isn't used when only using the codec factory to get decompressors
@@ -182,6 +191,15 @@ public class ParquetReadOptions {
return this;
}
+    /**
+     * Enables or disables column index based filtering.
+     *
+     * @param useColumnIndexFilter whether column index based filtering should be used
+     * @return this builder, for chaining
+     */
+    public Builder useColumnIndexFilter(boolean useColumnIndexFilter) {
+      this.useColumnIndexFilter = useColumnIndexFilter;
+      return this;
+    }
+
+    /**
+     * Enables column index based filtering; shorthand for {@code useColumnIndexFilter(true)}.
+     *
+     * @return this builder, for chaining
+     */
+    public Builder useColumnIndexFilter() {
+      return this.useColumnIndexFilter(true);
+    }
+
public Builder withRecordFilter(FilterCompat.Filter rowGroupFilter) {
this.recordFilter = rowGroupFilter;
return this;
@@ -239,7 +257,7 @@ public class ParquetReadOptions {
public ParquetReadOptions build() {
return new ParquetReadOptions(
- useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
+ useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties);
}
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 37dfd6d..5caca07 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -18,24 +18,28 @@
*/
package org.apache.parquet.hadoop;
+import static org.apache.parquet.Ints.checkedCast;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-
+import java.util.PrimitiveIterator;
import org.apache.parquet.Ints;
+import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.NotInPageFilteringModeException;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
-import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.filter2.columnindex.RowRanges;
import org.apache.parquet.io.ParquetDecodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,8 +66,13 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
private final long valueCount;
private final List<DataPage> compressedPages;
private final DictionaryPage compressedDictionaryPage;
+ // null means no page synchronization is required; firstRowIndex will not be returned by the pages
+ private final OffsetIndex offsetIndex;
+ private final long rowCount;
+ private int pageIndex = 0;
- ColumnChunkPageReader(BytesInputDecompressor decompressor, List<DataPage> compressedPages, DictionaryPage compressedDictionaryPage) {
+ ColumnChunkPageReader(BytesInputDecompressor decompressor, List<DataPage> compressedPages,
+ DictionaryPage compressedDictionaryPage, OffsetIndex offsetIndex, long rowCount) {
this.decompressor = decompressor;
this.compressedPages = new LinkedList<DataPage>(compressedPages);
this.compressedDictionaryPage = compressedDictionaryPage;
@@ -72,6 +81,8 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
count += p.getValueCount();
}
this.valueCount = count;
+ this.offsetIndex = offsetIndex;
+ this.rowCount = rowCount;
}
@Override
@@ -85,18 +96,34 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
return null;
}
DataPage compressedPage = compressedPages.remove(0);
+ final int currentPageIndex = pageIndex++;
return compressedPage.accept(new DataPage.Visitor<DataPage>() {
@Override
public DataPage visit(DataPageV1 dataPageV1) {
try {
- return new DataPageV1(
- decompressor.decompress(dataPageV1.getBytes(), dataPageV1.getUncompressedSize()),
- dataPageV1.getValueCount(),
- dataPageV1.getUncompressedSize(),
- dataPageV1.getStatistics(),
- dataPageV1.getRlEncoding(),
- dataPageV1.getDlEncoding(),
- dataPageV1.getValueEncoding());
+ BytesInput decompressed = decompressor.decompress(dataPageV1.getBytes(), dataPageV1.getUncompressedSize());
+ if (offsetIndex == null) {
+ return new DataPageV1(
+ decompressed,
+ dataPageV1.getValueCount(),
+ dataPageV1.getUncompressedSize(),
+ dataPageV1.getStatistics(),
+ dataPageV1.getRlEncoding(),
+ dataPageV1.getDlEncoding(),
+ dataPageV1.getValueEncoding());
+ } else {
+ long firstRowIndex = offsetIndex.getFirstRowIndex(currentPageIndex);
+ return new DataPageV1(
+ decompressed,
+ dataPageV1.getValueCount(),
+ dataPageV1.getUncompressedSize(),
+ firstRowIndex,
+ checkedCast(offsetIndex.getLastRowIndex(currentPageIndex, rowCount) - firstRowIndex + 1),
+ dataPageV1.getStatistics(),
+ dataPageV1.getRlEncoding(),
+ dataPageV1.getDlEncoding(),
+ dataPageV1.getValueEncoding());
+ }
} catch (IOException e) {
throw new ParquetDecodingException("could not decompress page", e);
}
@@ -105,23 +132,49 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
@Override
public DataPage visit(DataPageV2 dataPageV2) {
if (!dataPageV2.isCompressed()) {
- return dataPageV2;
+ if (offsetIndex == null) {
+ return dataPageV2;
+ } else {
+ return DataPageV2.uncompressed(
+ dataPageV2.getRowCount(),
+ dataPageV2.getNullCount(),
+ dataPageV2.getValueCount(),
+ offsetIndex.getFirstRowIndex(currentPageIndex),
+ dataPageV2.getRepetitionLevels(),
+ dataPageV2.getDefinitionLevels(),
+ dataPageV2.getDataEncoding(),
+ dataPageV2.getData(),
+ dataPageV2.getStatistics());
+ }
}
try {
int uncompressedSize = Ints.checkedCast(
dataPageV2.getUncompressedSize()
- - dataPageV2.getDefinitionLevels().size()
- - dataPageV2.getRepetitionLevels().size());
- return DataPageV2.uncompressed(
- dataPageV2.getRowCount(),
- dataPageV2.getNullCount(),
- dataPageV2.getValueCount(),
- dataPageV2.getRepetitionLevels(),
- dataPageV2.getDefinitionLevels(),
- dataPageV2.getDataEncoding(),
- decompressor.decompress(dataPageV2.getData(), uncompressedSize),
- dataPageV2.getStatistics()
- );
+ - dataPageV2.getDefinitionLevels().size()
+ - dataPageV2.getRepetitionLevels().size());
+ BytesInput decompressed = decompressor.decompress(dataPageV2.getData(), uncompressedSize);
+ if (offsetIndex == null) {
+ return DataPageV2.uncompressed(
+ dataPageV2.getRowCount(),
+ dataPageV2.getNullCount(),
+ dataPageV2.getValueCount(),
+ dataPageV2.getRepetitionLevels(),
+ dataPageV2.getDefinitionLevels(),
+ dataPageV2.getDataEncoding(),
+ decompressed,
+ dataPageV2.getStatistics());
+ } else {
+ return DataPageV2.uncompressed(
+ dataPageV2.getRowCount(),
+ dataPageV2.getNullCount(),
+ dataPageV2.getValueCount(),
+ offsetIndex.getFirstRowIndex(currentPageIndex),
+ dataPageV2.getRepetitionLevels(),
+ dataPageV2.getDefinitionLevels(),
+ dataPageV2.getDataEncoding(),
+ decompressed,
+ dataPageV2.getStatistics());
+ }
} catch (IOException e) {
throw new ParquetDecodingException("could not decompress page", e);
}
@@ -147,9 +200,16 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
private final Map<ColumnDescriptor, ColumnChunkPageReader> readers = new HashMap<ColumnDescriptor, ColumnChunkPageReader>();
private final long rowCount;
+ private final RowRanges rowRanges;
public ColumnChunkPageReadStore(long rowCount) {
this.rowCount = rowCount;
+ rowRanges = null;
+ }
+
+ ColumnChunkPageReadStore(RowRanges rowRanges) {
+ this.rowRanges = rowRanges;
+ rowCount = rowRanges.rowCount();
}
@Override
@@ -170,6 +230,19 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
return readers.get(descriptor).readDictionaryPage();
}
+  /**
+   * Returns the indexes of the rows selected by the row ranges this store was created with.
+   *
+   * @throws NotInPageFilteringModeException if this store was created without row ranges
+   */
+  @Override
+  public PrimitiveIterator.OfLong getRowIndexes() {
+    if (rowRanges != null) {
+      return rowRanges.allRows();
+    }
+    throw new NotInPageFilteringModeException("Row indexes are not available");
+  }
+
+ @Override
+ public boolean isInPageFilteringMode() {
+ return rowRanges != null;
+ }
+
void addColumn(ColumnDescriptor path, ColumnChunkPageReader reader) {
if (readers.put(path, reader) != null) {
throw new RuntimeException(path+ " was added twice");
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java
new file mode 100644
index 0000000..448515e
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Formatter;
+import java.util.List;
+
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.filter2.columnindex.RowRanges;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+
+/**
+ * Internal utility class to help at column index based filtering.
+ */
+class ColumnIndexFilterUtils {
+  /**
+   * A contiguous byte range {@code [offset, offset + length)} inside the file. Ranges that directly follow each other
+   * are merged into one through {@link #extend(long, long)}.
+   */
+  static class OffsetRange {
+    private final long offset;
+    private long length;
+
+    // length is a long so that a large range (e.g. the gap before the first data page) is never truncated
+    private OffsetRange(long offset, long length) {
+      this.offset = offset;
+      this.length = length;
+    }
+
+    long getOffset() {
+      return offset;
+    }
+
+    long getLength() {
+      return length;
+    }
+
+    /*
+     * Grows this range by length bytes if the range starting at offset begins exactly where this one ends.
+     * Returns whether the range was extended.
+     */
+    private boolean extend(long offset, long length) {
+      if (this.offset + this.length == offset) {
+        this.length += length;
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  /*
+   * An OffsetIndex view exposing only a subset of the pages of another offset index. indexMap maps the page indexes
+   * of this view to the page indexes of the wrapped index; it is expected to be in ascending order (binarySearch is
+   * used on it in toString).
+   */
+  private static class FilteredOffsetIndex implements OffsetIndex {
+    private final OffsetIndex offsetIndex;
+    private final int[] indexMap;
+
+    private FilteredOffsetIndex(OffsetIndex offsetIndex, int[] indexMap) {
+      this.offsetIndex = offsetIndex;
+      this.indexMap = indexMap;
+    }
+
+    @Override
+    public int getPageCount() {
+      return indexMap.length;
+    }
+
+    @Override
+    public long getOffset(int pageIndex) {
+      return offsetIndex.getOffset(indexMap[pageIndex]);
+    }
+
+    @Override
+    public int getCompressedPageSize(int pageIndex) {
+      return offsetIndex.getCompressedPageSize(indexMap[pageIndex]);
+    }
+
+    @Override
+    public long getFirstRowIndex(int pageIndex) {
+      return offsetIndex.getFirstRowIndex(indexMap[pageIndex]);
+    }
+
+    @Override
+    public long getLastRowIndex(int pageIndex, long totalRowCount) {
+      // The last row of a page is the row before the first row of the next page in the ORIGINAL index (the next page
+      // may be filtered out here); the last page of the chunk ends at totalRowCount - 1.
+      int nextIndex = indexMap[pageIndex] + 1;
+      return (nextIndex >= offsetIndex.getPageCount() ? totalRowCount : offsetIndex.getFirstRowIndex(nextIndex)) - 1;
+    }
+
+    @Override
+    public String toString() {
+      try (Formatter formatter = new Formatter()) {
+        formatter.format("%-12s %20s %16s %20s\n", "", "offset", "compressed size", "first row index");
+        for (int i = 0, n = offsetIndex.getPageCount(); i < n; ++i) {
+          int index = Arrays.binarySearch(indexMap, i);
+          boolean isHidden = index < 0;
+          // Both prefixes are two characters wide so the columns line up with the "%-12s" header
+          formatter.format("%spage-%-5d %20d %16d %20d\n",
+              isHidden ? "- " : "  ",
+              isHidden ? i : index,
+              offsetIndex.getOffset(i),
+              offsetIndex.getCompressedPageSize(i),
+              offsetIndex.getFirstRowIndex(i));
+        }
+        return formatter.toString();
+      }
+    }
+  }
+
+  /*
+   * Returns the filtered offset index containing only the pages which are overlapping with rowRanges.
+   */
+  static OffsetIndex filterOffsetIndex(OffsetIndex offsetIndex, RowRanges rowRanges, long totalRowCount) {
+    IntList indexMap = new IntArrayList();
+    for (int i = 0, n = offsetIndex.getPageCount(); i < n; ++i) {
+      long from = offsetIndex.getFirstRowIndex(i);
+      if (rowRanges.isOverlapping(from, offsetIndex.getLastRowIndex(i, totalRowCount))) {
+        indexMap.add(i);
+      }
+    }
+    return new FilteredOffsetIndex(offsetIndex, indexMap.toIntArray());
+  }
+
+  /*
+   * Calculates the byte ranges to be read for the pages of offsetIndex, merging ranges that directly follow each
+   * other. If the column chunk starts before firstPageOffset, the bytes in between (the dictionary page) are added
+   * as well. Returns an empty list if offsetIndex has no pages.
+   */
+  static List<OffsetRange> calculateOffsetRanges(OffsetIndex offsetIndex, ColumnChunkMetaData cm,
+      long firstPageOffset) {
+    List<OffsetRange> ranges = new ArrayList<>();
+    int n = offsetIndex.getPageCount();
+    if (n > 0) {
+      OffsetRange currentRange = null;
+
+      // Add a range for the dictionary page if required
+      long rowGroupOffset = cm.getStartingPos();
+      if (rowGroupOffset < firstPageOffset) {
+        // Kept as a long: a narrowing int cast would silently corrupt the length of a gap of 2 GiB or more
+        currentRange = new OffsetRange(rowGroupOffset, firstPageOffset - rowGroupOffset);
+        ranges.add(currentRange);
+      }
+
+      for (int i = 0; i < n; ++i) {
+        long offset = offsetIndex.getOffset(i);
+        int length = offsetIndex.getCompressedPageSize(i);
+        if (currentRange == null || !currentRange.extend(offset, length)) {
+          currentRange = new OffsetRange(offset, length);
+          ranges.add(currentRange);
+        }
+      }
+    }
+    return ranges;
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexStoreImpl.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexStoreImpl.java
new file mode 100644
index 0000000..684c5f2
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexStoreImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.util.Collections.emptySet;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal implementation of {@link ColumnIndexStore}.
+ */
+class ColumnIndexStoreImpl implements ColumnIndexStore {
+
+  // Access to the column index and the offset index of a single column chunk.
+  private interface IndexStore {
+    ColumnIndex getColumnIndex();
+
+    OffsetIndex getOffsetIndex();
+  }
+
+  /*
+   * Index store of one column chunk. The offset index is read right away in the constructor (a missing one raises
+   * MissingOffsetIndexException); the column index is read on first request and the outcome, even a failed read
+   * logged as a warning, is remembered.
+   */
+  private class IndexStoreImpl implements IndexStore {
+    private final ColumnChunkMetaData meta;
+    private final OffsetIndex offsetIndex;
+    private ColumnIndex columnIndex;
+    private boolean columnIndexRead;
+
+    IndexStoreImpl(ColumnChunkMetaData meta) {
+      this.meta = meta;
+      OffsetIndex loaded = null;
+      try {
+        loaded = reader.readOffsetIndex(meta);
+      } catch (IOException e) {
+        // If the I/O issue still stands it will fail the reading later;
+        // otherwise only the filtering fails, because of the missing offset index.
+        LOGGER.warn("Unable to read offset index for column {}", meta.getPath(), e);
+      }
+      if (loaded == null) {
+        throw new MissingOffsetIndexException(meta.getPath());
+      }
+      this.offsetIndex = loaded;
+    }
+
+    @Override
+    public ColumnIndex getColumnIndex() {
+      if (columnIndexRead) {
+        return columnIndex;
+      }
+      try {
+        columnIndex = reader.readColumnIndex(meta);
+      } catch (IOException e) {
+        // If the I/O issue still stands it will fail the reading later;
+        // otherwise only the filtering fails, because of the missing column index.
+        LOGGER.warn("Unable to read column index for column {}", meta.getPath(), e);
+      }
+      columnIndexRead = true;
+      return columnIndex;
+    }
+
+    @Override
+    public OffsetIndex getOffsetIndex() {
+      return offsetIndex;
+    }
+  }
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ColumnIndexStoreImpl.class);
+  // Returned for columns that are not part of this store (e.g. columns that are not in this parquet file):
+  // neither index is available.
+  private static final IndexStore MISSING_INDEX_STORE = new IndexStore() {
+    @Override
+    public ColumnIndex getColumnIndex() {
+      return null;
+    }
+
+    @Override
+    public OffsetIndex getOffsetIndex() {
+      return null;
+    }
+  };
+  // Used when an offset index of a projected column is missing: no column indexes and every offset index request
+  // fails.
+  private static final ColumnIndexStoreImpl EMPTY = new ColumnIndexStoreImpl(null, new BlockMetaData(), emptySet()) {
+    @Override
+    public ColumnIndex getColumnIndex(ColumnPath column) {
+      return null;
+    }
+
+    @Override
+    public OffsetIndex getOffsetIndex(ColumnPath column) {
+      throw new MissingOffsetIndexException(column);
+    }
+  };
+
+  private final ParquetFileReader reader;
+  private final Map<ColumnPath, IndexStore> store;
+
+  /*
+   * Creates a column index store which lazily reads the column indexes for the columns in paths (the set of columns
+   * used for the projection). The offset indexes of these columns are read up front; if any of them is missing,
+   * the EMPTY store is returned instead.
+   */
+  static ColumnIndexStore create(ParquetFileReader reader, BlockMetaData block, Set<ColumnPath> paths) {
+    ColumnIndexStore result;
+    try {
+      result = new ColumnIndexStoreImpl(reader, block, paths);
+    } catch (MissingOffsetIndexException e) {
+      result = EMPTY;
+    }
+    return result;
+  }
+
+  private ColumnIndexStoreImpl(ParquetFileReader reader, BlockMetaData block, Set<ColumnPath> paths) {
+    // TODO[GS]: Offset index for every paths will be required; pre-read the consecutive ones at once?
+    // TODO[GS]: Pre-read column index based on filter?
+    // reader must be set before any IndexStoreImpl is created, as their constructors read through it
+    this.reader = reader;
+    Map<ColumnPath, IndexStore> indexStores = new HashMap<>();
+    for (ColumnChunkMetaData column : block.getColumns()) {
+      ColumnPath path = column.getPath();
+      if (!paths.contains(path)) {
+        continue;
+      }
+      indexStores.put(path, new IndexStoreImpl(column));
+    }
+    this.store = indexStores;
+  }
+
+  @Override
+  public ColumnIndex getColumnIndex(ColumnPath column) {
+    IndexStore indexStore = store.getOrDefault(column, MISSING_INDEX_STORE);
+    return indexStore.getColumnIndex();
+  }
+
+  @Override
+  public OffsetIndex getOffsetIndex(ColumnPath column) {
+    IndexStore indexStore = store.getOrDefault(column, MISSING_INDEX_STORE);
+    return indexStore.getOffsetIndex();
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index a048878..e57f3cb 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -124,7 +124,7 @@ class InternalParquetRecordReader<T> {
LOG.info("at row " + current + ". reading next block");
long t0 = System.currentTimeMillis();
- PageReadStore pages = reader.readNextRowGroup();
+ PageReadStore pages = reader.readNextFilteredRowGroup();
if (pages == null) {
throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
}
@@ -182,7 +182,7 @@ class InternalParquetRecordReader<T> {
this.columnCount = requestedSchema.getPaths().size();
this.recordConverter = readSupport.prepareForRead(conf, fileMetadata, fileSchema, readContext);
this.strictTypeChecking = options.isEnabled(STRICT_TYPE_CHECKING, true);
- this.total = reader.getRecordCount();
+ this.total = reader.getFilteredRecordCount();
this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(options, total);
this.filterRecords = options.useRecordFilter();
reader.setRequestedSchema(requestedSchema);
@@ -204,7 +204,7 @@ class InternalParquetRecordReader<T> {
this.recordConverter = readSupport.prepareForRead(
configuration, fileMetadata, fileSchema, readContext);
this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
- this.total = reader.getRecordCount();
+ this.total = reader.getFilteredRecordCount();
this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
this.filterRecords = configuration.getBoolean(RECORD_FILTERING_ENABLED, true);
reader.setRequestedSchema(requestedSchema);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index eda4745..add0e09 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -23,6 +23,8 @@ import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.DICTI
import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.STATISTICS;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
+import static org.apache.parquet.hadoop.ColumnIndexFilterUtils.calculateOffsetRanges;
+import static org.apache.parquet.hadoop.ColumnIndexFilterUtils.filterOffsetIndex;
import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
@@ -46,28 +48,28 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-
+import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.page.DictionaryPageReadStore;
-import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
-import org.apache.parquet.filter2.compat.FilterCompat;
-import org.apache.parquet.filter2.compat.RowGroupFilter;
-
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.column.page.PageReadStore;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.RowGroupFilter;
import org.apache.parquet.format.DataPageHeader;
import org.apache.parquet.format.DataPageHeaderV2;
import org.apache.parquet.format.DictionaryPageHeader;
@@ -76,20 +78,24 @@ import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
import org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader;
+import org.apache.parquet.hadoop.ColumnIndexFilterUtils.OffsetRange;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
-import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter;
+import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
+import org.apache.parquet.internal.filter2.columnindex.RowRanges;
import org.apache.parquet.internal.hadoop.metadata.IndexReference;
-import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.yetus.audience.InterfaceAudience.Private;
@@ -605,6 +611,8 @@ public class ParquetFileReader implements Closeable {
private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>();
private final FileMetaData fileMetaData; // may be null
private final List<BlockMetaData> blocks;
+ private final List<ColumnIndexStore> blockIndexStores;
+ private final List<RowRanges> blockRowRanges;
// not final. in some cases, this may be lazily loaded for backward-compat.
private ParquetMetadata footer;
@@ -646,6 +654,8 @@ public class ParquetFileReader implements Closeable {
this.f = file.newStream();
this.options = HadoopReadOptions.builder(configuration).build();
this.blocks = filterRowGroups(blocks);
+ this.blockIndexStores = listWithNulls(this.blocks.size());
+ this.blockRowRanges = listWithNulls(this.blocks.size());
for (ColumnDescriptor col : columns) {
paths.put(ColumnPath.get(col.getPath()), col);
}
@@ -680,6 +690,8 @@ public class ParquetFileReader implements Closeable {
this.footer = footer;
this.fileMetaData = footer.getFileMetaData();
this.blocks = filterRowGroups(footer.getBlocks());
+ this.blockIndexStores = listWithNulls(this.blocks.size());
+ this.blockRowRanges = listWithNulls(this.blocks.size());
for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
paths.put(ColumnPath.get(col.getPath()), col);
}
@@ -693,11 +705,17 @@ public class ParquetFileReader implements Closeable {
this.footer = readFooter(file, options, f, converter);
this.fileMetaData = footer.getFileMetaData();
this.blocks = filterRowGroups(footer.getBlocks());
+ this.blockIndexStores = listWithNulls(this.blocks.size());
+ this.blockRowRanges = listWithNulls(this.blocks.size());
for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
paths.put(ColumnPath.get(col.getPath()), col);
}
}
+ /**
+ * Creates a modifiable list of the given size with every element set to {@code null}, so that values computed
+ * lazily per row group can later be stored with {@link List#set(int, Object)}.
+ *
+ * @param size the number of elements; must not be negative
+ * @return a new {@link ArrayList} holding {@code size} nulls
+ */
+ private static <T> List<T> listWithNulls(int size) {
+ // nCopies is a cheap immutable view; copying it into an ArrayList yields a modifiable list without a stream pipeline
+ return new ArrayList<>(java.util.Collections.<T>nCopies(size, null));
+ }
+
public ParquetMetadata getFooter() {
if (footer == null) {
try {
@@ -725,6 +743,17 @@ public class ParquetFileReader implements Closeable {
return total;
}
+ /**
+ * Returns the number of records that remain after column-index filtering. If column-index filtering is disabled,
+ * this is simply {@link #getRecordCount()}. Otherwise it is the sum of the row counts of the row ranges calculated
+ * for every row group.
+ *
+ * @return the total number of (possibly filtered) records
+ */
+ long getFilteredRecordCount() {
+ if (options.useColumnIndexFilter()) {
+ long filteredTotal = 0;
+ for (int blockIndex = 0; blockIndex < blocks.size(); blockIndex++) {
+ filteredTotal += getRowRanges(blockIndex).rowCount();
+ }
+ return filteredTotal;
+ }
+ return getRecordCount();
+ }
+
/**
* @return the path for this file
* @deprecated will be removed in 2.0.0; use {@link #getFile()} instead
@@ -787,30 +816,111 @@ public class ParquetFileReader implements Closeable {
throw new RuntimeException("Illegal row group of 0 rows");
}
this.currentRowGroup = new ColumnChunkPageReadStore(block.getRowCount());
- // prepare the list of consecutive chunks to read them in one scan
- List<ConsecutiveChunkList> allChunks = new ArrayList<ConsecutiveChunkList>();
- ConsecutiveChunkList currentChunks = null;
+ // prepare the list of consecutive parts to read them in one scan
+ List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
+ ConsecutivePartList currentParts = null;
for (ColumnChunkMetaData mc : block.getColumns()) {
ColumnPath pathKey = mc.getPath();
BenchmarkCounter.incrementTotalBytes(mc.getTotalSize());
ColumnDescriptor columnDescriptor = paths.get(pathKey);
if (columnDescriptor != null) {
long startingPos = mc.getStartingPos();
- // first chunk or not consecutive => new list
- if (currentChunks == null || currentChunks.endPos() != startingPos) {
- currentChunks = new ConsecutiveChunkList(startingPos);
- allChunks.add(currentChunks);
+ // first part or not consecutive => new list
+ if (currentParts == null || currentParts.endPos() != startingPos) {
+ currentParts = new ConsecutivePartList(startingPos);
+ allParts.add(currentParts);
}
- currentChunks.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, (int)mc.getTotalSize()));
+ currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, (int)mc.getTotalSize()));
}
}
// actually read all the chunks
- for (ConsecutiveChunkList consecutiveChunks : allChunks) {
- final List<Chunk> chunks = consecutiveChunks.readAll(f);
- for (Chunk chunk : chunks) {
- currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
+ ChunkListBuilder builder = new ChunkListBuilder();
+ for (ConsecutivePartList consecutiveChunks : allParts) {
+ consecutiveChunks.readAll(f, builder);
+ }
+ for (Chunk chunk : builder.build()) {
+ currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
+ }
+
+ // avoid re-reading bytes the dictionary reader is used after this call
+ if (nextDictionaryReader != null) {
+ nextDictionaryReader.setRowGroup(currentRowGroup);
+ }
+
+ advanceToNextBlock();
+
+ return currentRowGroup;
+ }
+
+ /**
+ * Reads all the columns requested from the row group at the current file position. It may skip specific pages based
+ * on the column indexes according to the actual filter. As the rows are not aligned among the pages of the different
+ * columns row synchronization might be required.
+ *
+ * @return the PageReadStore which can provide PageReaders for each column
+ * @throws IOException
+ * if any I/O error occurs while reading
+ * @see {@link PageReadStore#isInPageFilteringMode()}
+ */
+ public PageReadStore readNextFilteredRowGroup() throws IOException {
+ if (currentBlock == blocks.size()) {
+ return null;
+ }
+ if (!options.useColumnIndexFilter()) {
+ return readNextRowGroup();
+ }
+ BlockMetaData block = blocks.get(currentBlock);
+ if (block.getRowCount() == 0) {
+ throw new RuntimeException("Illegal row group of 0 rows");
+ }
+ ColumnIndexStore ciStore = getColumnIndexStore(currentBlock);
+ RowRanges rowRanges = getRowRanges(currentBlock);
+ long rowCount = rowRanges.rowCount();
+ if (rowCount == 0) {
+ // There are no matching rows -> skipping this row-group
+ advanceToNextBlock();
+ return readNextFilteredRowGroup();
+ }
+ if (rowCount == block.getRowCount()) {
+ // All rows are matching -> fall back to the non-filtering path
+ return readNextRowGroup();
+ }
+
+ this.currentRowGroup = new ColumnChunkPageReadStore(rowRanges);
+ // prepare the list of consecutive parts to read them in one scan
+ ChunkListBuilder builder = new ChunkListBuilder();
+ List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
+ ConsecutivePartList currentParts = null;
+ for (ColumnChunkMetaData mc : block.getColumns()) {
+ ColumnPath pathKey = mc.getPath();
+ ColumnDescriptor columnDescriptor = paths.get(pathKey);
+ if (columnDescriptor != null) {
+ OffsetIndex offsetIndex = ciStore.getOffsetIndex(mc.getPath());
+
+ OffsetIndex filteredOffsetIndex = filterOffsetIndex(offsetIndex, rowRanges,
+ block.getRowCount());
+ for (OffsetRange range : calculateOffsetRanges(filteredOffsetIndex, mc, offsetIndex.getOffset(0))) {
+ BenchmarkCounter.incrementTotalBytes(range.getLength());
+ long startingPos = range.getOffset();
+ // first part or not consecutive => new list
+ if (currentParts == null || currentParts.endPos() != startingPos) {
+ currentParts = new ConsecutivePartList(startingPos);
+ allParts.add(currentParts);
+ }
+ ChunkDescriptor chunkDescriptor = new ChunkDescriptor(columnDescriptor, mc, startingPos,
+ (int) range.getLength());
+ currentParts.addChunk(chunkDescriptor);
+ builder.setOffsetIndex(chunkDescriptor, filteredOffsetIndex);
+ }
}
}
+ // actually read all the chunks
+ for (ConsecutivePartList consecutiveChunks : allParts) {
+ consecutiveChunks.readAll(f, builder);
+ }
+ for (Chunk chunk : builder.build()) {
+ currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
+ }
// avoid re-reading bytes the dictionary reader is used after this call
if (nextDictionaryReader != null) {
@@ -822,6 +932,25 @@ public class ParquetFileReader implements Closeable {
return currentRowGroup;
}
+ /**
+ * Returns the column index store of the given row group. It is created on first access and cached in
+ * {@code blockIndexStores}.
+ *
+ * @param blockIndex the index of the row group within {@code blocks}
+ * @return the (cached) column index store of the row group
+ */
+ private ColumnIndexStore getColumnIndexStore(int blockIndex) {
+ ColumnIndexStore cached = blockIndexStores.get(blockIndex);
+ if (cached != null) {
+ return cached;
+ }
+ ColumnIndexStore created = ColumnIndexStoreImpl.create(this, blocks.get(blockIndex), paths.keySet());
+ blockIndexStores.set(blockIndex, created);
+ return created;
+ }
+
+ /**
+ * Returns the row ranges of the given row group that may match the record filter according to the column indexes.
+ * The ranges are calculated on first access and cached in {@code blockRowRanges}.
+ *
+ * @param blockIndex the index of the row group within {@code blocks}
+ * @return the (cached) row ranges of the row group
+ */
+ private RowRanges getRowRanges(int blockIndex) {
+ RowRanges cached = blockRowRanges.get(blockIndex);
+ if (cached != null) {
+ return cached;
+ }
+ RowRanges computed = ColumnIndexFilter.calculateRowRanges(options.getRecordFilter(),
+ getColumnIndexStore(blockIndex), paths.keySet(), blocks.get(blockIndex).getRowCount());
+ blockRowRanges.set(blockIndex, computed);
+ return computed;
+ }
+
+ /**
+ * Skips the row group at the current position without reading any of its pages.
+ *
+ * @return the value returned by {@code advanceToNextBlock()}
+ */
public boolean skipNextRowGroup() {
return advanceToNextBlock();
}
@@ -952,6 +1081,57 @@ public class ParquetFileReader implements Closeable {
}
}
+ /*
+ * Builder to concatenate the buffers of the discontinuous parts for the same column. These parts are generated as a
+ * result of the column-index based filtering when some pages might be skipped at reading.
+ */
+ private class ChunkListBuilder {
+ /** Buffers collected so far for one column, plus its (optional) filtered offset index. */
+ private class ChunkData {
+ final List<ByteBuffer> buffers = new ArrayList<>();
+ OffsetIndex offsetIndex;
+ }
+
+ // ChunkDescriptor equality only considers the column, so every part of the same column shares one entry
+ private final Map<ChunkDescriptor, ChunkData> map = new HashMap<>();
+ // descriptor of the most recently added part and the stream it was read from; used for the workaround chunk
+ private ChunkDescriptor lastDescriptor;
+ private SeekableInputStream f;
+
+ /** Returns the data holder for the column of the given descriptor, creating it on first use. */
+ private ChunkData dataFor(ChunkDescriptor descriptor) {
+ return map.computeIfAbsent(descriptor, key -> new ChunkData());
+ }
+
+ /**
+ * Appends the buffers read for one part of a column chunk.
+ *
+ * @param descriptor the descriptor of the part
+ * @param buffers the buffers holding the bytes of the part
+ * @param f the stream the part was read from
+ */
+ void add(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f) {
+ dataFor(descriptor).buffers.addAll(buffers);
+ lastDescriptor = descriptor;
+ this.f = f;
+ }
+
+ /**
+ * Attaches an offset index to the column of the given descriptor.
+ *
+ * @param descriptor the descriptor of (a part of) the column chunk
+ * @param offsetIndex the offset index to use when reading the pages of the column
+ */
+ void setOffsetIndex(ChunkDescriptor descriptor, OffsetIndex offsetIndex) {
+ dataFor(descriptor).offsetIndex = offsetIndex;
+ }
+
+ /**
+ * Creates one chunk per column from the collected parts. The column of the last added part becomes a
+ * {@link WorkaroundChunk} backed by the stream it was read from.
+ *
+ * @return the chunks to read the pages from
+ */
+ List<Chunk> build() {
+ List<Chunk> chunks = new ArrayList<>(map.size());
+ for (Entry<ChunkDescriptor, ChunkData> entry : map.entrySet()) {
+ ChunkDescriptor descriptor = entry.getKey();
+ ChunkData data = entry.getValue();
+ if (descriptor.equals(lastDescriptor)) {
+ // because of a bug, the last chunk might be larger than descriptor.size
+ chunks.add(new WorkaroundChunk(lastDescriptor, data.buffers, f, data.offsetIndex));
+ } else {
+ chunks.add(new Chunk(descriptor, data.buffers, data.offsetIndex));
+ }
+ }
+ return chunks;
+ }
+ }
+
/**
* The data for a column chunk
*/
@@ -959,15 +1139,17 @@ public class ParquetFileReader implements Closeable {
protected final ChunkDescriptor descriptor;
protected final ByteBufferInputStream stream;
+ final OffsetIndex offsetIndex;
/**
- *
+ * Creates a chunk that reads its pages from the given buffers as a single stream.
+ *
* @param descriptor descriptor for the chunk
* @param buffers ByteBuffers that contain the chunk
+ * @param offsetIndex the offset index for this column; might be null. When set, the number of data pages read is
+ * taken from the index instead of comparing against the value count in the column chunk metadata.
*/
- public Chunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers) {
+ public Chunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, OffsetIndex offsetIndex) {
this.descriptor = descriptor;
this.stream = ByteBufferInputStream.wrap(buffers);
+ this.offsetIndex = offsetIndex;
}
protected PageHeader readPageHeader() throws IOException {
@@ -984,7 +1166,8 @@ public class ParquetFileReader implements Closeable {
PrimitiveType type = getFileMetaData().getSchema()
.getType(descriptor.col.getPath()).asPrimitiveType();
long valuesCountReadSoFar = 0;
- while (valuesCountReadSoFar < descriptor.metadata.getValueCount()) {
+ int dataPageCountReadSoFar = 0;
+ while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
PageHeader pageHeader = readPageHeader();
int uncompressedPageSize = pageHeader.getUncompressed_page_size();
int compressedPageSize = pageHeader.getCompressed_page_size();
@@ -994,8 +1177,8 @@ public class ParquetFileReader implements Closeable {
if (dictionaryPage != null) {
throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
}
- DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
- dictionaryPage =
+ DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
+ dictionaryPage =
new DictionaryPage(
this.readAsBytesInput(compressedPageSize),
uncompressedPageSize,
@@ -1019,6 +1202,7 @@ public class ParquetFileReader implements Closeable {
converter.getEncoding(dataHeaderV1.getEncoding())
));
valuesCountReadSoFar += dataHeaderV1.getNum_values();
+ ++dataPageCountReadSoFar;
break;
case DATA_PAGE_V2:
DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
@@ -1040,6 +1224,7 @@ public class ParquetFileReader implements Closeable {
dataHeaderV2.isIs_compressed()
));
valuesCountReadSoFar += dataHeaderV2.getNum_values();
+ ++dataPageCountReadSoFar;
break;
default:
LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
@@ -1047,7 +1232,7 @@ public class ParquetFileReader implements Closeable {
break;
}
}
- if (valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
+ if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
// Would be nice to have a CorruptParquetFileException or something as a subclass?
throw new IOException(
"Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " +
@@ -1056,7 +1241,13 @@ public class ParquetFileReader implements Closeable {
+ " pages ending at file offset " + (descriptor.fileOffset + stream.position()));
}
BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
- return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage);
+ return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage, offsetIndex,
+ blocks.get(currentBlock).getRowCount());
+ }
+
+ /**
+ * Tells whether there are more data pages to read in this chunk. Without an offset index, the limit is the value
+ * count from the column chunk metadata. With one (the filtered read path sets it), the limit is the page count of
+ * the index.
+ */
+ private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) {
+ if (offsetIndex != null) {
+ return dataPageCountReadSoFar < offsetIndex.getPageCount();
+ }
+ return valuesCountReadSoFar < descriptor.metadata.getValueCount();
}
/**
@@ -1081,8 +1272,8 @@ public class ParquetFileReader implements Closeable {
* @param descriptor the descriptor of the chunk
+ * @param buffers ByteBuffers that contain the chunk
* @param f the file stream positioned at the end of this chunk
+ * @param offsetIndex the offset index for this column; might be null
*/
- private WorkaroundChunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f) {
- super(descriptor, buffers);
+ private WorkaroundChunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f, OffsetIndex offsetIndex) {
+ super(descriptor, buffers, offsetIndex);
this.f = f;
}
@@ -1131,7 +1322,7 @@ public class ParquetFileReader implements Closeable {
/**
- * information needed to read a column chunk
+ * Information needed to read a column chunk or a part of it.
*/
private static class ChunkDescriptor {
@@ -1157,12 +1348,29 @@ public class ParquetFileReader implements Closeable {
this.fileOffset = fileOffset;
this.size = size;
}
+
+ // Consistent with equals(Object): only the column is considered, so every part (range of pages) created for the
+ // same column hashes and compares as one key.
+ @Override
+ public int hashCode() {
+ return col.hashCode();
+ }
+
+ /**
+ * Two descriptors are equal when they describe the same column, regardless of the file offset or size of the part.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof ChunkDescriptor)) {
+ return false;
+ }
+ ChunkDescriptor other = (ChunkDescriptor) obj;
+ return col.equals(other.col);
+ }
}
/**
- * describes a list of consecutive column chunks to be read at once.
+ * Describes a list of consecutive parts to be read at once. A consecutive part may contain whole column chunks or
+ * only parts of them (some pages).
*/
- private class ConsecutiveChunkList {
+ private class ConsecutivePartList {
private final long offset;
private int length;
@@ -1171,7 +1379,7 @@ public class ParquetFileReader implements Closeable {
/**
+ * Starts a new list of consecutive parts at the given file offset; parts are then appended with addChunk.
+ *
* @param offset where the first chunk starts
*/
- ConsecutiveChunkList(long offset) {
+ ConsecutivePartList(long offset) {
this.offset = offset;
}
@@ -1187,11 +1395,10 @@ public class ParquetFileReader implements Closeable {
/**
* @param f file to read the chunks from
- * @return the chunks
+ * @param builder used to build chunk list to read the pages for the different columns
* @throws IOException if there is an error while reading from the stream
*/
- public List<Chunk> readAll(SeekableInputStream f) throws IOException {
- List<Chunk> result = new ArrayList<Chunk>(chunks.size());
+ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
f.seek(offset);
int fullAllocations = length / options.getMaxAllocationSize();
@@ -1218,14 +1425,8 @@ public class ParquetFileReader implements Closeable {
ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
for (int i = 0; i < chunks.size(); i++) {
ChunkDescriptor descriptor = chunks.get(i);
- if (i < chunks.size() - 1) {
- result.add(new Chunk(descriptor, stream.sliceBuffers(descriptor.size)));
- } else {
- // because of a bug, the last chunk might be larger than descriptor.size
- result.add(new WorkaroundChunk(descriptor, stream.sliceBuffers(descriptor.size), f));
- }
+ builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);
}
- return result ;
}
/**
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index 2c21e52..b8fce2f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -130,6 +130,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
public static final String DICTIONARY_FILTERING_ENABLED = "parquet.filter.dictionary.enabled";
/**
+ * key to configure whether column index filtering of pages is enabled
+ */
+ public static final String COLUMN_INDEX_FILTERING_ENABLED = "parquet.filter.columnindex.enabled";
+
+ /**
* key to turn on or off task side metadata loading (default true)
* if true then metadata is read on the task side and some tasks may finish immediately.
* if false metadata is read on the client which is slower if there is a lot of metadata but tasks will only be spawn if there is work to do.
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index d9b273b..de20808 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -270,6 +270,16 @@ public class ParquetReader<T> implements Closeable {
return this;
}
+ /**
+ * Sets whether pages should be filtered using the column indexes when reading. The flag is passed on to the read
+ * options builder.
+ *
+ * @param useColumnIndexFilter {@code true} to filter pages based on the column indexes
+ * @return this builder for chaining
+ */
+ public Builder<T> useColumnIndexFilter(boolean useColumnIndexFilter) {
+ optionsBuilder.useColumnIndexFilter(useColumnIndexFilter);
+ return this;
+ }
+
+ /**
+ * Turns on page filtering based on the column indexes by delegating to the no-argument
+ * {@code useColumnIndexFilter()} of the read options builder (presumably equivalent to
+ * {@code useColumnIndexFilter(true)}).
+ *
+ * @return this builder for chaining
+ */
+ public Builder<T> useColumnIndexFilter() {
+ optionsBuilder.useColumnIndexFilter();
+ return this;
+ }
+
public Builder<T> withFileRange(long start, long end) {
optionsBuilder.withRange(start, end);
return this;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index 5f474fd..e6aa104 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -189,9 +189,7 @@ abstract public class ColumnChunkMetaData {
/**
+ * Returns the path of the column described by this column chunk metadata.
*
* @return column identifier
- * @deprecated will be removed in 2.0.0. Use {@link #getPrimitiveType()} instead.
*/
- @Deprecated
public ColumnPath getPath() {
return properties.getPath();
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
index 7acda93..18ddca0 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
@@ -31,6 +31,7 @@ import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
@@ -91,6 +92,11 @@ public class PhoneBookWriter {
result = 31 * result + (lat != null ? lat.hashCode() : 0);
return result;
}
+
+ /** Returns a debug representation listing longitude and latitude. */
+ @Override
+ public String toString() {
+ return new StringBuilder("Location [lon=").append(lon).append(", lat=").append(lat).append("]").toString();
+ }
}
public static class PhoneNumber {
@@ -129,6 +135,11 @@ public class PhoneBookWriter {
result = 31 * result + (kind != null ? kind.hashCode() : 0);
return result;
}
+
+ /** Returns a debug representation listing the number and its kind. */
+ @Override
+ public String toString() {
+ return new StringBuilder("PhoneNumber [number=").append(number).append(", kind=").append(kind).append("]")
+ .toString();
+ }
}
public static class User {
@@ -183,6 +194,11 @@ public class PhoneBookWriter {
result = 31 * result + (location != null ? location.hashCode() : 0);
return result;
}
+
+ /** Returns a debug representation listing every field of the user. */
+ @Override
+ public String toString() {
+ StringBuilder text = new StringBuilder("User [id=").append(id);
+ text.append(", name=").append(name);
+ text.append(", phoneNumbers=").append(phoneNumbers);
+ text.append(", location=").append(location);
+ return text.append("]").toString();
+ }
}
public static SimpleGroup groupFromUser(User user) {
@@ -216,6 +232,56 @@ public class PhoneBookWriter {
return root;
}
+ /**
+ * Converts a record read with the phone book schema back into a {@link User}. Absent optional fields become
+ * {@code null}.
+ */
+ private static User userFromGroup(Group root) {
+ Long id = getLong(root, "id");
+ String name = getString(root, "name");
+ List<PhoneNumber> phoneNumbers = getPhoneNumbers(getGroup(root, "phoneNumbers"));
+ Location location = getLocation(getGroup(root, "location"));
+ return new User(id, name, phoneNumbers, location);
+ }
+
+ /**
+ * Converts the repeated {@code phone} groups into phone numbers.
+ *
+ * @return the phone numbers, or {@code null} if the {@code phoneNumbers} group itself is absent
+ */
+ private static List<PhoneNumber> getPhoneNumbers(Group phoneNumbers) {
+ List<PhoneNumber> result = null;
+ if (phoneNumbers != null) {
+ result = new ArrayList<>();
+ int count = phoneNumbers.getFieldRepetitionCount("phone");
+ for (int index = 0; index < count; index++) {
+ Group phoneGroup = phoneNumbers.getGroup("phone", index);
+ result.add(new PhoneNumber(getLong(phoneGroup, "number"), getString(phoneGroup, "kind")));
+ }
+ }
+ return result;
+ }
+
+ /** Converts the {@code location} group into a {@link Location}; returns {@code null} if the group is absent. */
+ private static Location getLocation(Group location) {
+ return location == null ? null : new Location(getDouble(location, "lon"), getDouble(location, "lat"));
+ }
+
+ /**
+ * Tells whether an optional field is absent from the group.
+ *
+ * @throws AssertionError if the field is repeated more than once
+ */
+ private static boolean isNull(Group group, String field) {
+ int repetition = group.getFieldRepetitionCount(field);
+ switch (repetition) {
+ case 0:
+ return true;
+ case 1:
+ return false;
+ default:
+ throw new AssertionError("Invalid repetitionCount " + repetition + " for field " + field + " in group " + group);
+ }
+ }
+
+ /** Returns the optional long field, or {@code null} if it is absent. */
+ private static Long getLong(Group group, String field) {
+ if (isNull(group, field)) {
+ return null;
+ }
+ return group.getLong(field, 0);
+ }
+
+ /** Returns the optional string field, or {@code null} if it is absent. */
+ private static String getString(Group group, String field) {
+ if (isNull(group, field)) {
+ return null;
+ }
+ return group.getString(field, 0);
+ }
+
+ /** Returns the optional double field, or {@code null} if it is absent. */
+ private static Double getDouble(Group group, String field) {
+ if (isNull(group, field)) {
+ return null;
+ }
+ return group.getDouble(field, 0);
+ }
+
+ /** Returns the optional nested group, or {@code null} if it is absent. */
+ private static Group getGroup(Group group, String field) {
+ if (isNull(group, field)) {
+ return null;
+ }
+ return group.getGroup(field, 0);
+ }
+
public static File writeToFile(List<User> users) throws IOException {
File f = File.createTempFile("phonebook", ".parquet");
f.deleteOnExit();
@@ -229,25 +295,30 @@ public class PhoneBookWriter {
}
+ /**
+ * Writes the given users to {@code f}. Delegates to {@code write} with an {@link ExampleParquetWriter} builder
+ * for the file's absolute path.
+ */
public static void writeToFile(File f, List<User> users) throws IOException {
- Configuration conf = new Configuration();
- GroupWriteSupport.setSchema(schema, conf);
+ write(ExampleParquetWriter.builder(new Path(f.getAbsolutePath())), users);
+ }
- ParquetWriter<Group> writer = new ParquetWriter<Group>(new Path(f.getAbsolutePath()), conf, new GroupWriteSupport());
- for (User u : users) {
- writer.write(groupFromUser(u));
+ /**
+ * Writes the given users with a writer built from {@code builder}. The phone book schema is registered in the
+ * builder's configuration first, and the writer is closed when done.
+ *
+ * @param builder the writer builder to use; its configuration is modified
+ * @param users the users to write
+ * @throws IOException if building the writer or writing fails
+ */
+ public static void write(ParquetWriter.Builder<Group, ?> builder, List<User> users) throws IOException {
+ builder.config(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
+ try (ParquetWriter<Group> writer = builder.build()) {
+ for (User u : users) {
+ writer.write(groupFromUser(u));
+ }
+ }
- writer.close();
}
- public static List<Group> readFile(File f, Filter filter) throws IOException {
+ /**
+ * Creates a reader of {@link Group} records for {@code file} that applies {@code filter}. The phone book schema is
+ * set in a fresh {@link Configuration}. The caller is responsible for closing the reader.
+ */
+ private static ParquetReader<Group> createReader(Path file, Filter filter) throws IOException {
Configuration conf = new Configuration();
GroupWriteSupport.setSchema(schema, conf);
- ParquetReader<Group> reader =
- ParquetReader.builder(new GroupReadSupport(), new Path(f.getAbsolutePath()))
- .withConf(conf)
- .withFilter(filter)
- .build();
+ return ParquetReader.builder(new GroupReadSupport(), file)
+ .withConf(conf)
+ .withFilter(filter)
+ .build();
+ }
+
+ public static List<Group> readFile(File f, Filter filter) throws IOException {
+ ParquetReader<Group> reader = createReader(new Path(f.getAbsolutePath()), filter);
Group current;
List<Group> users = new ArrayList<Group>();
@@ -261,6 +332,16 @@ public class PhoneBookWriter {
return users;
}
+ /**
+ * Reads every user from a reader created by the given builder. The phone book schema is set on the builder before
+ * building.
+ *
+ * @param builder the reader builder to use
+ * @return the users in the order they are returned by the reader
+ * @throws IOException if building the reader or reading fails
+ */
+ public static List<User> readUsers(ParquetReader.Builder<Group> builder) throws IOException {
+ // ParquetReader is Closeable: release it even if reading throws
+ try (ParquetReader<Group> reader = builder.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString()).build()) {
+ List<User> users = new ArrayList<>();
+ for (Group group = reader.read(); group != null; group = reader.read()) {
+ users.add(userFromGroup(group));
+ }
+ return users;
+ }
+ }
+
public static void main(String[] args) throws IOException {
File f = new File(args[0]);
writeToFile(f, TestRecordLevelFilters.makeUsers());
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
new file mode 100644
index 0000000..71155ce
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.util.Collections.emptyList;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Statistics;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.Location;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.PhoneNumber;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.User;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.io.api.Binary;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for high level column index based filtering.
+ */
+@RunWith(Parameterized.class)
+public class TestColumnIndexFiltering {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestColumnIndexFiltering.class);
+ private static final Random RANDOM = new Random(42);
+ private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" };
+ private static final List<User> DATA = Collections.unmodifiableList(generateData(10000));
+ private static final Path FILE_V1 = createTempFile();
+ private static final Path FILE_V2 = createTempFile();
+
+ @Parameters
+ public static Collection<Object[]> params() {
+ return Arrays.asList(new Object[] { FILE_V1 }, new Object[] { FILE_V2 });
+ }
+
+ private final Path file;
+
+ public TestColumnIndexFiltering(Path file) {
+ this.file = file;
+ }
+
+ private static List<User> generateData(int rowCount) {
+ List<User> users = new ArrayList<>();
+ List<String> names = generateNames(rowCount);
+ for (int i = 0; i < rowCount; ++i) {
+ users.add(new User(i, names.get(i), generatePhoneNumbers(), generateLocation(i, rowCount)));
+ }
+ return users;
+ }
+
+ private static List<String> generateNames(int rowCount) {
+ List<String> list = new ArrayList<>();
+
+ // Adding fix values for filtering
+ list.add("anderson");
+ list.add("anderson");
+ list.add("miller");
+ list.add("miller");
+ list.add("miller");
+ list.add("thomas");
+ list.add("thomas");
+ list.add("williams");
+
+ int nullCount = rowCount / 100;
+
+ String alphabet = "aabcdeefghiijklmnoopqrstuuvwxyz";
+ int maxLength = 8;
+ for (int i = rowCount - list.size() - nullCount; i >= 0; --i) {
+ int l = RANDOM.nextInt(maxLength);
+ StringBuilder builder = new StringBuilder(l);
+ for (int j = 0; j < l; ++j) {
+ builder.append(alphabet.charAt(RANDOM.nextInt(alphabet.length())));
+ }
+ list.add(builder.toString());
+ }
+ Collections.sort(list, (str1, str2) -> -str1.compareTo(str2));
+
+ // Adding nulls to random places
+ for (int i = 0; i < nullCount; ++i) {
+ list.add(RANDOM.nextInt(list.size()), null);
+ }
+
+ return list;
+ }
+
+ private static List<PhoneNumber> generatePhoneNumbers() {
+ int length = RANDOM.nextInt(5) - 1;
+ if (length < 0) {
+ return null;
+ }
+ List<PhoneNumber> phoneNumbers = new ArrayList<>(length);
+ for (int i = 0; i < length; ++i) {
+ // 6 digits numbers
+ long number = Math.abs(RANDOM.nextLong() % 900000) + 100000;
+ phoneNumbers.add(new PhoneNumber(number, PHONE_KINDS[RANDOM.nextInt(PHONE_KINDS.length)]));
+ }
+ return phoneNumbers;
+ }
+
+ private static Location generateLocation(int id, int rowCount) {
+ if (RANDOM.nextDouble() < 0.01) {
+ return null;
+ }
+
+ double lat = RANDOM.nextDouble() * 90.0 - (id < rowCount / 2 ? 90.0 : 0.0);
+ double lon = RANDOM.nextDouble() * 90.0 - (id < rowCount / 4 || id >= 3 * rowCount / 4 ? 90.0 : 0.0);
+
+ return new Location(RANDOM.nextDouble() < 0.01 ? null : lat, RANDOM.nextDouble() < 0.01 ? null : lon);
+ }
+
+ private static Path createTempFile() {
+ try {
+ return new Path(Files.createTempFile("test-ci_", ".parquet").toAbsolutePath().toString());
+ } catch (IOException e) {
+ throw new AssertionError("Unable to create temporary file", e);
+ }
+ }
+
+ private List<User> readUsers(FilterPredicate filter, boolean useOtherFiltering) throws IOException {
+ return readUsers(FilterCompat.get(filter), useOtherFiltering, true);
+ }
+
+ private List<User> readUsers(FilterPredicate filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
+ throws IOException {
+ return readUsers(FilterCompat.get(filter), useOtherFiltering, useColumnIndexFilter);
+ }
+
+ private List<User> readUsers(Filter filter, boolean useOtherFiltering) throws IOException {
+ return readUsers(filter, useOtherFiltering, true);
+ }
+
+ private List<User> readUsers(Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
+ throws IOException {
+ return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
+ .withFilter(filter)
+ .useDictionaryFilter(useOtherFiltering)
+ .useStatsFilter(useOtherFiltering)
+ .useRecordFilter(useOtherFiltering)
+ .useColumnIndexFilter(useColumnIndexFilter));
+ }
+
+ // Assumes that both lists are in the same order
+ private static void assertContains(Stream<User> expected, List<User> actual) {
+ Iterator<User> expIt = expected.iterator();
+ if (!expIt.hasNext()) {
+ return;
+ }
+ User exp = expIt.next();
+ for (User act : actual) {
+ if (act.equals(exp)) {
+ if (!expIt.hasNext()) {
+ break;
+ }
+ exp = expIt.next();
+ }
+ }
+ assertFalse("Not all expected elements are in the actual list. E.g.: " + exp, expIt.hasNext());
+ }
+
+ private void assertCorrectFiltering(Predicate<User> expectedFilter, FilterPredicate actualFilter)
+ throws IOException {
+ // Check with only column index based filtering
+ List<User> result = readUsers(actualFilter, false);
+
+ assertTrue("Column-index filtering should drop some pages", result.size() < DATA.size());
+ LOGGER.info("{}/{} records read; filtering ratio: {}%", result.size(), DATA.size(),
+ 100 * result.size() / DATA.size());
+ // Asserts that all the required records are in the result
+ assertContains(DATA.stream().filter(expectedFilter), result);
+ // Asserts that all the retrieved records are in the file (validating non-matching records)
+ assertContains(result.stream(), DATA);
+
+ // Check with all the filtering filtering to ensure the result contains exactly the required values
+ result = readUsers(actualFilter, true);
+ assertEquals(DATA.stream().filter(expectedFilter).collect(Collectors.toList()), result);
+ }
+
+ @BeforeClass
+ public static void createFile() throws IOException {
+ int pageSize = DATA.size() / 10; // Ensure that several pages will be created
+ int rowGroupSize = pageSize * 6 * 5; // Ensure that there are more row-groups created
+ PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V1)
+ .withWriteMode(OVERWRITE)
+ .withRowGroupSize(rowGroupSize)
+ .withPageSize(pageSize)
+ .withWriterVersion(WriterVersion.PARQUET_1_0),
+ DATA);
+ PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V2)
+ .withWriteMode(OVERWRITE)
+ .withRowGroupSize(rowGroupSize)
+ .withPageSize(pageSize)
+ .withWriterVersion(WriterVersion.PARQUET_2_0),
+ DATA);
+ }
+
+ @AfterClass
+ public static void deleteFile() throws IOException {
+ FILE_V1.getFileSystem(new Configuration()).delete(FILE_V1, false);
+ FILE_V2.getFileSystem(new Configuration()).delete(FILE_V2, false);
+ }
+
+ @Test
+ public void testSimpleFiltering() throws IOException {
+ assertCorrectFiltering(
+ record -> record.getId() == 1234,
+ eq(longColumn("id"), 1234l));
+ assertCorrectFiltering(
+ record -> "miller".equals(record.getName()),
+ eq(binaryColumn("name"), Binary.fromString("miller")));
+ assertCorrectFiltering(
+ record -> record.getName() == null,
+ eq(binaryColumn("name"), null));
+ }
+
+ @Test
+ public void testNoFiltering() throws IOException {
+ // Column index filtering with no-op filter
+ assertEquals(DATA, readUsers(FilterCompat.NOOP, false));
+ assertEquals(DATA, readUsers(FilterCompat.NOOP, true));
+
+ // Column index filtering turned off
+ assertEquals(DATA.stream().filter(user -> user.getId() == 1234).collect(Collectors.toList()),
+ readUsers(eq(longColumn("id"), 1234l), true, false));
+ assertEquals(DATA.stream().filter(user -> "miller".equals(user.getName())).collect(Collectors.toList()),
+ readUsers(eq(binaryColumn("name"), Binary.fromString("miller")), true, false));
+ assertEquals(DATA.stream().filter(user -> user.getName() == null).collect(Collectors.toList()),
+ readUsers(eq(binaryColumn("name"), null), true, false));
+
+ // Every filtering mechanism turned off
+ assertEquals(DATA, readUsers(eq(longColumn("id"), 1234l), false, false));
+ assertEquals(DATA, readUsers(eq(binaryColumn("name"), Binary.fromString("miller")), false, false));
+ assertEquals(DATA, readUsers(eq(binaryColumn("name"), null), false, false));
+ }
+
+ @Test
+ public void testComplexFiltering() throws IOException {
+ assertCorrectFiltering(
+ record -> {
+ Location loc = record.getLocation();
+ Double lat = loc == null ? null : loc.getLat();
+ Double lon = loc == null ? null : loc.getLon();
+ return lat != null && lon != null && 37 <= lat && lat <= 70 && -21 <= lon && lon <= 35;
+ },
+ and(and(gtEq(doubleColumn("location.lat"), 37.0), ltEq(doubleColumn("location.lat"), 70.0)),
+ and(gtEq(doubleColumn("location.lon"), -21.0), ltEq(doubleColumn("location.lon"), 35.0))));
+ assertCorrectFiltering(
+ record -> {
+ Location loc = record.getLocation();
+ return loc == null || (loc.getLat() == null && loc.getLon() == null);
+ },
+ and(eq(doubleColumn("location.lat"), null), eq(doubleColumn("location.lon"), null)));
+ assertCorrectFiltering(
+ record -> {
+ String name = record.getName();
+ return name != null && name.compareTo("thomas") < 0 && record.getId() <= 3 * DATA.size() / 4;
+ },
+ and(lt(binaryColumn("name"), Binary.fromString("thomas")), ltEq(longColumn("id"), 3l * DATA.size() / 4)));
+ }
+
+ public static class NameStartsWithVowel extends UserDefinedPredicate<Binary> {
+ private static final Binary A = Binary.fromString("a");
+ private static final Binary B = Binary.fromString("b");
+ private static final Binary E = Binary.fromString("e");
+ private static final Binary F = Binary.fromString("f");
+ private static final Binary I = Binary.fromString("i");
+ private static final Binary J = Binary.fromString("j");
+ private static final Binary O = Binary.fromString("o");
+ private static final Binary P = Binary.fromString("p");
+ private static final Binary U = Binary.fromString("u");
+ private static final Binary V = Binary.fromString("v");
+
+ private static boolean isStartingWithVowel(String str) {
+ if (str == null || str.isEmpty()) {
+ return false;
+ }
+ switch (str.charAt(0)) {
+ case 'a':
+ case 'e':
+ case 'i':
+ case 'o':
+ case 'u':
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ @Override
+ public boolean keep(Binary value) {
+ return value != null && isStartingWithVowel(value.toStringUsingUTF8());
+ }
+
+ @Override
+ public boolean canDrop(Statistics<Binary> statistics) {
+ Comparator<Binary> cmp = statistics.getComparator();
+ Binary min = statistics.getMin();
+ Binary max = statistics.getMax();
+ return cmp.compare(max, A) < 0
+ || (cmp.compare(min, B) >= 0 && cmp.compare(max, E) < 0)
+ || (cmp.compare(min, F) >= 0 && cmp.compare(max, I) < 0)
+ || (cmp.compare(min, J) >= 0 && cmp.compare(max, O) < 0)
+ || (cmp.compare(min, P) >= 0 && cmp.compare(max, U) < 0)
+ || cmp.compare(min, V) >= 0;
+ }
+
+ @Override
+ public boolean inverseCanDrop(Statistics<Binary> statistics) {
+ Comparator<Binary> cmp = statistics.getComparator();
+ Binary min = statistics.getMin();
+ Binary max = statistics.getMax();
+ return (cmp.compare(min, A) >= 0 && cmp.compare(max, B) < 0)
+ || (cmp.compare(min, E) >= 0 && cmp.compare(max, F) < 0)
+ || (cmp.compare(min, I) >= 0 && cmp.compare(max, J) < 0)
+ || (cmp.compare(min, O) >= 0 && cmp.compare(max, P) < 0)
+ || (cmp.compare(min, U) >= 0 && cmp.compare(max, V) < 0);
+ }
+ }
+
+ public static class IsDivisibleBy extends UserDefinedPredicate<Long> implements Serializable {
+ private long divisor;
+
+ IsDivisibleBy(long divisor) {
+ this.divisor = divisor;
+ }
+
+ @Override
+ public boolean keep(Long value) {
+ return value != null && value % divisor == 0;
+ }
+
+ @Override
+ public boolean canDrop(Statistics<Long> statistics) {
+ long min = statistics.getMin();
+ long max = statistics.getMax();
+ return min % divisor != 0 && max % divisor != 0 && min / divisor == max / divisor;
+ }
+
+ @Override
+ public boolean inverseCanDrop(Statistics<Long> statistics) {
+ long min = statistics.getMin();
+ long max = statistics.getMax();
+ return min == max && min % divisor == 0;
+ }
+ }
+
+ @Test
+ public void testUDF() throws IOException {
+ assertCorrectFiltering(
+ record -> NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0,
+ or(userDefined(binaryColumn("name"), NameStartsWithVowel.class),
+ userDefined(longColumn("id"), new IsDivisibleBy(234))));
+ assertCorrectFiltering(
+ record -> !(NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0),
+ not(or(userDefined(binaryColumn("name"), NameStartsWithVowel.class),
+ userDefined(longColumn("id"), new IsDivisibleBy(234)))));
+ }
+
+ @Test
+ public void testFilteringWithMissingColumns() throws IOException {
+ // Missing column filter is always true
+ assertEquals(DATA, readUsers(notEq(binaryColumn("not-existing-binary"), Binary.EMPTY), true));
+ assertCorrectFiltering(
+ record -> record.getId() == 1234,
+ and(eq(longColumn("id"), 1234l),
+ eq(longColumn("not-existing-long"), null)));
+ assertCorrectFiltering(
+ record -> "miller".equals(record.getName()),
+ and(eq(binaryColumn("name"), Binary.fromString("miller")),
+ invert(userDefined(binaryColumn("not-existing-binary"), NameStartsWithVowel.class))));
+
+ // Missing column filter is always false
+ assertEquals(emptyList(), readUsers(lt(longColumn("not-existing-long"), 0l), true));
+ assertCorrectFiltering(
+ record -> "miller".equals(record.getName()),
+ or(eq(binaryColumn("name"), Binary.fromString("miller")),
+ gtEq(binaryColumn("not-existing-binary"), Binary.EMPTY)));
+ assertCorrectFiltering(
+ record -> record.getId() == 1234,
+ or(eq(longColumn("id"), 1234l),
+ userDefined(longColumn("not-existing-long"), new IsDivisibleBy(1))));
+ }
+}