Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/13 08:30:31 UTC

[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #212: [FLINK-28285] Push filter into orc reader

tsreaper commented on code in PR #212:
URL: https://github.com/apache/flink-table-store/pull/212#discussion_r919747108


##########
flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java:
##########
@@ -88,19 +91,39 @@ public Predicate startsWith(int idx, Object patternLiteral) {
     }
 
     public Predicate leaf(NullFalseLeafBinaryFunction function, int idx, Object literal) {
-        return new LeafPredicate(function, rowType.getTypeAt(idx), idx, singletonList(literal));
+        RowType.RowField field = rowType.getFields().get(idx);
+        return new LeafPredicate(
+                function, field.getType(), idx, field.getName(), singletonList(literal));
     }
 
     public Predicate leaf(LeafUnaryFunction function, int idx) {
-        return new LeafPredicate(function, rowType.getTypeAt(idx), idx, Collections.emptyList());
+        RowType.RowField field = rowType.getFields().get(idx);
+        return new LeafPredicate(
+                function, field.getType(), idx, field.getName(), Collections.emptyList());
     }
 
     public Predicate in(int idx, List<Object> literals) {
-        return new LeafPredicate(In.INSTANCE, rowType.getTypeAt(idx), idx, literals);
+        RowType.RowField field = rowType.getFields().get(idx);
+        LeafPredicate in =
+                new LeafPredicate(In.INSTANCE, field.getType(), idx, field.getName(), literals);
+        if (literals.size() > 20) {
+            return in;
+        }
+
+        List<Predicate> equals = new ArrayList<>(literals.size());
+        for (Object literal : literals) {
+            if (literal == null) {
+                // We can not skip NULL, consider negate -> NOT IN
+                return in;
+            }

Review Comment:
   It's fine to skip this check: `Equal` and `NotEqual` both return `false` when the literal is `null`.
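   A minimal sketch of why the null check can be dropped. It rests on two assumptions: the class `InRewriteSketch` and its methods are hypothetical stand-ins, not the flink-table-store API, and every leaf is null-false (any `null` operand yields `false`, which is how the comment describes `Equal` and `NotEqual`). The sketch rewrites `IN` into an OR of equalities and pushes negation through to an AND of inequalities. It then compares both against SQL three-valued semantics, with UNKNOWN treated as `false` in a filter.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model, not the flink-table-store API. Each leaf is "null-false":
// a null value or a null literal makes the leaf evaluate to false.
final class InRewriteSketch {

    static Predicate<Integer> eq(Integer literal) {
        return v -> v != null && literal != null && v.equals(literal);
    }

    static Predicate<Integer> notEq(Integer literal) {
        return v -> v != null && literal != null && !v.equals(literal);
    }

    // IN (l1, ..., ln) rewritten as v = l1 OR ... OR v = ln, with no special case for null.
    static Predicate<Integer> inAsOr(List<Integer> literals) {
        Predicate<Integer> p = v -> false;
        for (Integer literal : literals) {
            p = p.or(eq(literal));
        }
        return p;
    }

    // NOT IN obtained by negating each leaf: v <> l1 AND ... AND v <> ln.
    static Predicate<Integer> notInAsAnd(List<Integer> literals) {
        Predicate<Integer> p = v -> true;
        for (Integer literal : literals) {
            p = p.and(notEq(literal));
        }
        return p;
    }

    // Reference SQL result in a WHERE clause (UNKNOWN counts as false).
    static boolean sqlIn(Integer v, List<Integer> literals) {
        return v != null && literals.contains(v);
    }

    static boolean sqlNotIn(Integer v, List<Integer> literals) {
        return v != null && !literals.contains(v) && !literals.contains(null);
    }
}
```

   With literals `[1, null, 3]`, the rewritten `NOT IN` rejects every value because `notEq(null)` is always `false`. This matches SQL, where `x NOT IN (1, NULL, 3)` is never TRUE. The sketch only models non-empty literal lists.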



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java:
##########
@@ -142,15 +145,40 @@ protected void withNonPartitionFilter(Predicate predicate) {
                                 tableSchema.fieldNames(),
                                 tableSchema.trimmedPrimaryKeys());
                 if (keyFilters.size() > 0) {
-                    scan.withKeyFilter(PredicateBuilder.and(keyFilters));
+                    scan.withKeyFilter(and(keyFilters));
                 }
             }
         };
     }
 
     @Override
     public TableRead newRead() {
+        List<String> primaryKeys = tableSchema.trimmedPrimaryKeys();
+        Set<String> nonPrimaryKeys =
+                tableSchema.fieldNames().stream()
+                        .filter(name -> !primaryKeys.contains(name))
+                        .collect(Collectors.toSet());
         return new KeyValueTableRead(store.newRead()) {
+
+            @Override
+            public TableRead withFilter(Predicate predicate) {
+                List<Predicate> predicates = new ArrayList<>();
+                for (Predicate sub : splitAnd(predicate)) {
+                    // TODO support value filter
+                    if (containsFields(sub, nonPrimaryKeys)) {
+                        continue;
+                    }
+
+                    // TODO Actually, the index is wrong, but it is OK. The orc filter
+                    // just uses the name instead of the index.
+                    predicates.add(sub);

Review Comment:
   Do the names actually match? Key field names are prefixed with `_KEY_`, but I suspect the field names in the predicate don't carry this prefix.
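   A minimal sketch of the mismatch the reviewer suspects. It assumes, as the comment states, that key columns are stored under `_KEY_` + field name, and that the ORC filter resolves columns by name. `KeyPrefixSketch`, `toKeyFieldNames`, and `resolvable` are hypothetical helpers, not table store code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch, not the table store API: key columns in the file are
// assumed to be named "_KEY_" + original field name, as the review comment says.
final class KeyPrefixSketch {
    static final String KEY_PREFIX = "_KEY_";

    // Rename the field names referenced by a predicate to the key-column names.
    static List<String> toKeyFieldNames(List<String> predicateFields) {
        List<String> renamed = new ArrayList<>(predicateFields.size());
        for (String name : predicateFields) {
            renamed.add(KEY_PREFIX + name);
        }
        return renamed;
    }

    // A name-based filter can only be applied if every referenced column exists in the file.
    static boolean resolvable(List<String> fields, Set<String> fileColumns) {
        return fileColumns.containsAll(fields);
    }
}
```

   With a primary key `pk`, the unprefixed name from the predicate does not resolve against the key column `_KEY_pk`, while the renamed one does. So one option would be to rename key predicates before pushing them down.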



##########
flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java:
##########
@@ -60,6 +60,10 @@ public void writeSinglePartition() throws Exception {
         testWritePreemptMemory(true);
     }
 
+    @Override
+    @Test
+    public void testReadFilter() {}

Review Comment:
   Why remove this test?



##########
flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java:
##########
@@ -209,6 +209,47 @@ private void writeData() throws Exception {
         write.close();
     }
 
+    @Override
+    @Test
+    public void testReadFilter() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
+
+        write.write(GenericRowData.of(1, 10, 100L));
+        write.write(GenericRowData.of(1, 20, 200L));
+        commit.commit("0", write.prepareCommit());
+
+        write.write(GenericRowData.of(1, 30, 300L));
+        write.write(GenericRowData.of(1, 40, 400L));
+        commit.commit("0", write.prepareCommit());

Review Comment:
   Although it does not affect the test results, different commits should have different identifiers.



##########
flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java:
##########
@@ -88,19 +91,39 @@ public Predicate startsWith(int idx, Object patternLiteral) {
     }
 
     public Predicate leaf(NullFalseLeafBinaryFunction function, int idx, Object literal) {
-        return new LeafPredicate(function, rowType.getTypeAt(idx), idx, singletonList(literal));
+        RowType.RowField field = rowType.getFields().get(idx);
+        return new LeafPredicate(
+                function, field.getType(), idx, field.getName(), singletonList(literal));
     }
 
     public Predicate leaf(LeafUnaryFunction function, int idx) {
-        return new LeafPredicate(function, rowType.getTypeAt(idx), idx, Collections.emptyList());
+        RowType.RowField field = rowType.getFields().get(idx);
+        return new LeafPredicate(
+                function, field.getType(), idx, field.getName(), Collections.emptyList());
     }
 
     public Predicate in(int idx, List<Object> literals) {
-        return new LeafPredicate(In.INSTANCE, rowType.getTypeAt(idx), idx, literals);
+        RowType.RowField field = rowType.getFields().get(idx);
+        LeafPredicate in =
+                new LeafPredicate(In.INSTANCE, field.getType(), idx, field.getName(), literals);
+        if (literals.size() > 20) {

Review Comment:
   Please add more tests for the `IN` predicate. Check that the result is correct with fewer than and more than 20 literals, both with and without nulls.
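   A sketch of the requested test matrix. It is written against a hypothetical stand-in for `PredicateBuilder#in`; `InThresholdSketch` and its helpers are illustrative only, not the table store API. The stand-in expands lists of up to 20 literals into an OR of null-false equalities without special-casing null, following the previous comment. The matrix probes lists just below, at, and above the threshold, with and without a null literal. Because the diff checks the size before scanning for nulls, appending a null to 20 literals makes the list 21 long and switches it onto the unexpanded path. Boundaries like that are what such tests should cover.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of PredicateBuilder#in, not the real API: up to MAX_EXPAND
// literals are expanded into an OR of null-false equalities, otherwise a direct IN is kept.
final class InThresholdSketch {
    static final int MAX_EXPAND = 20; // mirrors the `literals.size() > 20` check in the diff

    static Predicate<Integer> buildIn(List<Integer> literals) {
        if (literals.size() > MAX_EXPAND) {
            // Direct IN: a null value never matches, and a null literal never matches anything.
            return v -> v != null && literals.contains(v);
        }
        Predicate<Integer> p = v -> false;
        for (Integer lit : literals) {
            p = p.or(v -> v != null && lit != null && v.equals(lit));
        }
        return p;
    }

    // Literals 0..n-1, optionally followed by a null.
    static List<Integer> literals(int n, boolean withNull) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(i);
        }
        if (withNull) {
            out.add(null);
        }
        return out;
    }

    // Compares buildIn with the expected SQL filter result (UNKNOWN counts as false).
    static boolean agreesWithSql(int n, boolean withNull) {
        List<Integer> lits = literals(n, withNull);
        Predicate<Integer> p = buildIn(lits);
        for (Integer v : Arrays.asList(null, -1, 0, n - 1, n, 100)) {
            boolean expected = v != null && lits.contains(v);
            if (p.test(v) != expected) {
                return false;
            }
        }
        return true;
    }
}
```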



##########
flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java:
##########
@@ -95,15 +103,15 @@ public boolean equals(Object o) {
             return false;
         }
         LeafPredicate that = (LeafPredicate) o;
-        return index == that.index
+        return fieldIndex == that.fieldIndex
                 && Objects.equals(function, that.function)
                 && Objects.equals(type, that.type)
                 && Objects.equals(literals, that.literals);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(function, type, index, literals);
+        return Objects.hash(function, type, fieldIndex, literals);
     }

Review Comment:
   What about `fieldName`? It is not compared in `equals` or included in `hashCode`.
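   The ORC filter in this PR resolves columns by name rather than by index. Two predicates that differ only in `fieldName` are therefore not interchangeable, which argues for including it in both methods. A minimal sketch of that change follows. `LeafPredicateSketch` is a hypothetical simplified class, not the real `LeafPredicate`: `function` and `type` are plain strings so the example compiles without Flink on the classpath.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical simplified stand-in for LeafPredicate; function and type are strings here.
final class LeafPredicateSketch {
    private final String function;
    private final String type;
    private final int fieldIndex;
    private final String fieldName;
    private final List<Object> literals;

    LeafPredicateSketch(String function, String type, int fieldIndex, String fieldName, List<Object> literals) {
        this.function = function;
        this.type = type;
        this.fieldIndex = fieldIndex;
        this.fieldName = fieldName;
        this.literals = literals;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        LeafPredicateSketch that = (LeafPredicateSketch) o;
        return fieldIndex == that.fieldIndex
                && Objects.equals(fieldName, that.fieldName) // name now takes part in equality
                && Objects.equals(function, that.function)
                && Objects.equals(type, that.type)
                && Objects.equals(literals, that.literals);
    }

    @Override
    public int hashCode() {
        // Keep hashCode consistent with equals by hashing the same fields.
        return Objects.hash(function, type, fieldIndex, fieldName, literals);
    }
}
```

   Hashing exactly the fields that `equals` compares keeps the two methods consistent, so equal predicates always land in the same hash bucket.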


