You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/29 06:12:34 UTC
[flink-table-store] branch master updated: [FLINK-27846] Remove AlwaysTrue and AlwaysFalse
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 9a3885cf [FLINK-27846] Remove AlwaysTrue and AlwaysFalse
9a3885cf is described below
commit 9a3885cfd9033c3e2e8e2837acde958848b814d5
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Nov 29 14:12:29 2022 +0800
[FLINK-27846] Remove AlwaysTrue and AlwaysFalse
This closes #407
---
.../table/store/file/predicate/AlwaysFalse.java | 46 ------------------
.../table/store/file/predicate/AlwaysTrue.java | 46 ------------------
.../store/file/schema/SchemaEvolutionUtil.java | 55 +++++++++-------------
.../store/file/schema/SchemaEvolutionUtilTest.java | 32 +++----------
4 files changed, 27 insertions(+), 152 deletions(-)
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/AlwaysFalse.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/AlwaysFalse.java
deleted file mode 100644
index e9b77c8b..00000000
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/AlwaysFalse.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.predicate;
-
-import org.apache.flink.table.store.format.FieldStats;
-import org.apache.flink.table.types.logical.LogicalType;
-
-import java.util.Optional;
-
-/** Return false for all values. TODO add leaf function without fields. */
-public class AlwaysFalse extends LeafUnaryFunction {
- public static final AlwaysFalse INSTANCE = new AlwaysFalse();
-
- private AlwaysFalse() {}
-
- @Override
- public Optional<LeafFunction> negate() {
- return Optional.of(AlwaysTrue.INSTANCE);
- }
-
- @Override
- public boolean test(LogicalType type, Object value) {
- return false;
- }
-
- @Override
- public boolean test(LogicalType type, long rowCount, FieldStats fieldStats) {
- return false;
- }
-}
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/AlwaysTrue.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/AlwaysTrue.java
deleted file mode 100644
index 951719c7..00000000
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/AlwaysTrue.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.predicate;
-
-import org.apache.flink.table.store.format.FieldStats;
-import org.apache.flink.table.types.logical.LogicalType;
-
-import java.util.Optional;
-
-/** Return true for all values. TODO add leaf function without fields. */
-public class AlwaysTrue extends LeafUnaryFunction {
- public static final AlwaysTrue INSTANCE = new AlwaysTrue();
-
- private AlwaysTrue() {}
-
- @Override
- public Optional<LeafFunction> negate() {
- return Optional.of(AlwaysFalse.INSTANCE);
- }
-
- @Override
- public boolean test(LogicalType type, Object value) {
- return true;
- }
-
- @Override
- public boolean test(LogicalType type, long rowCount, FieldStats fieldStats) {
- return true;
- }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
index af1d0b8e..53a523ee 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
@@ -20,10 +20,7 @@ package org.apache.flink.table.store.file.schema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.predicate.AlwaysFalse;
-import org.apache.flink.table.store.file.predicate.AlwaysTrue;
import org.apache.flink.table.store.file.predicate.CompoundPredicate;
-import org.apache.flink.table.store.file.predicate.IsNull;
import org.apache.flink.table.store.file.predicate.LeafPredicate;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.utils.ProjectedRowData;
@@ -36,6 +33,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -247,8 +245,7 @@ public class SchemaEvolutionUtil {
/**
* Create predicate list from data fields. We will visit all predicate in filters, reset it's
- * field index, name and type, and use {@link AlwaysFalse} or {@link AlwaysTrue} if the field is
- * not exist.
+ * field index, name and type, and ignore predicate if the field is not exist.
*
* @param tableFields the table fields
* @param dataFields the underlying data fields
@@ -268,12 +265,13 @@ public class SchemaEvolutionUtil {
dataFields.forEach(f -> idToDataFields.put(f.id(), f));
List<Predicate> dataFilters = new ArrayList<>(filters.size());
for (Predicate predicate : filters) {
- dataFilters.add(createDataPredicate(nameToTableFields, idToDataFields, predicate));
+ createDataPredicate(nameToTableFields, idToDataFields, predicate)
+ .ifPresent(dataFilters::add);
}
return dataFilters;
}
- private static Predicate createDataPredicate(
+ private static Optional<Predicate> createDataPredicate(
Map<String, DataField> tableFields,
LinkedHashMap<Integer, DataField> dataFields,
Predicate predicate) {
@@ -282,45 +280,34 @@ public class SchemaEvolutionUtil {
List<Predicate> children = compoundPredicate.children();
List<Predicate> dataChildren = new ArrayList<>(children.size());
for (Predicate child : children) {
- Predicate dataPredicate = createDataPredicate(tableFields, dataFields, child);
- dataChildren.add(dataPredicate);
+ Optional<Predicate> childPredicate =
+ createDataPredicate(tableFields, dataFields, child);
+ if (childPredicate.isPresent()) {
+ dataChildren.add(childPredicate.get());
+ } else {
+ return Optional.empty();
+ }
}
- return new CompoundPredicate(compoundPredicate.function(), dataChildren);
+ return Optional.of(new CompoundPredicate(compoundPredicate.function(), dataChildren));
} else if (predicate instanceof LeafPredicate) {
LeafPredicate leafPredicate = (LeafPredicate) predicate;
-
DataField tableField =
checkNotNull(
tableFields.get(leafPredicate.fieldName()),
String.format("Find no field %s", leafPredicate.fieldName()));
DataField dataField = dataFields.get(tableField.id());
if (dataField == null) {
- // The table field is not exist in data fields, check the predicate function
- if (leafPredicate.function() instanceof IsNull) {
- // Just get the first value
- return new LeafPredicate(
- AlwaysTrue.INSTANCE,
- leafPredicate.type(),
- 0,
- null,
- leafPredicate.literals());
- } else {
- return new LeafPredicate(
- AlwaysFalse.INSTANCE,
- leafPredicate.type(),
- 0,
- null,
- leafPredicate.literals());
- }
+ return Optional.empty();
}
/// TODO Should deal with column type schema evolution here
- return new LeafPredicate(
- leafPredicate.function(),
- leafPredicate.type(),
- indexOf(dataField, dataFields),
- dataField.name(),
- leafPredicate.literals());
+ return Optional.of(
+ new LeafPredicate(
+ leafPredicate.function(),
+ leafPredicate.type(),
+ indexOf(dataField, dataFields),
+ dataField.name(),
+ leafPredicate.literals()));
} else {
throw new UnsupportedOperationException(
String.format(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtilTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtilTest.java
index 073c45ac..b46e4b77 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtilTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtilTest.java
@@ -19,13 +19,9 @@
package org.apache.flink.table.store.file.schema;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.store.file.predicate.AlwaysFalse;
-import org.apache.flink.table.store.file.predicate.AlwaysTrue;
-import org.apache.flink.table.store.file.predicate.CompoundPredicate;
import org.apache.flink.table.store.file.predicate.IsNotNull;
import org.apache.flink.table.store.file.predicate.IsNull;
import org.apache.flink.table.store.file.predicate.LeafPredicate;
-import org.apache.flink.table.store.file.predicate.Or;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.utils.Projection;
@@ -160,9 +156,8 @@ public class SchemaEvolutionUtilTest {
@Test
public void testCreateDataFilters() {
- List<Predicate> children = new ArrayList<>();
- CompoundPredicate predicate = new CompoundPredicate(Or.INSTANCE, children);
- children.add(
+ List<Predicate> predicates = new ArrayList<>();
+ predicates.add(
new LeafPredicate(
IsNull.INSTANCE,
DataTypes.INT().getLogicalType(),
@@ -170,7 +165,7 @@ public class SchemaEvolutionUtilTest {
"c",
Collections.emptyList()));
// Field 9->e is not exist in data
- children.add(
+ predicates.add(
new LeafPredicate(
IsNotNull.INSTANCE,
DataTypes.INT().getLogicalType(),
@@ -178,7 +173,7 @@ public class SchemaEvolutionUtilTest {
"e",
Collections.emptyList()));
// Field 7->a is not exist in data
- children.add(
+ predicates.add(
new LeafPredicate(
IsNull.INSTANCE,
DataTypes.INT().getLogicalType(),
@@ -187,28 +182,13 @@ public class SchemaEvolutionUtilTest {
Collections.emptyList()));
List<Predicate> filters =
- SchemaEvolutionUtil.createDataFilters(
- tableFields2, dataFields, Collections.singletonList(predicate));
+ SchemaEvolutionUtil.createDataFilters(tableFields2, dataFields, predicates);
assert filters != null;
assertThat(filters.size()).isEqualTo(1);
- CompoundPredicate dataFilter = (CompoundPredicate) filters.get(0);
- assertThat(dataFilter.function()).isEqualTo(Or.INSTANCE);
- assertThat(dataFilter.children().size()).isEqualTo(3);
-
- LeafPredicate child1 = (LeafPredicate) dataFilter.children().get(0);
+ LeafPredicate child1 = (LeafPredicate) filters.get(0);
assertThat(child1.function()).isEqualTo(IsNull.INSTANCE);
assertThat(child1.fieldName()).isEqualTo("b");
assertThat(child1.index()).isEqualTo(1);
-
- LeafPredicate child2 = (LeafPredicate) dataFilter.children().get(1);
- assertThat(child2.function()).isEqualTo(AlwaysFalse.INSTANCE);
- assertThat(child2.fieldName()).isNull();
- assertThat(child2.index()).isEqualTo(0);
-
- LeafPredicate child3 = (LeafPredicate) dataFilter.children().get(2);
- assertThat(child3.function()).isEqualTo(AlwaysTrue.INSTANCE);
- assertThat(child3.fieldName()).isNull();
- assertThat(child3.index()).isEqualTo(0);
}
}