Posted to dev@parquet.apache.org by GitBox <gi...@apache.org> on 2021/09/14 14:19:14 UTC

[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #923: [PARQUET-1968] FilterApi support In predicate

gszadovszky commented on a change in pull request #923:
URL: https://github.com/apache/parquet-mr/pull/923#discussion_r708260743



##########
File path: parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
##########
@@ -293,8 +290,48 @@ boolean isNullPage(int pageIndex) {
 
     @Override
     public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(In<T> in) {
-      IntSet indexes = getMatchingIndexes(in);
-      return IndexIterator.filter(getPageCount(), indexes::contains);
+      Set<T> values = in.getValues();
+      TreeSet<T> myTreeSet = new TreeSet<>();
+      IntSet matchingIndexes1 = new IntOpenHashSet();  // for null
+      Iterator<T> it = values.iterator();
+      while(it.hasNext()) {
+        T value = it.next();
+        if (value != null) {
+          myTreeSet.add(value);
+        } else {
+          if (nullCounts == null) {
+            // Searching for nulls so if we don't have null related statistics we have to return all pages
+            return IndexIterator.all(getPageCount());
+          } else {
+            for (int i = 0; i < nullCounts.length; i++) {
+              if (nullCounts[i] > 0) {
+                matchingIndexes1.add(i);
+              }
+            }
+          }
+        }
+      }
+
+      IntSet matchingIndexes2 = new IntOpenHashSet();

Review comment:
       Please choose better naming for the final implementation.
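   For illustration only, one possible shape with clearer names (a sketch, not a prescription; `nonNullValues` and `pagesWithNulls` are placeholder names):
   
   ```java
   Set<T> values = in.getValues();
   TreeSet<T> nonNullValues = new TreeSet<>();
   IntSet pagesWithNulls = new IntOpenHashSet();
   for (T value : values) {
     if (value == null) {
       if (nullCounts == null) {
         // no null statistics, so any page may contain nulls
         return IndexIterator.all(getPageCount());
       }
       for (int i = 0; i < nullCounts.length; i++) {
         if (nullCounts[i] > 0) {
           pagesWithNulls.add(i);
         }
       }
     } else {
       nonNullValues.add(value);
     }
   }
   ```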

##########
File path: parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
##########
@@ -1,14 +1,14 @@
-/* 

Review comment:
       We have to validate In and NotIn for the record-level filtering as well. All the tests here validate higher levels only. (Record-level filtering is when you actually read the data and check the values one by one, returning only those that fulfill the filter. This is done by the class `IncrementallyUpdatedFilterPredicateBuilder`, which is generated by this one.)
   
   I would suggest checking and extending the test classes `TestRecordLevelFilters`, `TestColumnIndexFiltering`, and `TestBloomFiltering`.
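   For example, a test sketch in the style of the existing record-level tests (hypothetical: `readUsers` stands in for whatever read helper the test class provides, and the column name `id` is a placeholder):
   
   ```java
   import static org.apache.parquet.filter2.predicate.FilterApi.in;
   import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
   
   Set<Long> ids = new HashSet<>(Arrays.asList(17L, 23L));
   FilterPredicate pred = in(longColumn("id"), ids);
   List<Group> found = readUsers(FilterCompat.get(pred));
   // record-level filtering must return only records whose id is in the set
   for (Group g : found) {
     assertTrue(ids.contains(g.getLong("id", 0)));
   }
   ```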

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java
##########
@@ -118,6 +120,45 @@ private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) {
     return BLOCK_MIGHT_MATCH;
   }
 
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.In<T> in) {
+    Set<T> values = in.getValues();
+
+    if (values.contains(null)) {
+      // the bloom filter bitset contains only non-null values so isn't helpful. this
+      // could check the column stats, but the StatisticsFilter is responsible
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    Operators.Column<T> filterColumn = in.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+    if (meta == null) {
+      // the column isn't in this file so all values are null, but the value
+      // must be non-null because of the above check.
+      return BLOCK_CANNOT_MATCH;
+    }
+
+    try {
+      BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(meta);
+      if (bloomFilter != null) {
+        for (T value : values) {
+          if (bloomFilter.findHash(bloomFilter.hash(value))) {
+            return BLOCK_MIGHT_MATCH;
+          }
+        }
+        return BLOCK_CANNOT_MATCH;
+      }
+    } catch (RuntimeException e) {
+      LOG.warn(e.getMessage());
+    }

Review comment:
       Why is this necessary? Shouldn't we simply allow a RuntimeException to be thrown through from bloom filters?
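   That is, the body could simply be (the same logic as above, minus the catch, letting any `RuntimeException` propagate to the caller):
   
   ```java
   BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(meta);
   if (bloomFilter != null) {
     for (T value : values) {
       if (bloomFilter.findHash(bloomFilter.hash(value))) {
         return BLOCK_MIGHT_MATCH;
       }
     }
     // none of the values can be present in this block
     return BLOCK_CANNOT_MATCH;
   }
   return BLOCK_MIGHT_MATCH;
   ```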

##########
File path: parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
##########
@@ -67,15 +67,18 @@ public void run() throws IOException {
     add("package org.apache.parquet.filter2.recordlevel;\n" +
         "\n" +
         "import java.util.List;\n" +
+        "import java.util.Set;\n" +
         "\n" +
         "import org.apache.parquet.hadoop.metadata.ColumnPath;\n" +
         "import org.apache.parquet.filter2.predicate.Operators.Eq;\n" +
         "import org.apache.parquet.filter2.predicate.Operators.Gt;\n" +
         "import org.apache.parquet.filter2.predicate.Operators.GtEq;\n" +
+      "import org.apache.parquet.filter2.predicate.Operators.In;\n" +

Review comment:
       Please correct the indentation.
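   That is, the added line should line up with the surrounding generated-import lines:
   
   ```java
           "import org.apache.parquet.filter2.predicate.Operators.In;\n" +
   ```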

##########
File path: parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java
##########
@@ -250,27 +250,16 @@ public Eq(Column<T> column, T value) {
     }
   }
 
-  // base class for In and NotIn
+  // base class for In and NotIn. In is used to filter data based on a list of values. NotIn is used to filter data that
+  // are not in the list of values.

Review comment:
       Since you have added proper comments to a public class, I would suggest using Javadoc-style comments.
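   For example, the same text as a Javadoc comment:
   
   ```java
   /**
    * Base class for {@link In} and {@link NotIn}. {@code In} filters data based on a
    * set of values; {@code NotIn} keeps only the data that is not in that set.
    */
   ```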

##########
File path: parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java
##########
@@ -410,6 +410,7 @@ public void testFiltering() {
     Set<Integer> set5 = new HashSet<>();
     set5.add(7);
     set5.add(20);
+    System.out.println(in(intColumn("column5"), set5).toString());

Review comment:
       I guess this is not for the final code.

##########
File path: parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
##########
@@ -287,6 +291,27 @@ boolean isNullPage(int pageIndex) {
           pageIndex -> nullCounts[pageIndex] > 0 || matchingIndexes.contains(pageIndex));
     }
 
+    @Override

Review comment:
       I think the concept is good. You could do something similar for `StatisticsFilter`, so you would not need to check all the values in the set, only its min/max values. In that case you might search for the min/max values once inside `In` and `NotIn` instead of doing so separately for the two filters.
   I am not sure what the best way is to get the values of the set in the API. For example, if it were not a `Set` but a `List` or a vararg, then you would be able to build up your own `TreeSet` by default. Or you might expect a `SortedSet` directly.
   On the other hand, you do not need the whole set to be sorted. You only need the min and max, and I think it is faster to search for them than to sort the whole set.
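   For example, a single-pass search (a sketch; assumes the generic bound `T extends Comparable<T>` as in the visitor above):
   
   ```java
   T min = null;
   T max = null;
   for (T value : values) {
     if (value == null) {
       continue; // nulls are handled separately via the null counts
     }
     if (min == null || value.compareTo(min) < 0) {
       min = value;
     }
     if (max == null || value.compareTo(max) > 0) {
       max = value;
     }
   }
   // min and max can now be checked against the page or column statistics
   ```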




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@parquet.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org