You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/29 06:12:34 UTC

[flink-table-store] branch master updated: [FLINK-27846] Remove AlwaysTrue and AlwaysFalse

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a3885cf [FLINK-27846] Remove AlwaysTrue and AlwaysFalse
9a3885cf is described below

commit 9a3885cfd9033c3e2e8e2837acde958848b814d5
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Nov 29 14:12:29 2022 +0800

    [FLINK-27846] Remove AlwaysTrue and AlwaysFalse
    
    This closes #407
---
 .../table/store/file/predicate/AlwaysFalse.java    | 46 ------------------
 .../table/store/file/predicate/AlwaysTrue.java     | 46 ------------------
 .../store/file/schema/SchemaEvolutionUtil.java     | 55 +++++++++-------------
 .../store/file/schema/SchemaEvolutionUtilTest.java | 32 +++----------
 4 files changed, 27 insertions(+), 152 deletions(-)

diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/AlwaysFalse.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/AlwaysFalse.java
deleted file mode 100644
index e9b77c8b..00000000
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/AlwaysFalse.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.predicate;
-
-import org.apache.flink.table.store.format.FieldStats;
-import org.apache.flink.table.types.logical.LogicalType;
-
-import java.util.Optional;
-
-/** Return false for all values. TODO add leaf function without fields. */
-public class AlwaysFalse extends LeafUnaryFunction {
-    public static final AlwaysFalse INSTANCE = new AlwaysFalse();
-
-    private AlwaysFalse() {}
-
-    @Override
-    public Optional<LeafFunction> negate() {
-        return Optional.of(AlwaysTrue.INSTANCE);
-    }
-
-    @Override
-    public boolean test(LogicalType type, Object value) {
-        return false;
-    }
-
-    @Override
-    public boolean test(LogicalType type, long rowCount, FieldStats fieldStats) {
-        return false;
-    }
-}
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/AlwaysTrue.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/AlwaysTrue.java
deleted file mode 100644
index 951719c7..00000000
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/AlwaysTrue.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.predicate;
-
-import org.apache.flink.table.store.format.FieldStats;
-import org.apache.flink.table.types.logical.LogicalType;
-
-import java.util.Optional;
-
-/** Return true for all values. TODO add leaf function without fields. */
-public class AlwaysTrue extends LeafUnaryFunction {
-    public static final AlwaysTrue INSTANCE = new AlwaysTrue();
-
-    private AlwaysTrue() {}
-
-    @Override
-    public Optional<LeafFunction> negate() {
-        return Optional.of(AlwaysFalse.INSTANCE);
-    }
-
-    @Override
-    public boolean test(LogicalType type, Object value) {
-        return true;
-    }
-
-    @Override
-    public boolean test(LogicalType type, long rowCount, FieldStats fieldStats) {
-        return true;
-    }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
index af1d0b8e..53a523ee 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
@@ -20,10 +20,7 @@ package org.apache.flink.table.store.file.schema;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.predicate.AlwaysFalse;
-import org.apache.flink.table.store.file.predicate.AlwaysTrue;
 import org.apache.flink.table.store.file.predicate.CompoundPredicate;
-import org.apache.flink.table.store.file.predicate.IsNull;
 import org.apache.flink.table.store.file.predicate.LeafPredicate;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.utils.ProjectedRowData;
@@ -36,6 +33,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -247,8 +245,7 @@ public class SchemaEvolutionUtil {
 
     /**
      * Create predicate list from data fields. We will visit all predicate in filters, reset it's
-     * field index, name and type, and use {@link AlwaysFalse} or {@link AlwaysTrue} if the field is
-     * not exist.
+     * field index, name and type, and ignore the predicate if the field does not exist.
      *
      * @param tableFields the table fields
      * @param dataFields the underlying data fields
@@ -268,12 +265,13 @@ public class SchemaEvolutionUtil {
         dataFields.forEach(f -> idToDataFields.put(f.id(), f));
         List<Predicate> dataFilters = new ArrayList<>(filters.size());
         for (Predicate predicate : filters) {
-            dataFilters.add(createDataPredicate(nameToTableFields, idToDataFields, predicate));
+            createDataPredicate(nameToTableFields, idToDataFields, predicate)
+                    .ifPresent(dataFilters::add);
         }
         return dataFilters;
     }
 
-    private static Predicate createDataPredicate(
+    private static Optional<Predicate> createDataPredicate(
             Map<String, DataField> tableFields,
             LinkedHashMap<Integer, DataField> dataFields,
             Predicate predicate) {
@@ -282,45 +280,34 @@ public class SchemaEvolutionUtil {
             List<Predicate> children = compoundPredicate.children();
             List<Predicate> dataChildren = new ArrayList<>(children.size());
             for (Predicate child : children) {
-                Predicate dataPredicate = createDataPredicate(tableFields, dataFields, child);
-                dataChildren.add(dataPredicate);
+                Optional<Predicate> childPredicate =
+                        createDataPredicate(tableFields, dataFields, child);
+                if (childPredicate.isPresent()) {
+                    dataChildren.add(childPredicate.get());
+                } else {
+                    return Optional.empty();
+                }
             }
-            return new CompoundPredicate(compoundPredicate.function(), dataChildren);
+            return Optional.of(new CompoundPredicate(compoundPredicate.function(), dataChildren));
         } else if (predicate instanceof LeafPredicate) {
             LeafPredicate leafPredicate = (LeafPredicate) predicate;
-
             DataField tableField =
                     checkNotNull(
                             tableFields.get(leafPredicate.fieldName()),
                             String.format("Find no field %s", leafPredicate.fieldName()));
             DataField dataField = dataFields.get(tableField.id());
             if (dataField == null) {
-                // The table field is not exist in data fields, check the predicate function
-                if (leafPredicate.function() instanceof IsNull) {
-                    // Just get the first value
-                    return new LeafPredicate(
-                            AlwaysTrue.INSTANCE,
-                            leafPredicate.type(),
-                            0,
-                            null,
-                            leafPredicate.literals());
-                } else {
-                    return new LeafPredicate(
-                            AlwaysFalse.INSTANCE,
-                            leafPredicate.type(),
-                            0,
-                            null,
-                            leafPredicate.literals());
-                }
+                return Optional.empty();
             }
 
             /// TODO Should deal with column type schema evolution here
-            return new LeafPredicate(
-                    leafPredicate.function(),
-                    leafPredicate.type(),
-                    indexOf(dataField, dataFields),
-                    dataField.name(),
-                    leafPredicate.literals());
+            return Optional.of(
+                    new LeafPredicate(
+                            leafPredicate.function(),
+                            leafPredicate.type(),
+                            indexOf(dataField, dataFields),
+                            dataField.name(),
+                            leafPredicate.literals()));
         } else {
             throw new UnsupportedOperationException(
                     String.format(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtilTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtilTest.java
index 073c45ac..b46e4b77 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtilTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtilTest.java
@@ -19,13 +19,9 @@
 package org.apache.flink.table.store.file.schema;
 
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.store.file.predicate.AlwaysFalse;
-import org.apache.flink.table.store.file.predicate.AlwaysTrue;
-import org.apache.flink.table.store.file.predicate.CompoundPredicate;
 import org.apache.flink.table.store.file.predicate.IsNotNull;
 import org.apache.flink.table.store.file.predicate.IsNull;
 import org.apache.flink.table.store.file.predicate.LeafPredicate;
-import org.apache.flink.table.store.file.predicate.Or;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.utils.Projection;
 
@@ -160,9 +156,8 @@ public class SchemaEvolutionUtilTest {
 
     @Test
     public void testCreateDataFilters() {
-        List<Predicate> children = new ArrayList<>();
-        CompoundPredicate predicate = new CompoundPredicate(Or.INSTANCE, children);
-        children.add(
+        List<Predicate> predicates = new ArrayList<>();
+        predicates.add(
                 new LeafPredicate(
                         IsNull.INSTANCE,
                         DataTypes.INT().getLogicalType(),
@@ -170,7 +165,7 @@ public class SchemaEvolutionUtilTest {
                         "c",
                         Collections.emptyList()));
         // Field 9->e is not exist in data
-        children.add(
+        predicates.add(
                 new LeafPredicate(
                         IsNotNull.INSTANCE,
                         DataTypes.INT().getLogicalType(),
@@ -178,7 +173,7 @@ public class SchemaEvolutionUtilTest {
                         "e",
                         Collections.emptyList()));
         // Field 7->a is not exist in data
-        children.add(
+        predicates.add(
                 new LeafPredicate(
                         IsNull.INSTANCE,
                         DataTypes.INT().getLogicalType(),
@@ -187,28 +182,13 @@ public class SchemaEvolutionUtilTest {
                         Collections.emptyList()));
 
         List<Predicate> filters =
-                SchemaEvolutionUtil.createDataFilters(
-                        tableFields2, dataFields, Collections.singletonList(predicate));
+                SchemaEvolutionUtil.createDataFilters(tableFields2, dataFields, predicates);
         assert filters != null;
         assertThat(filters.size()).isEqualTo(1);
 
-        CompoundPredicate dataFilter = (CompoundPredicate) filters.get(0);
-        assertThat(dataFilter.function()).isEqualTo(Or.INSTANCE);
-        assertThat(dataFilter.children().size()).isEqualTo(3);
-
-        LeafPredicate child1 = (LeafPredicate) dataFilter.children().get(0);
+        LeafPredicate child1 = (LeafPredicate) filters.get(0);
         assertThat(child1.function()).isEqualTo(IsNull.INSTANCE);
         assertThat(child1.fieldName()).isEqualTo("b");
         assertThat(child1.index()).isEqualTo(1);
-
-        LeafPredicate child2 = (LeafPredicate) dataFilter.children().get(1);
-        assertThat(child2.function()).isEqualTo(AlwaysFalse.INSTANCE);
-        assertThat(child2.fieldName()).isNull();
-        assertThat(child2.index()).isEqualTo(0);
-
-        LeafPredicate child3 = (LeafPredicate) dataFilter.children().get(2);
-        assertThat(child3.function()).isEqualTo(AlwaysTrue.INSTANCE);
-        assertThat(child3.fieldName()).isNull();
-        assertThat(child3.index()).isEqualTo(0);
     }
 }