You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2017/01/19 01:27:43 UTC
[32/50] [abbrv] parquet-mr git commit: PARQUET-654: Add option to
disable record-level filtering.
PARQUET-654: Add option to disable record-level filtering.
This can be used by frameworks that use codegen for filtering to avoid
running filters within Parquet.
Author: Ryan Blue <bl...@apache.org>
Closes #353 from rdblue/PARQUET-654-add-record-level-filter-option and squashes the following commits:
b497e7f [Ryan Blue] PARQUET-654: Add option to disable record-level filtering.
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/6b60c79e
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/6b60c79e
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/6b60c79e
Branch: refs/heads/parquet-1.8.x
Commit: 6b60c79e59e3e9a67dc0c2339675d5eb6cd1b510
Parents: d59381a
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Jul 13 14:50:08 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Mon Jan 9 16:54:54 2017 -0800
----------------------------------------------------------------------
.../hadoop/InternalParquetRecordReader.java | 8 +++++++-
.../apache/parquet/hadoop/ParquetFileReader.java | 10 ++++++++--
.../apache/parquet/hadoop/ParquetInputFormat.java | 18 ++++++++++++++++++
3 files changed, 33 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b60c79e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index f74e57c..d43fd7d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -46,6 +46,8 @@ import org.apache.parquet.schema.MessageType;
import static java.lang.String.format;
import static org.apache.parquet.Log.DEBUG;
import static org.apache.parquet.Preconditions.checkNotNull;
+import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED_DEFAULT;
import static org.apache.parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
class InternalParquetRecordReader<T> {
@@ -53,6 +55,7 @@ class InternalParquetRecordReader<T> {
private ColumnIOFactory columnIOFactory = null;
private final Filter filter;
+ private boolean filterRecords = true;
private MessageType requestedSchema;
private MessageType fileSchema;
@@ -130,7 +133,8 @@ class InternalParquetRecordReader<T> {
if (Log.INFO) LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
- recordReader = columnIO.getRecordReader(pages, recordConverter, filter);
+ recordReader = columnIO.getRecordReader(pages, recordConverter,
+ filterRecords ? filter : FilterCompat.NOOP);
startedAssemblingCurrentBlockAt = System.currentTimeMillis();
totalCountLoadedSoFar += pages.getRowCount();
++ currentBlock;
@@ -173,6 +177,8 @@ class InternalParquetRecordReader<T> {
this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
this.total = reader.getRecordCount();
this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
+ this.filterRecords = configuration.getBoolean(
+ RECORD_FILTERING_ENABLED, RECORD_FILTERING_ENABLED_DEFAULT);
LOG.info("RecordReader initialized will read a total of " + total + " records.");
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b60c79e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 3c4c6fc..2d1c62b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -28,6 +28,10 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromP
import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
+import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED_DEFAULT;
+import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED_DEFAULT;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
@@ -598,11 +602,13 @@ public class ParquetFileReader implements Closeable {
// set up data filters based on configured levels
List<RowGroupFilter.FilterLevel> levels = new ArrayList<RowGroupFilter.FilterLevel>();
- if (conf.getBoolean("parquet.filter.statistics.enabled", true)) {
+ if (conf.getBoolean(
+ STATS_FILTERING_ENABLED, STATS_FILTERING_ENABLED_DEFAULT)) {
levels.add(STATISTICS);
}
- if (conf.getBoolean("parquet.filter.dictionary.enabled", false)) {
+ if (conf.getBoolean(
+ DICTIONARY_FILTERING_ENABLED, DICTIONARY_FILTERING_ENABLED_DEFAULT)) {
levels.add(DICTIONARY);
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b60c79e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index e3536d7..1fe57f9 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -116,6 +116,24 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate";
/**
+ * key to configure whether record-level filtering is enabled
+ */
+ public static final String RECORD_FILTERING_ENABLED = "parquet.filter.record-level.enabled";
+ static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true;
+
+ /**
+ * key to configure whether row group stats filtering is enabled
+ */
+ public static final String STATS_FILTERING_ENABLED = "parquet.filter.stats.enabled";
+ static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;
+
+ /**
+ * key to configure whether row group dictionary filtering is enabled
+ */
+ public static final String DICTIONARY_FILTERING_ENABLED = "parquet.filter.dictionary.enabled";
+ static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = false;
+
+ /**
* key to turn on or off task side metadata loading (default true)
* if true then metadata is read on the task side and some tasks may finish immediately.
* if false metadata is read on the client which is slower if there is a lot of metadata but tasks will only be spawned if there is work to do.