You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2017/01/19 01:27:43 UTC

[32/50] [abbrv] parquet-mr git commit: PARQUET-654: Add option to disable record-level filtering.

PARQUET-654: Add option to disable record-level filtering.

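This adds the `parquet.filter.record-level.enabled` configuration option
(default: true). Frameworks that apply filters themselves, for example
through generated code, can set it to false so that Parquet does not also
evaluate the filter on each record; row-group statistics and dictionary
filtering are still controlled by their own options.

The row-group filtering keys are also moved into named constants on
ParquetInputFormat. Note that the statistics key read by ParquetFileReader
changes from `parquet.filter.statistics.enabled` to
`parquet.filter.stats.enabled`.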

Author: Ryan Blue <bl...@apache.org>

Closes #353 from rdblue/PARQUET-654-add-record-level-filter-option and squashes the following commits:

b497e7f [Ryan Blue] PARQUET-654: Add option to disable record-level filtering.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/6b60c79e
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/6b60c79e
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/6b60c79e

Branch: refs/heads/parquet-1.8.x
Commit: 6b60c79e59e3e9a67dc0c2339675d5eb6cd1b510
Parents: d59381a
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Jul 13 14:50:08 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Mon Jan 9 16:54:54 2017 -0800

----------------------------------------------------------------------
 .../hadoop/InternalParquetRecordReader.java       |  8 +++++++-
 .../apache/parquet/hadoop/ParquetFileReader.java  | 10 ++++++++--
 .../apache/parquet/hadoop/ParquetInputFormat.java | 18 ++++++++++++++++++
 3 files changed, 33 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b60c79e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index f74e57c..d43fd7d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -46,6 +46,8 @@ import org.apache.parquet.schema.MessageType;
 import static java.lang.String.format;
 import static org.apache.parquet.Log.DEBUG;
 import static org.apache.parquet.Preconditions.checkNotNull;
+import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED_DEFAULT;
 import static org.apache.parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
 
 class InternalParquetRecordReader<T> {
@@ -53,6 +55,7 @@ class InternalParquetRecordReader<T> {
 
   private ColumnIOFactory columnIOFactory = null;
   private final Filter filter;
+  private boolean filterRecords = true;
 
   private MessageType requestedSchema;
   private MessageType fileSchema;
@@ -130,7 +133,8 @@ class InternalParquetRecordReader<T> {
       if (Log.INFO) LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
       if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
       MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
-      recordReader = columnIO.getRecordReader(pages, recordConverter, filter);
+      recordReader = columnIO.getRecordReader(pages, recordConverter,
+          filterRecords ? filter : FilterCompat.NOOP);
       startedAssemblingCurrentBlockAt = System.currentTimeMillis();
       totalCountLoadedSoFar += pages.getRowCount();
       ++ currentBlock;
@@ -173,6 +177,8 @@ class InternalParquetRecordReader<T> {
     this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
     this.total = reader.getRecordCount();
     this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
+    this.filterRecords = configuration.getBoolean(
+        RECORD_FILTERING_ENABLED, RECORD_FILTERING_ENABLED_DEFAULT);
     LOG.info("RecordReader initialized will read a total of " + total + " records.");
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b60c79e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 3c4c6fc..2d1c62b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -28,6 +28,10 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromP
 import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
+import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED_DEFAULT;
+import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED_DEFAULT;
 
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
@@ -598,11 +602,13 @@ public class ParquetFileReader implements Closeable {
     // set up data filters based on configured levels
     List<RowGroupFilter.FilterLevel> levels = new ArrayList<RowGroupFilter.FilterLevel>();
 
-    if (conf.getBoolean("parquet.filter.statistics.enabled", true)) {
+    if (conf.getBoolean(
+        STATS_FILTERING_ENABLED, STATS_FILTERING_ENABLED_DEFAULT)) {
       levels.add(STATISTICS);
     }
 
-    if (conf.getBoolean("parquet.filter.dictionary.enabled", false)) {
+    if (conf.getBoolean(
+        DICTIONARY_FILTERING_ENABLED, DICTIONARY_FILTERING_ENABLED_DEFAULT)) {
       levels.add(DICTIONARY);
     }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b60c79e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index e3536d7..1fe57f9 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -116,6 +116,24 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate";
 
   /**
+   * key to configure whether record-level filtering is enabled
+   */
+  public static final String RECORD_FILTERING_ENABLED = "parquet.filter.record-level.enabled";
+  static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true;
+
+  /**
+   * key to configure whether row group stats filtering is enabled
+   */
+  public static final String STATS_FILTERING_ENABLED = "parquet.filter.stats.enabled";
+  static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;
+
+  /**
+   * key to configure whether row group dictionary filtering is enabled
+   */
+  public static final String DICTIONARY_FILTERING_ENABLED = "parquet.filter.dictionary.enabled";
+  static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = false;
+
+  /**
    * key to turn on or off task side metadata loading (default true)
    * if true then metadata is read on the task side and some tasks may finish immediately.
    * if false metadata is read on the client which is slower if there is a lot of metadata but tasks will only be spawn if there is work to do.